QiangCai commented on a change in pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#discussion_r456179554



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -104,73 +104,80 @@ class MergeIndexEventListener extends 
OperationEventListener with Logging {
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ 
carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ 
carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
 
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if 
(alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    
CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = 
mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${
+                          carbonMainTable
+                            .getTableName
+                        }")

Review comment:
       Combine these lines into a single line.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -104,73 +104,80 @@ class MergeIndexEventListener extends 
OperationEventListener with Logging {
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ 
carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ 
carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
 
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if 
(alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    
CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = 
mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${
+                          carbonMainTable
+                            .getTableName
+                        }")
+            val loadFolderDetailsArray = SegmentStatusManager
+              .readLoadMetadata(carbonMainTable.getMetadataPath)
+            val segmentFileNameMap: java.util.Map[String, String] = new 
util.HashMap[String,
+              String]()
+            var streamingSegment: Set[String] = Set[String]()
+            loadFolderDetailsArray.foreach(loadMetadataDetails => {
+              if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) 
{
+                streamingSegment += loadMetadataDetails.getLoadName
+              }
+              segmentFileNameMap
+                .put(loadMetadataDetails.getLoadName,
+                  String.valueOf(loadMetadataDetails.getLoadStartTime))
+            })
+            val segmentsToMerge =
+              if 
(alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+                val validSegments =
+                  
CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+                val validSegmentIds: mutable.Buffer[String] = 
mutable.Buffer[String]()
+                validSegments.foreach { segment =>
+                  // do not add ROW_V1 format
+                  if 
(!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                     validSegmentIds += segment.getSegmentNo
                   }
-                  validSegmentIds
-                } else {
-                  
alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
                 }
-
-              val loadFolderDetailsArray = SegmentStatusManager
-                .readLoadMetadata(carbonMainTable.getMetadataPath)
-              val segmentFileNameMap: java.util.Map[String, String] = new 
util.HashMap[String,
-                String]()
-              loadFolderDetailsArray.foreach(loadMetadataDetails => {
-                segmentFileNameMap
-                  .put(loadMetadataDetails.getLoadName,
-                    String.valueOf(loadMetadataDetails.getLoadStartTime))
-              })
-              // in case of merge index file creation using Alter DDL command
-              // readFileFooterFromCarbonDataFile flag should be true. This 
flag is check for legacy
-              // store (store <= 1.1 version) and create merge Index file as 
per new store so that
-              // old store is also upgraded to new store
-              val startTime = System.currentTimeMillis()
-              CarbonMergeFilesRDD.mergeIndexFiles(
-                sparkSession = sparkSession,
-                segmentIds = segmentsToMerge,
-                segmentFileNameToSegmentIdMap = segmentFileNameMap,
-                tablePath = carbonMainTable.getTablePath,
-                carbonTable = carbonMainTable,
-                mergeIndexProperty = true,
-                readFileFooterFromCarbonDataFile = true)
-              LOGGER.info("Total time taken for merge index "
-                          + (System.currentTimeMillis() - startTime) + "ms")
-              // clear Block index Cache
-              MergeIndexUtil.clearBlockIndexCache(carbonMainTable, 
segmentsToMerge)
-              val requestMessage = "Compaction request completed for table " +
-                s"${ carbonMainTable.getDatabaseName }.${ 
carbonMainTable.getTableName }"
-              LOGGER.info(requestMessage)
-            } else {
-              val lockMessage = "Not able to acquire the compaction lock for 
table " +
-                                s"${ carbonMainTable.getDatabaseName }." +
-                                s"${ carbonMainTable.getTableName}"
-              LOGGER.error(lockMessage)
-              CarbonException.analysisException(
-                "Table is already locked for compaction. Please try after some 
time.")
-            }
-          } finally {
-            lock.unlock()
+                validSegmentIds
+              } else {
+                alterTableMergeIndexEvent.alterTableModel
+                  .customSegmentIds
+                  .get
+                  .filter(!streamingSegment.contains(_))

Review comment:
       .filterNot(streamingSegment.contains(_))

##########
File path: docs/ddl-of-carbondata.md
##########
@@ -750,10 +750,6 @@ Users can specify which columns to include and exclude for 
local dictionary gene
      ALTER TABLE test_db.carbon COMPACT 'SEGMENT_INDEX'
      ```
 
-     **NOTE:**
-
-     * Merge index is not supported on streaming table.

Review comment:
       Merge index has been supported on streaming tables since version 2.1; 
however, the streaming segments themselves still do not get their index files merged.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to