[CARBONDATA-2482] Pass uuid while writing segment file if possible Pass the uuid (segmentFileName) to the writeSegmentFile method if possible.
Problem: When a supporting table depends on the segmentFileName of the main table, the query fails, because the supporting table's segment file is expected to have the same name as that of the main table. This mostly happens in the case of merge index, where the segmentFile is rewritten for that segment. Solution: Whenever the supporting table's segmentFileName must be the same as that of the main table, pass that name as the UUID in the merge index flow instead of taking a new timestamp. This closes #2307 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6c5abddf Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6c5abddf Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6c5abddf Branch: refs/heads/carbonstore Commit: 6c5abddfb125b2dd51c9a989f003fe2bdc066d6d Parents: eb604fd Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Tue May 15 12:24:01 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Mon Jul 16 10:34:31 2018 +0530 ---------------------------------------------------------------------- .../core/datamap/DataMapStoreManager.java | 3 +- .../core/writer/CarbonIndexFileMergeWriter.java | 40 ++++++-------------- .../sdv/generated/MergeIndexTestCase.scala | 8 ++-- .../CarbonIndexFileMergeTestCase.scala | 18 ++++----- .../TestStreamingTableWithRowParser.scala | 2 +- .../store/CarbonFactDataHandlerModel.java | 3 +- .../processing/util/CarbonLoaderUtil.java | 10 +++-- 7 files changed, 35 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c5abddf/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 
8ce302b..475ec01 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -584,7 +584,8 @@ public final class DataMapStoreManager { SegmentRefreshInfo segmentRefreshInfo = seg.getSegmentRefreshInfo(updateVo); String segmentId = seg.getSegmentNo(); - if (segmentRefreshTime.get(segmentId) == null) { + if (segmentRefreshTime.get(segmentId) == null + && segmentRefreshInfo.getSegmentUpdatedTimestamp() != null) { segmentRefreshTime.put(segmentId, segmentRefreshInfo); return true; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c5abddf/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index 80a46cb..b080f52 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -67,7 +67,7 @@ public class CarbonIndexFileMergeWriter { */ private String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath, List<String> indexFileNamesTobeAdded, - boolean readFileFooterFromCarbonDataFile) throws IOException { + boolean readFileFooterFromCarbonDataFile, String uuid) throws IOException { Segment segment = Segment.getSegment(segmentId, tablePath); String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId); CarbonFile[] indexFiles; @@ -85,7 +85,7 @@ public class CarbonIndexFileMergeWriter { readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId); } else { return writeMergeIndexFileBasedOnSegmentFile( - segmentId, indexFileNamesTobeAdded, sfs, indexFiles); + segmentId, indexFileNamesTobeAdded, sfs, 
indexFiles, uuid); } } return null; @@ -111,10 +111,9 @@ public class CarbonIndexFileMergeWriter { return null; } - private String writeMergeIndexFileBasedOnSegmentFile( - String segmentId, - List<String> indexFileNamesTobeAdded, - SegmentFileStore segmentFileStore, CarbonFile[] indexFiles) throws IOException { + private String writeMergeIndexFileBasedOnSegmentFile(String segmentId, + List<String> indexFileNamesTobeAdded, SegmentFileStore segmentFileStore, + CarbonFile[] indexFiles, String uuid) throws IOException { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); fileStore .readAllIIndexOfSegment(segmentFileStore.getSegmentFile(), segmentFileStore.getTablePath(), @@ -147,11 +146,8 @@ public class CarbonIndexFileMergeWriter { } } } - - String uniqueId = String.valueOf(System.currentTimeMillis()); - String newSegmentFileName = - SegmentFileStore.genSegmentFileName(segmentId, String.valueOf(uniqueId)) - + CarbonTablePath.SEGMENT_EXT; + String newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid) + + CarbonTablePath.SEGMENT_EXT; String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); @@ -162,7 +158,7 @@ public class CarbonIndexFileMergeWriter { file.delete(); } - return uniqueId; + return uuid; } private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath, @@ -196,23 +192,11 @@ public class CarbonIndexFileMergeWriter { * Merge all the carbonindex files of segment to a merged file * * @param segmentId - * @param indexFileNamesTobeAdded * @throws IOException */ - public String mergeCarbonIndexFilesOfSegment(String segmentId, - String tablePath, List<String> indexFileNamesTobeAdded) throws IOException { - return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, indexFileNamesTobeAdded, false); - } - - /** - * Merge all the carbonindex files of 
segment to a merged file - * - * @param segmentId - * @throws IOException - */ - public String mergeCarbonIndexFilesOfSegment(String segmentId, + public String mergeCarbonIndexFilesOfSegment(String segmentId, String uuid, String tablePath) throws IOException { - return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false); + return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false, uuid); } /** @@ -223,9 +207,9 @@ public class CarbonIndexFileMergeWriter { * @throws IOException */ public String mergeCarbonIndexFilesOfSegment(String segmentId, - String tablePath, boolean readFileFooterFromCarbonDataFile) throws IOException { + String tablePath, boolean readFileFooterFromCarbonDataFile, String uuid) throws IOException { return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, - readFileFooterFromCarbonDataFile); + readFileFooterFromCarbonDataFile, uuid); } private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c5abddf/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala index 99a537a..215ad0d 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala @@ -56,7 +56,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_merge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge") - new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), sql("""Select count(*) from carbon_automation_merge""")) @@ -73,8 +73,8 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") >= 1) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") >= 1) val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge") - new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) - new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) + new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) + new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 0) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows) @@ -95,7 +95,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { assert(getIndexFileCount("default", "carbon_automation_nonmerge", "2") >= 1) sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect() val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge") - new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) + new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1", true) >= 1) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c5abddf/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala index 51e46f7..5871262 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala @@ -71,7 +71,7 @@ class CarbonIndexFileMergeTestCase s"'GLOBAL_SORT_PARTITIONS'='100')") val table = CarbonMetadata.getInstance().getCarbonTable("default", "indexmerge") new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) assert(getIndexFileCount("default_indexmerge", "0") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), sql("""Select count(*) from indexmerge""")) @@ -94,9 +94,9 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "1") == 100) val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge") new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) assert(getIndexFileCount("default_nonindexmerge", "0") == 0) assert(getIndexFileCount("default_nonindexmerge", "1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) @@ -119,9 +119,9 @@ class CarbonIndexFileMergeTestCase 
assert(getIndexFileCount("default_nonindexmerge", "1") == 100) val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge") new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) assert(getIndexFileCount("default_nonindexmerge", "0") == 0) assert(getIndexFileCount("default_nonindexmerge", "1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) @@ -148,7 +148,7 @@ class CarbonIndexFileMergeTestCase sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge") new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) } @@ -177,7 +177,7 @@ class CarbonIndexFileMergeTestCase sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge") new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) assert(getIndexFileCount("default_nonindexmerge", "2") == 100) @@ -192,7 +192,7 @@ class CarbonIndexFileMergeTestCase sql("insert into table mitable 
select '1','2000-02-01'") val table = CarbonMetadata.getInstance().getCarbonTable("default", "mitable") new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) sql("update mitable set(id)=(2) where issue = '2000-02-01'").show() sql("clean files for table mitable") sql("select * from mitable").show() @@ -216,7 +216,7 @@ class CarbonIndexFileMergeTestCase .assertEquals(getIndexOrMergeIndexFileSize(table, "0", CarbonTablePath.INDEX_FILE_EXT), segment0.head.getIndexSize.toLong) new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis())) loadMetadataDetails = SegmentStatusManager .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath)) segment0 = loadMetadataDetails.filter(x=> x.getLoadName.equalsIgnoreCase("0")) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c5abddf/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala index 410853b..ae99800 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala @@ -426,7 +426,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { val carbonTable: CarbonTable = CarbonMetadata.getInstance .getCarbonTable("streaming1", "stream_table_with_mi") new 
CarbonIndexFileMergeWriter(carbonTable) - .mergeCarbonIndexFilesOfSegment("1", carbonTable.getTablePath, false) + .mergeCarbonIndexFilesOfSegment("1", carbonTable.getTablePath, false, String.valueOf(System.currentTimeMillis())) // non-filter val result = sql("select * from streaming1.stream_table_with_mi order by id, name").collect() assert(result != null) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c5abddf/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index ca75b8c..3e70fc1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -328,8 +328,7 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality()); carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB()); - carbonFactDataHandlerModel.tableSpec = - new TableSpec(loadModel.getCarbonDataLoadSchema().getCarbonTable()); + carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable); DataMapWriterListener listener = new DataMapWriterListener(); listener.registerAllWriter( carbonTable, http://git-wip-us.apache.org/repos/asf/carbondata/blob/6c5abddf/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index da77cf6..8a0c2b6 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -1167,16 +1167,18 @@ public final class CarbonLoaderUtil { /** * Merge index files with in the segment of partitioned table - * @param segmentId + * * @param table + * @param segmentId + * @param uuid * @return * @throws IOException */ - public static String mergeIndexFilesinPartitionedSegment(String segmentId, CarbonTable table) - throws IOException { + public static String mergeIndexFilesinPartitionedSegment(CarbonTable table, String segmentId, + String uuid) throws IOException { String tablePath = table.getTablePath(); return new CarbonIndexFileMergeWriter(table) - .mergeCarbonIndexFilesOfSegment(segmentId, tablePath); + .mergeCarbonIndexFilesOfSegment(segmentId, uuid, tablePath); } private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {