This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git

The following commit(s) were added to refs/heads/master by this push:
     new 1a0ed65  [CARBONDATA-3843] Support merging index for streaming table
1a0ed65 is described below

commit 1a0ed65270acc6694ed52b13aaddf55bfcfe0422
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Tue Jun 2 20:00:38 2020 +0530

    [CARBONDATA-3843] Support merging index for streaming table

    Why is this PR needed?
    Merge index is not created for normal segments (created by load, insert,
    compaction, or handoff) on a streaming table.

    What changes were proposed in this PR?
    For a streaming table, allow merge index creation for all kinds of
    segments other than streaming segments (ROW_V1).

    Does this PR introduce any user interface change?
    No

    Is any new testcase added?
    Yes

    This closes #3785
---
 docs/ddl-of-carbondata.md                          |   3 +-
 .../spark/sql/events/MergeIndexEventListener.scala | 134 +++++++++++----------
 .../CarbonAlterTableCompactionCommand.scala        |   5 -
 .../CarbonAlterTableAddHivePartitionCommand.scala  |  11 +-
 .../CarbonIndexFileMergeTestCase.scala             |  64 ++++++++--
 5 files changed, 134 insertions(+), 83 deletions(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3165f4e..e7cfb0c 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -751,8 +751,9 @@ Users can specify which columns to include and exclude for local dictionary gene
    ```
 
 **NOTE:**
+ * Merge index is supported on streaming tables from CarbonData 2.0.1 onwards.
+   However, streaming segments (ROW_V1) cannot create a merge index.
 
- * Merge index is not supported on streaming table.
-
 #### SET and UNSET
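
To illustrate the documented behaviour, a minimal sketch follows, assuming a
SparkSession `spark` with CarbonData configured; the table name and CSV path
are hypothetical, and the statements mirror the test cases added further below.

    // Hypothetical table; from CarbonData 2.0.1 a streaming table accepts the
    // SEGMENT_INDEX compaction, and ROW_V1 streaming segments are skipped.
    spark.sql(
      """CREATE TABLE demo_stream(id INT, name STRING, city STRING, age INT)
        |STORED AS carbondata
        |TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')""".stripMargin)
    // '/tmp/demo.csv' is a placeholder path
    spark.sql("LOAD DATA LOCAL INPATH '/tmp/demo.csv' INTO TABLE demo_stream OPTIONS('header'='false')")
    // merge index files of all eligible (non-ROW_V1) segments
    spark.sql("alter table demo_stream compact 'segment_index'")
    // or only for chosen segments
    spark.sql("alter table demo_stream compact 'segment_index' where segment.id in (0)")
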
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 2995edc..4e06ff0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -30,9 +30,8 @@ import org.apache.spark.util.MergeIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
 import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
@@ -62,7 +61,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
             .asInstanceOf[util.List[String]]
         }
         val tempPath = operationContext.getProperty("tempPath")
-        if(!carbonTable.isStreamingSink) {
+        val loadMetaDetails = loadModel.getCurrentLoadMetadataDetail
+        if (loadMetaDetails != null && !loadMetaDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
           if (null != compactedSegments && !compactedSegments.isEmpty) {
             MergeIndexUtil.mergeIndexFilesForCompactedSegments(sparkSession,
               carbonTable,
@@ -104,73 +104,77 @@ case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName}")
+            val loadFolderDetailsArray = SegmentStatusManager
+              .readLoadMetadata(carbonMainTable.getMetadataPath)
+            val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
+              String]()
+            var streamingSegment: Set[String] = Set[String]()
+            loadFolderDetailsArray.foreach(loadMetadataDetails => {
+              if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
+                streamingSegment += loadMetadataDetails.getLoadName
+              }
+              segmentFileNameMap
+                .put(loadMetadataDetails.getLoadName,
+                  String.valueOf(loadMetadataDetails.getLoadStartTime))
+            })
+            val segmentsToMerge =
+              if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+                val validSegments =
+                  CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+                val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+                validSegments.foreach { segment =>
+                  // do not add ROW_V1 format
+                  if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                     validSegmentIds += segment.getSegmentNo
                   }
-                  validSegmentIds
-                } else {
-                  alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
-                }
-
-              val loadFolderDetailsArray = SegmentStatusManager
-                .readLoadMetadata(carbonMainTable.getMetadataPath)
-              val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
-                String]()
-              loadFolderDetailsArray.foreach(loadMetadataDetails => {
-                segmentFileNameMap
-                  .put(loadMetadataDetails.getLoadName,
-                    String.valueOf(loadMetadataDetails.getLoadStartTime))
-              })
-              // in case of merge index file creation using Alter DDL command
-              // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
-              // store (store <= 1.1 version) and create merge Index file as per new store so that
-              // old store is also upgraded to new store
-              val startTime = System.currentTimeMillis()
-              CarbonMergeFilesRDD.mergeIndexFiles(
-                sparkSession = sparkSession,
-                segmentIds = segmentsToMerge,
-                segmentFileNameToSegmentIdMap = segmentFileNameMap,
-                tablePath = carbonMainTable.getTablePath,
-                carbonTable = carbonMainTable,
-                mergeIndexProperty = true,
-                readFileFooterFromCarbonDataFile = true)
-              LOGGER.info("Total time taken for merge index "
-                + (System.currentTimeMillis() - startTime) + "ms")
-              // clear Block index Cache
-              MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
-              val requestMessage = "Compaction request completed for table " +
-                s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-              LOGGER.info(requestMessage)
-            } else {
-              val lockMessage = "Not able to acquire the compaction lock for table " +
-                s"${ carbonMainTable.getDatabaseName }." +
-                s"${ carbonMainTable.getTableName}"
-              LOGGER.error(lockMessage)
-              CarbonException.analysisException(
-                "Table is already locked for compaction. Please try after some time.")
-            }
-          } finally {
-            lock.unlock()
+                }
+                validSegmentIds
+              } else {
+                alterTableMergeIndexEvent.alterTableModel
+                  .customSegmentIds
+                  .get
+                  .filterNot(streamingSegment.contains(_))
+              }
+            // in case of merge index file creation using Alter DDL command
+            // readFileFooterFromCarbonDataFile flag should be true. This flag is checked for legacy
+            // stores (store <= 1.1 version) to create the merge index file as per the new store so
+            // that the old store is also upgraded to the new store
+            val startTime = System.currentTimeMillis()
+            CarbonMergeFilesRDD.mergeIndexFiles(
+              sparkSession = sparkSession,
+              segmentIds = segmentsToMerge,
+              segmentFileNameToSegmentIdMap = segmentFileNameMap,
+              tablePath = carbonMainTable.getTablePath,
+              carbonTable = carbonMainTable,
+              mergeIndexProperty = true,
+              readFileFooterFromCarbonDataFile = true)
+            LOGGER.info("Total time taken for merge index " +
+              (System.currentTimeMillis() - startTime) + "ms")
+            // clear Block index Cache
+            MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
+            val requestMessage = "Compaction request completed for table " +
+              s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
+            LOGGER.info(requestMessage)
+          } else {
+            val lockMessage = "Not able to acquire the compaction lock for table " +
+              s"${ carbonMainTable.getDatabaseName }." +
+              s"${ carbonMainTable.getTableName}"
+            LOGGER.error(lockMessage)
+            CarbonException.analysisException(
+              "Table is already locked for compaction. Please try after some time.")
           }
+        } finally {
+          lock.unlock()
         }
       }
     }
   }
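
The segment selection implemented above reduces to one rule: ROW_V1
(streaming) segments are never handed to merge-index creation, whether the
segment list is derived automatically or supplied by the user via custom
segment ids. A simplified, self-contained sketch of that rule, using stand-in
types rather than the CarbonData API:

    // Stand-in type for illustration only; not the CarbonData API.
    case class LoadDetail(loadName: String, fileFormat: String)

    // ROW_V1 segments are excluded in both modes: the automatically derived
    // list and a user-supplied custom list.
    def mergeableSegments(all: Seq[LoadDetail], custom: Option[Seq[String]]): Seq[String] = {
      val streaming = all.filter(_.fileFormat == "ROW_V1").map(_.loadName).toSet
      custom match {
        case Some(ids) => ids.filterNot(streaming.contains)
        case None => all.map(_.loadName).filterNot(streaming.contains)
      }
    }

    // segment 1 is a streaming segment and is skipped in both modes
    val details = Seq(LoadDetail("0", "COLUMNAR_V3"), LoadDetail("1", "ROW_V1"))
    assert(mergeableSegments(details, None) == Seq("0"))
    assert(mergeableSegments(details, Some(Seq("0", "1"))) == Seq("0"))
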
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 2224943..dc50cf5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -131,11 +131,6 @@ case class CarbonAlterTableCompactionCommand(
       }
       Seq.empty
     } else if (compactionType == CompactionType.SEGMENT_INDEX) {
-      if (table.isStreamingSink) {
-        throw new MalformedCarbonCommandException(
-          "Unsupported alter operation on carbon table: Merge index is not supported on streaming" +
-          " table")
-      }
       val version = CarbonUtil.getFormatVersion(table)
       val isOlderVersion = version == ColumnarFormatVersion.V1 ||
                            version == ColumnarFormatVersion.V2
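
With that guard removed, the statement that the deleted test expected to fail
now succeeds; the legacy-store version check is unchanged. A before/after
sketch, reusing the hypothetical `demo_stream` table and `spark` session from
above:

    // Before: threw MalformedCarbonCommandException("Unsupported alter operation
    // on carbon table: Merge index is not supported on streaming table").
    // After: runs, merging the index files of every non-ROW_V1 segment.
    spark.sql("alter table demo_stream compact 'segment_index'")
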
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index a080db6..09614a8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
@@ -160,6 +160,13 @@
         // carbon index files, and it is not good for query performance since all index files
         // need to be read to spark driver.
         // So, here trigger to merge the index files by sending an event
+        val customSegmentIds = if (loadModel.getCurrentLoadMetadataDetail
+          .getFileFormat
+          .equals(FileFormat.ROW_V1)) {
+          Some(Seq("").toList)
+        } else {
+          Some(Seq(loadModel.getSegmentId).toList)
+        }
         val alterTableModel = AlterTableModel(
           dbName = Some(table.getDatabaseName),
           tableName = table.getTableName,
@@ -167,7 +174,7 @@
           compactionType = "", // to trigger index merge, this is not required
           factTimeStamp = Some(System.currentTimeMillis()),
           alterSql = null,
-          customSegmentIds = Some(Seq(loadModel.getSegmentId).toList))
+          customSegmentIds = customSegmentIds)
         val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
         OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
       }
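
The added customSegmentIds logic is a small decision rule: a ROW_V1 load
passes an empty segment id so that the merge index event skips it. A condensed
restatement, where the `loadModel` and FileFormat calls follow the diff above:

    // Condensed from the diff above; `loadModel` is the command's load model.
    val isStreamingSegment =
      loadModel.getCurrentLoadMetadataDetail.getFileFormat.equals(FileFormat.ROW_V1)
    val customSegmentIds = if (isStreamingSegment) {
      Some(List(""))   // empty id: no merge index attempted for a ROW_V1 load
    } else {
      Some(List(loadModel.getSegmentId))
    }
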
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index bb2c63f..6a079b5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -426,14 +426,58 @@
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
-    assert(getIndexFileCount("default_streamingTable", "0") >= 1)
-    val exceptionMessage = intercept[Exception] {
-      sql("alter table streamingTable compact 'segment_index'")
-    }.getMessage
-    assert(exceptionMessage.contains("Unsupported alter operation on carbon table: Merge index is not supported on streaming table"))
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
     sql("DROP TABLE IF EXISTS streamingTable")
   }
 
+  test("Verify alter table index merge for streaming table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS streamingTable")
+    sql(
+      """
+        | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
+    // check for zero merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+    // check for one index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT) == 1)
+    sql("alter table streamingTable compact 'segment_index'")
+    sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+    sql("DROP TABLE IF EXISTS streamingTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+  }
+
+  test("Verify alter table index merge for streaming table with custom segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS streamingTable")
+    sql(
+      """
+        | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
+    // check for zero merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+    // check for one index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT) == 1)
+    sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+    sql("DROP TABLE IF EXISTS streamingTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+  }
+
   test("verify driver cache gets updated after creating merge Index file") {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
@@ -471,7 +515,9 @@ class CarbonIndexFileMergeTestCase
     identifiers.forall(identifier => identifier.getMergeIndexFileName == null)
   }
 
-  private def getIndexFileCount(tableName: String, segment: String): Int = {
+  private def getIndexFileCount(tableName: String,
+      segment: String,
+      extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = {
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path = CarbonTablePath
       .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
@@ -479,15 +525,13 @@ class CarbonIndexFileMergeTestCase
       FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
         .listFiles(true, new CarbonFileFilter {
           override def accept(file: CarbonFile): Boolean = {
-            file.getName.endsWith(CarbonTablePath
-              .INDEX_FILE_EXT)
+            file.getName.endsWith(extension)
           }
         })
     } else {
       FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
         override def accept(file: CarbonFile): Boolean = {
-          file.getName.endsWith(CarbonTablePath
-            .INDEX_FILE_EXT)
+          file.getName.endsWith(extension)
         }
       })
     }
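
For reference, the reworked test helper defaults to counting plain index
files and takes the file extension as a parameter, so the same helper can
count merge index files. A usage sketch matching the assertions above:

    // count plain index files of segment "0" (default extension)
    val indexFiles = getIndexFileCount("default_streamingTable", "0")
    // count merge index files of the same segment
    val mergeIndexFiles = getIndexFileCount("default_streamingTable", "0",
      CarbonTablePath.MERGE_INDEX_FILE_EXT)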