This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a0ed65  [CARBONDATA-3843] Support merging index for streaming table
1a0ed65 is described below

commit 1a0ed65270acc6694ed52b13aaddf55bfcfe0422
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Tue Jun 2 20:00:38 2020 +0530

    [CARBONDATA-3843] Support merging index for streaming table
    
    Why is this PR needed?
    Merge index is not created for normal segments (created by load, insert, compaction, or handoff) on a streaming table.
    
    What changes were proposed in this PR?
    For a streaming table, allow merge index creation for all kinds of segments except streaming segments (ROW_V1).
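
    The gist of the change, as a rough sketch rather than the exact patch: before
    triggering a merge, keep only the segments whose file format is not ROW_V1.
    The types and getters below come from the patch; the helper name itself is
    hypothetical.

        import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails}

        // Keep only segments that can carry a merge index, i.e. everything
        // except streaming (ROW_V1) segments.
        def segmentsEligibleForMergeIndex(details: Array[LoadMetadataDetails]): Seq[String] = {
          details.filterNot(_.getFileFormat.equals(FileFormat.ROW_V1))
            .map(_.getLoadName)
            .toSeq
        }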
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
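
    For reference, the user-facing DDL exercised by the new tests (syntax unchanged,
    now also allowed on streaming tables; the table name is taken from the test):

        sql("alter table streamingTable compact 'segment_index'")
        sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")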
    
    This closes #3785
---
 docs/ddl-of-carbondata.md                          |   3 +-
 .../spark/sql/events/MergeIndexEventListener.scala | 134 +++++++++++----------
 .../CarbonAlterTableCompactionCommand.scala        |   5 -
 .../CarbonAlterTableAddHivePartitionCommand.scala  |  11 +-
 .../CarbonIndexFileMergeTestCase.scala             |  64 ++++++++--
 5 files changed, 134 insertions(+), 83 deletions(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3165f4e..e7cfb0c 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -751,8 +751,9 @@ Users can specify which columns to include and exclude for local dictionary gene
      ```
 
      **NOTE:**
+     * Merge index is supported on streaming tables from CarbonData 2.0.1 onwards.
+     However, merge index files are not created for streaming segments (ROW_V1).
 
-     * Merge index is not supported on streaming table.
 
    - #### SET and UNSET
    
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 2995edc..4e06ff0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -30,9 +30,8 @@ import org.apache.spark.util.MergeIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
 import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
@@ -62,7 +61,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
             .asInstanceOf[util.List[String]]
         }
         val tempPath = operationContext.getProperty("tempPath")
-        if(!carbonTable.isStreamingSink) {
+        val loadMetaDetails = loadModel.getCurrentLoadMetadataDetail
+        if (loadMetaDetails != null && !loadMetaDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
           if (null != compactedSegments && !compactedSegments.isEmpty) {
             MergeIndexUtil.mergeIndexFilesForCompactedSegments(sparkSession,
               carbonTable,
@@ -104,73 +104,77 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
 
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName}")
+            val loadFolderDetailsArray = SegmentStatusManager
+              .readLoadMetadata(carbonMainTable.getMetadataPath)
+            val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
+              String]()
+            var streamingSegment: Set[String] = Set[String]()
+            loadFolderDetailsArray.foreach(loadMetadataDetails => {
+              if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
+                streamingSegment += loadMetadataDetails.getLoadName
+              }
+              segmentFileNameMap
+                .put(loadMetadataDetails.getLoadName,
+                  String.valueOf(loadMetadataDetails.getLoadStartTime))
+            })
+            val segmentsToMerge =
+              if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+                val validSegments =
+                  CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+                val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+                validSegments.foreach { segment =>
+                  // do not add ROW_V1 format
+                  if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                     validSegmentIds += segment.getSegmentNo
                   }
-                  validSegmentIds
-                } else {
-                  alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
                 }
-
-              val loadFolderDetailsArray = SegmentStatusManager
-                .readLoadMetadata(carbonMainTable.getMetadataPath)
-              val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
-                String]()
-              loadFolderDetailsArray.foreach(loadMetadataDetails => {
-                segmentFileNameMap
-                  .put(loadMetadataDetails.getLoadName,
-                    String.valueOf(loadMetadataDetails.getLoadStartTime))
-              })
-              // in case of merge index file creation using Alter DDL command
-              // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
-              // store (store <= 1.1 version) and create merge Index file as per new store so that
-              // old store is also upgraded to new store
-              val startTime = System.currentTimeMillis()
-              CarbonMergeFilesRDD.mergeIndexFiles(
-                sparkSession = sparkSession,
-                segmentIds = segmentsToMerge,
-                segmentFileNameToSegmentIdMap = segmentFileNameMap,
-                tablePath = carbonMainTable.getTablePath,
-                carbonTable = carbonMainTable,
-                mergeIndexProperty = true,
-                readFileFooterFromCarbonDataFile = true)
-              LOGGER.info("Total time taken for merge index "
-                          + (System.currentTimeMillis() - startTime) + "ms")
-              // clear Block index Cache
-              MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
-              val requestMessage = "Compaction request completed for table " +
-                s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-              LOGGER.info(requestMessage)
-            } else {
-              val lockMessage = "Not able to acquire the compaction lock for table " +
-                                s"${ carbonMainTable.getDatabaseName }." +
-                                s"${ carbonMainTable.getTableName}"
-              LOGGER.error(lockMessage)
-              CarbonException.analysisException(
-                "Table is already locked for compaction. Please try after some time.")
-            }
-          } finally {
-            lock.unlock()
+                validSegmentIds
+              } else {
+                alterTableMergeIndexEvent.alterTableModel
+                  .customSegmentIds
+                  .get
+                  .filterNot(streamingSegment.contains(_))
+              }
+            // in case of merge index file creation using Alter DDL command
+            // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
+            // store (store <= 1.1 version) and create merge Index file as per new store so that
+            // old store is also upgraded to new store
+            val startTime = System.currentTimeMillis()
+            CarbonMergeFilesRDD.mergeIndexFiles(
+              sparkSession = sparkSession,
+              segmentIds = segmentsToMerge,
+              segmentFileNameToSegmentIdMap = segmentFileNameMap,
+              tablePath = carbonMainTable.getTablePath,
+              carbonTable = carbonMainTable,
+              mergeIndexProperty = true,
+              readFileFooterFromCarbonDataFile = true)
+            LOGGER.info("Total time taken for merge index "
+                        + (System.currentTimeMillis() - startTime) + "ms")
+            // clear Block index Cache
+            MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
+            val requestMessage = "Compaction request completed for table " +
+              s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
+            LOGGER.info(requestMessage)
+          } else {
+            val lockMessage = "Not able to acquire the compaction lock for table " +
+                              s"${ carbonMainTable.getDatabaseName }." +
+                              s"${ carbonMainTable.getTableName}"
+            LOGGER.error(lockMessage)
+            CarbonException.analysisException(
+              "Table is already locked for compaction. Please try after some time.")
           }
+        } finally {
+          lock.unlock()
         }
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 2224943..dc50cf5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -131,11 +131,6 @@ case class CarbonAlterTableCompactionCommand(
       }
       Seq.empty
     } else if (compactionType == CompactionType.SEGMENT_INDEX) {
-      if (table.isStreamingSink) {
-        throw new MalformedCarbonCommandException(
-          "Unsupported alter operation on carbon table: Merge index is not supported on streaming" +
-          " table")
-      }
       val version = CarbonUtil.getFormatVersion(table)
       val isOlderVersion = version == ColumnarFormatVersion.V1 ||
                            version == ColumnarFormatVersion.V2
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index a080db6..09614a8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
@@ -160,6 +160,13 @@ case class CarbonAlterTableAddHivePartitionCommand(
         // carbon index files, and it is not good for query performance since all index files
         // need to be read to spark driver.
         // So, here trigger to merge the index files by sending an event
+        val customSegmentIds = if (loadModel.getCurrentLoadMetadataDetail
+          .getFileFormat
+          .equals(FileFormat.ROW_V1)) {
+          Some(Seq("").toList)
+        } else {
+          Some(Seq(loadModel.getSegmentId).toList)
+        }
         val alterTableModel = AlterTableModel(
           dbName = Some(table.getDatabaseName),
           tableName = table.getTableName,
@@ -167,7 +174,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
           compactionType = "", // to trigger index merge, this is not required
           factTimeStamp = Some(System.currentTimeMillis()),
           alterSql = null,
-          customSegmentIds = Some(Seq(loadModel.getSegmentId).toList))
+          customSegmentIds = customSegmentIds)
         val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
         OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
       }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index bb2c63f..6a079b5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -426,14 +426,58 @@ class CarbonIndexFileMergeTestCase
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
-    assert(getIndexFileCount("default_streamingTable", "0") >= 1)
-    val exceptionMessage = intercept[Exception] {
-      sql("alter table streamingTable compact 'segment_index'")
-    }.getMessage
-    assert(exceptionMessage.contains("Unsupported alter operation on carbon table: Merge index is not supported on streaming table"))
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
     sql("DROP TABLE IF EXISTS streamingTable")
   }
 
+  test("Verify alter table index merge for streaming table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS streamingTable")
+    sql(
+      """
+        | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
+    // check for zero merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+    // check for one index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT) == 1)
+    sql("alter table streamingTable compact 'segment_index'")
+    sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+    sql("DROP TABLE IF EXISTS streamingTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+  }
+
+  test("Verify alter table index merge for streaming table with custom segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS streamingTable")
+    sql(
+      """
+        | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
+    // check for zero merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+    // check for one index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT) == 1)
+    sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+    sql("DROP TABLE IF EXISTS streamingTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+  }
+
   test("verify driver cache gets updated after creating merge Index file") {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
@@ -471,7 +515,9 @@ class CarbonIndexFileMergeTestCase
     identifiers.forall(identifier => identifier.getMergeIndexFileName == null)
   }
 
-  private def getIndexFileCount(tableName: String, segment: String): Int = {
+  private def getIndexFileCount(tableName: String,
+      segment: String,
+      extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = {
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path = CarbonTablePath
       .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
@@ -479,15 +525,13 @@ class CarbonIndexFileMergeTestCase
       FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
         .listFiles(true, new CarbonFileFilter {
           override def accept(file: CarbonFile): Boolean = {
-            file.getName.endsWith(CarbonTablePath
-              .INDEX_FILE_EXT)
+            file.getName.endsWith(extension)
           }
         })
     } else {
       FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
         override def accept(file: CarbonFile): Boolean = {
-          file.getName.endsWith(CarbonTablePath
-            .INDEX_FILE_EXT)
+          file.getName.endsWith(extension)
         }
       })
     }
