Repository: carbondata Updated Branches: refs/heads/master 4612e0031 -> a26be1b18
[CARBONDATA-2714][Merge Index] Fixed block dataMap cache refresh issue after creation of merge index file Things handled as part of this PR: fixed the block dataMap cache refresh issue after creation of the merge index file. Problem: The block dataMap cache is not refreshed after the merge index file is created, so queries still look for the index files and fail. Analysis: Merge index file creation involves modification of the segment file. If a query is executed before the merge index file is created, the cache is loaded. Once the merge index file is created, the index file entries are removed from the segment file and the merge index file entry is added. In this process the cache is not refreshed, and the tableIdentifiers created earlier still have the mergeIndexFileName as null. Fix: After updating the table status file, clear the dataMap cache for all segment IDs on which the dataMap is being created. This closes #2515 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a26be1b1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a26be1b1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a26be1b1 Branch: refs/heads/master Commit: a26be1b181f952d860050e65b6cf1ad85d0bfea5 Parents: 4612e00 Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Tue Jul 17 14:24:54 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Jul 18 10:30:33 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/SegmentFileStore.java | 29 +++++++++++- .../core/writer/CarbonIndexFileMergeWriter.java | 2 +- .../CarbonIndexFileMergeTestCase.scala | 47 +++++++++++++++++++- .../spark/rdd/CarbonDataRDDFactory.scala | 2 +- .../sql/events/MergeIndexEventListener.scala | 17 ++++++- 5 files changed, 91 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 3d08a2d..9681e37 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -36,7 +36,9 @@ import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -280,9 +282,10 @@ public class SegmentFileStore { * @return boolean which determines whether status update is done or not. 
* @throws IOException */ - public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile, - String tableId, SegmentFileStore segmentFileStore) throws IOException { + public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId, + String segmentFile, String tableId, SegmentFileStore segmentFileStore) throws IOException { boolean status = false; + String tablePath = carbonTable.getTablePath(); String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath); if (!FileFactory.isFileExist(tableStatusPath)) { return status; @@ -316,6 +319,8 @@ public class SegmentFileStore { SegmentStatusManager .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); + // clear dataMap cache for the segmentId for which the table status file is getting updated + clearBlockDataMapCache(carbonTable, segmentId); status = true; } else { LOGGER.error( @@ -333,6 +338,26 @@ public class SegmentFileStore { return status; } + /** + * After updating table status file clear the dataMap cache for all segmentId's on which + * dataMap is being created because flows like merge index file creation involves modification of + * segment file and once segment file is modified the cache for that segment need to be cleared + * otherwise the old cache will be used which is stale + * + * @param carbonTable + * @param segmentId + */ + public static void clearBlockDataMapCache(CarbonTable carbonTable, String segmentId) { + TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable); + Segment segment = new Segment(segmentId); + List<Segment> segments = new ArrayList<>(); + segments.add(segment); + LOGGER.info( + "clearing cache while updating segment file entry in table status file for segmentId: " + + segmentId); + defaultDataMap.clear(segments); + } + private static CarbonFile[] getSegmentFiles(String segmentPath) { CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); if 
(carbonFile.exists()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index b080f52..1634091 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -151,7 +151,7 @@ public class CarbonIndexFileMergeWriter { String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); - SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName, + SegmentFileStore.updateSegmentFile(table, segmentId, newSegmentFileName, table.getCarbonTableIdentifier().getTableId(), segmentFileStore); for (CarbonFile file : indexFiles) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala index 8ee2275..173c14f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala @@ -17,17 +17,24 @@ package org.apache.carbondata.spark.testsuite.datacompaction +import java.util + +import scala.collection.JavaConverters._ + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.spark.sql.Row +import org.apache.spark.sql.{CarbonEnv, Row} import org.apache.spark.sql.test.util.QueryTest import org.junit.Assert import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager @@ -44,6 +51,7 @@ class CarbonIndexFileMergeTestCase CompactionSupportGlobalSortBigFileTest.deleteFile(file2) sql("DROP TABLE IF EXISTS nonindexmerge") sql("DROP TABLE IF EXISTS indexmerge") + sql("DROP TABLE IF EXISTS merge_index_cache") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT) @@ -444,6 +452,43 @@ class CarbonIndexFileMergeTestCase sql("DROP TABLE IF EXISTS streamingTable") } + test("verify driver cache gets updated after creating merge Index file") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") + sql("DROP TABLE IF EXISTS merge_index_cache") + sql( + """ + 
| CREATE TABLE merge_index_cache(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE merge_index_cache OPTIONS('header'='false')") + sql("""Select count(*) from merge_index_cache""").collect() + // merge Index fileName should be null as merge Index file is not created + assert(mergeFileNameIsNull("0", "default", "merge_index_cache")) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") + sql("ALTER TABLE merge_index_cache COMPACT 'SEGMENT_INDEX'") + sql("""Select count(*) from merge_index_cache""").collect() + // once merge file is created cache should be refreshed in the same session and identifiers + // should contain mergeIndex file name + assert(!mergeFileNameIsNull("0", "default", "merge_index_cache")) + } + + private def mergeFileNameIsNull(segmentId: String, dbName: String, tableName: String): Boolean = { + val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession) + val dataMapFactory = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable) + .getDataMapFactory + val method = classOf[BlockletDataMapFactory] + .getDeclaredMethod("getTableBlockIndexUniqueIdentifiers", classOf[Segment]) + method.setAccessible(true) + val segment = new Segment(segmentId) + val identifiers = method.invoke(dataMapFactory, segment) + .asInstanceOf[util.Set[TableBlockIndexUniqueIdentifier]].asScala + assert(identifiers.size == 1) + identifiers.forall(identifier => identifier.getMergeIndexFileName == null) + } + private def getIndexFileCount(tableName: String, segment: String): Int = { val table = CarbonMetadata.getInstance().getCarbonTable(tableName) val path = CarbonTablePath http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 40d5c0d..804193c 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -519,7 +519,7 @@ object CarbonDataRDDFactory { String.valueOf(carbonLoadModel.getFactTimeStamp)) SegmentFileStore.updateSegmentFile( - carbonTable.getTablePath, + carbonTable, carbonLoadModel.getSegmentId, segmentFileName, carbonTable.getCarbonTableIdentifier.getTableId, http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala index a58e405..dd64423 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala @@ -31,6 +31,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{AlterTableCompactionPostEvent, 
AlterTableMergeIndexEvent, Event, OperationContext, OperationEventListener} @@ -67,6 +68,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging { segmentFileNameMap, carbonTable.getTablePath, carbonTable, false) + // clear Block dataMap Cache + clearBlockDataMapCache(carbonTable, Seq(loadModel.getSegmentId)) } } case alterTableCompactionPostEvent: AlterTableCompactionPostEvent => @@ -120,6 +123,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging { carbonMainTable.getTablePath, carbonMainTable, true) + // clear Block dataMap Cache + clearBlockDataMapCache(carbonMainTable, validSegmentIds) val requestMessage = "Compaction request completed for table " s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }" LOGGER.audit(requestMessage) @@ -168,13 +173,23 @@ class MergeIndexEventListener extends OperationEventListener with Logging { // So, it is enough to do merge index only for 0.2 as it is the only valid segment in this list val validMergedSegIds = validSegments .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo) - if (null != validMergedSegIds && !mergedSegmentIds.isEmpty) { + if (null != validMergedSegIds && !validMergedSegIds.isEmpty) { CommonUtil.mergeIndexFiles(sparkContext, validMergedSegIds, segmentFileNameMap, carbonTable.getTablePath, carbonTable, false) + // clear Block dataMap Cache + clearBlockDataMapCache(carbonTable, validMergedSegIds) } } + + private def clearBlockDataMapCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = { + // clear driver Block dataMap cache for each segment + segmentIds.foreach { segmentId => + SegmentFileStore.clearBlockDataMapCache(carbonTable, segmentId) + } + } + }