[CARBONDATA-2209] Fixed rename of partitioned tables not working, and fixed batch_sort and no_sort issues with partitioned tables
This closes #2006 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5b44e810 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5b44e810 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5b44e810 Branch: refs/heads/branch-1.3 Commit: 5b44e8105cc10ea9616323bbe3736619729658ae Parents: 092b5d5 Author: ravipesala <ravi.pes...@gmail.com> Authored: Tue Feb 27 16:38:09 2018 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Fri Mar 2 21:26:30 2018 +0530 ---------------------------------------------------------------------- .../apache/carbondata/core/datamap/Segment.java | 21 ++ .../blockletindex/SegmentIndexFileStore.java | 13 +- .../core/metadata/SegmentFileStore.java | 33 +++- .../core/util/path/CarbonTablePath.java | 7 + .../core/writer/CarbonIndexFileMergeWriter.java | 191 ++++++++++++++----- .../CarbonIndexFileMergeTestCase.scala | 15 +- .../StandardPartitionGlobalSortTestCase.scala | 86 +++++++++ .../StandardPartitionTableQueryTestCase.scala | 21 ++ .../schema/CarbonAlterTableRenameCommand.scala | 58 +++++- .../sql/execution/strategy/DDLStrategy.scala | 8 + .../spark/sql/hive/CarbonSessionState.scala | 12 +- .../spark/sql/hive/CarbonSessionState.scala | 10 + .../loading/DataLoadProcessBuilder.java | 8 +- .../processing/util/CarbonLoaderUtil.java | 87 +++++++++ 14 files changed, 509 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index c47f16c..a2a2a41 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -21,6 +21,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; + /** * Represents one load of carbondata */ @@ -76,6 +80,23 @@ public class Segment implements Serializable { return new Segment(segmentId, null); } + /** + * Read the table status and get the segment corresponding to segmentNo + * @param segmentNo + * @param tablePath + * @return + */ + public static Segment getSegment(String segmentNo, String tablePath) { + LoadMetadataDetails[] loadMetadataDetails = + SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath)); + for (LoadMetadataDetails details: loadMetadataDetails) { + if (details.getLoadName().equals(segmentNo)) { + return new Segment(details.getLoadName(), details.getSegmentFile()); + } + } + return null; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index 4883d94..9364a7a 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -96,6 +96,7 @@ public class SegmentIndexFileStore { public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile 
segmentFile, String tablePath, SegmentStatus status, boolean ignoreStatus) throws IOException { List<CarbonFile> carbonIndexFiles = new ArrayList<>(); + Set<String> indexFiles = new HashSet<>(); if (segmentFile == null) { return; } @@ -107,11 +108,21 @@ public class SegmentIndexFileStore { if (locations.getValue().isRelative()) { location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; } + String mergeFileName = locations.getValue().getMergeFileName(); + if (mergeFileName != null) { + CarbonFile mergeFile = FileFactory + .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName); + if (mergeFile.exists() && !indexFiles.contains(mergeFile.getAbsolutePath())) { + carbonIndexFiles.add(mergeFile); + indexFiles.add(mergeFile.getAbsolutePath()); + } + } for (String indexFile : locations.getValue().getFiles()) { CarbonFile carbonFile = FileFactory .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile); - if (carbonFile.exists()) { + if (carbonFile.exists() && !indexFiles.contains(carbonFile.getAbsolutePath())) { carbonIndexFiles.add(carbonFile); + indexFiles.add(carbonFile.getAbsolutePath()); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index f2548b5..2d31b4e 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -195,7 +195,7 @@ public class SegmentFileStore { * @param partitionSpecs */ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, - List<PartitionSpec> partitionSpecs) { + List<PartitionSpec> partitionSpecs) throws 
IOException { SegmentFile segmentFile = null; for (PartitionSpec spec : partitionSpecs) { String location = spec.getLocation().toString(); @@ -220,6 +220,9 @@ public class SegmentFileStore { folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); for (CarbonFile file : listFiles) { if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + List<String> indexFiles = + new SegmentIndexFileStore().getIndexFilesFromMergeFile(file.getAbsolutePath()); + folderDetails.getFiles().addAll(indexFiles); folderDetails.setMergeFileName(file.getName()); } else { folderDetails.getFiles().add(file.getName()); @@ -302,6 +305,10 @@ public class SegmentFileStore { readIndexFiles(SegmentStatus.SUCCESS, false); } + public SegmentFile getSegmentFile() { + return segmentFile; + } + /** * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just * reads all index files @@ -377,6 +384,30 @@ public class SegmentFileStore { } /** + * Gets all carbon index files from this segment + * @return + */ + public List<CarbonFile> getIndexCarbonFiles() { + Map<String, String> indexFiles = getIndexFiles(); + Set<String> files = new HashSet<>(); + for (Map.Entry<String, String> entry: indexFiles.entrySet()) { + Path path = new Path(entry.getKey()); + files.add(entry.getKey()); + if (entry.getValue() != null) { + files.add(new Path(path.getParent(), entry.getValue()).toString()); + } + } + List<CarbonFile> carbonFiles = new ArrayList<>(); + for (String indexFile : files) { + CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile); + if (carbonFile.exists()) { + carbonFiles.add(carbonFile); + } + } + return carbonFiles; + } + + /** * Drops the partition related files from the segment file of the segment and writes * to a new file. First iterator over segment file and check the path it needs to be dropped. * And update the status with delete if it found. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index d70d9ef..f232c23 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -165,6 +165,13 @@ public class CarbonTablePath extends Path { } /** + * Return metadata path based on `tablePath` + */ + public static String getTableStatusPath(String tablePath) { + return getMetadataPath(tablePath) + File.separator + TABLE_STATUS_FILE; + } + + /** * @param columnId unique column identifier * @return absolute path of dictionary meta file */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index 01f96ba..bc150e5 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -17,18 +17,26 @@ package org.apache.carbondata.core.writer; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; 
import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.MergedBlockIndex; import org.apache.carbondata.format.MergedBlockIndexHeader; +import org.apache.hadoop.fs.Path; + public class CarbonIndexFileMergeWriter { /** @@ -38,7 +46,7 @@ public class CarbonIndexFileMergeWriter { /** * Merge all the carbonindex files of segment to a merged file - * @param segmentPath + * @param tablePath * @param indexFileNamesTobeAdded while merging it comsiders only these files. * If null then consider all * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata @@ -46,77 +54,152 @@ public class CarbonIndexFileMergeWriter { * which do not store the blocklet info to current version * @throws IOException */ - private void mergeCarbonIndexFilesOfSegment(String segmentPath, - List<String> indexFileNamesTobeAdded, boolean readFileFooterFromCarbonDataFile) - throws IOException { - CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); + private SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + String tablePath, List<String> indexFileNamesTobeAdded, + boolean readFileFooterFromCarbonDataFile) throws IOException { + Segment segment = Segment.getSegment(segmentId, tablePath); + String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId); + CarbonFile[] indexFiles; + SegmentFileStore sfs = null; + if (segment != null && segment.getSegmentFileName() != null) { + sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName()); + List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles(); + indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]); + 
} else { + indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); + } if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) { - SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); - if (readFileFooterFromCarbonDataFile) { - // this case will be used in case of upgrade where old store will not have the blocklet - // info in the index file and therefore blocklet info need to be read from the file footer - // in the carbondata file - fileStore.readAllIndexAndFillBolckletInfo(segmentPath); + if (sfs == null) { + return mergeNormalSegment(indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile, + segmentPath, indexFiles); } else { - fileStore.readAllIIndexOfSegment(segmentPath); + return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles); } - Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap(); - MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader(); - MergedBlockIndex mergedBlockIndex = new MergedBlockIndex(); - List<String> fileNames = new ArrayList<>(indexMap.size()); - List<ByteBuffer> data = new ArrayList<>(indexMap.size()); - for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) { - if (indexFileNamesTobeAdded == null || - indexFileNamesTobeAdded.contains(entry.getKey())) { - fileNames.add(entry.getKey()); - data.add(ByteBuffer.wrap(entry.getValue())); - } + } + return null; + } + + + private SegmentIndexFIleMergeStatus mergeNormalSegment(List<String> indexFileNamesTobeAdded, + boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles) + throws IOException { + SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); + if (readFileFooterFromCarbonDataFile) { + // this case will be used in case of upgrade where old store will not have the blocklet + // info in the index file and therefore blocklet info need to be read from the file footer + // in the carbondata file + fileStore.readAllIndexAndFillBolckletInfo(segmentPath); + } else { + 
fileStore.readAllIIndexOfSegment(segmentPath); + } + Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap(); + writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap); + for (CarbonFile indexFile : indexFiles) { + indexFile.delete(); + } + return null; + } + + private SegmentIndexFIleMergeStatus mergePartitionSegment(List<String> indexFileNamesTobeAdded, + SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException { + SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); + fileStore + .readAllIIndexOfSegment(sfs.getSegmentFile(), sfs.getTablePath(), SegmentStatus.SUCCESS, + true); + Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath(); + Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>(); + for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) { + Path path = new Path(entry.getKey()); + Map<String, byte[]> map = indexLocationMap.get(path.getParent().toString()); + if (map == null) { + map = new HashMap<>(); + indexLocationMap.put(path.getParent().toString(), map); } - if (fileNames.size() > 0) { - openThriftWriter( - segmentPath + "/" + System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT); - indexHeader.setFile_names(fileNames); - mergedBlockIndex.setFileData(data); - writeMergedBlockIndexHeader(indexHeader); - writeMergedBlockIndex(mergedBlockIndex); - close(); + map.put(path.getName(), entry.getValue()); + } + for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) { + String mergeIndexFile = + writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), entry.getValue()); + for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : sfs.getLocationMap() + .entrySet()) { + String location = segentry.getKey(); + if (segentry.getValue().isRelative()) { + location = sfs.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location; + } + if (new Path(entry.getKey()).equals(new Path(location))) { + 
segentry.getValue().setMergeFileName(mergeIndexFile); + break; + } } - for (CarbonFile indexFile : indexFiles) { - indexFile.delete(); + } + + List<String> filesTobeDeleted = new ArrayList<>(); + for (CarbonFile file : indexFiles) { + filesTobeDeleted.add(file.getAbsolutePath()); + } + return new SegmentIndexFIleMergeStatus(sfs.getSegmentFile(), filesTobeDeleted); + } + + private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath, + Map<String, byte[]> indexMap) throws IOException { + MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader(); + MergedBlockIndex mergedBlockIndex = new MergedBlockIndex(); + List<String> fileNames = new ArrayList<>(indexMap.size()); + List<ByteBuffer> data = new ArrayList<>(indexMap.size()); + for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) { + if (indexFileNamesTobeAdded == null || + indexFileNamesTobeAdded.contains(entry.getKey())) { + fileNames.add(entry.getKey()); + data.add(ByteBuffer.wrap(entry.getValue())); } } + if (fileNames.size() > 0) { + String mergeIndexName = System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT; + openThriftWriter(segmentPath + "/" + mergeIndexName); + indexHeader.setFile_names(fileNames); + mergedBlockIndex.setFileData(data); + writeMergedBlockIndexHeader(indexHeader); + writeMergedBlockIndex(mergedBlockIndex); + close(); + return mergeIndexName; + } + return null; } /** * Merge all the carbonindex files of segment to a merged file * - * @param segmentPath + * @param segmentId * @param indexFileNamesTobeAdded * @throws IOException */ - public void mergeCarbonIndexFilesOfSegment(String segmentPath, - List<String> indexFileNamesTobeAdded) throws IOException { - mergeCarbonIndexFilesOfSegment(segmentPath, indexFileNamesTobeAdded, false); + public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + String tablePath, List<String> indexFileNamesTobeAdded) throws IOException { + return 
mergeCarbonIndexFilesOfSegment(segmentId, tablePath, indexFileNamesTobeAdded, false); } /** * Merge all the carbonindex files of segment to a merged file - * @param segmentPath + * + * @param segmentId * @throws IOException */ - public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException { - mergeCarbonIndexFilesOfSegment(segmentPath, null, false); + public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + String tablePath) throws IOException { + return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false); } /** * Merge all the carbonindex files of segment to a merged file - * @param segmentPath + * + * @param segmentId * @param readFileFooterFromCarbonDataFile * @throws IOException */ - public void mergeCarbonIndexFilesOfSegment(String segmentPath, - boolean readFileFooterFromCarbonDataFile) throws IOException { - mergeCarbonIndexFilesOfSegment(segmentPath, null, readFileFooterFromCarbonDataFile); + public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + String tablePath, boolean readFileFooterFromCarbonDataFile) throws IOException { + return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, + readFileFooterFromCarbonDataFile); } private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) { @@ -166,4 +249,24 @@ public class CarbonIndexFileMergeWriter { thriftWriter.close(); } + public static class SegmentIndexFIleMergeStatus implements Serializable { + + private SegmentFileStore.SegmentFile segmentFile; + + private List<String> filesTobeDeleted; + + public SegmentIndexFIleMergeStatus(SegmentFileStore.SegmentFile segmentFile, + List<String> filesTobeDeleted) { + this.segmentFile = segmentFile; + this.filesTobeDeleted = filesTobeDeleted; + } + + public SegmentFileStore.SegmentFile getSegmentFile() { + return segmentFile; + } + + public List<String> getFilesTobeDeleted() { + return filesTobeDeleted; + } + } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala index 895b0b5..7608318 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala @@ -62,9 +62,8 @@ class CarbonIndexFileMergeTestCase sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " + s"'GLOBAL_SORT_PARTITIONS'='100')") val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge") - val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) new CarbonIndexFileMergeWriter() - .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) assert(getIndexFileCount("default_indexmerge", "0") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), sql("""Select count(*) from indexmerge""")) @@ -88,9 +87,9 @@ class CarbonIndexFileMergeTestCase val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) new CarbonIndexFileMergeWriter() - .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) new CarbonIndexFileMergeWriter() - 
.mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false) + .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0") == 0) assert(getIndexFileCount("default_nonindexmerge", "1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) @@ -114,9 +113,9 @@ class CarbonIndexFileMergeTestCase val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) new CarbonIndexFileMergeWriter() - .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) new CarbonIndexFileMergeWriter() - .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false) + .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0") == 0) assert(getIndexFileCount("default_nonindexmerge", "1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) @@ -144,7 +143,7 @@ class CarbonIndexFileMergeTestCase val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) new CarbonIndexFileMergeWriter() - .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false) + .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) } @@ -174,7 +173,7 @@ class CarbonIndexFileMergeTestCase val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) new CarbonIndexFileMergeWriter() - .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), 
false) + .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) assert(getIndexFileCount("default_nonindexmerge", "2") == 100) http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala index ff062cd..b511ee8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala @@ -927,6 +927,92 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA assert(exMessage.getMessage.contains("day is not a valid partition column in table default.partitionnocolumn")) } + test("data loading with default partition in static partition table with batchsort") { + sql("DROP TABLE IF EXISTS partitiondefaultbatchsort") + sql( + """ + | CREATE TABLE partitiondefaultbatchsort (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectenddate Timestamp,attendance int, + | utilization int, doj Timestamp, empname String) + | PARTITIONED BY (projectjoindate Timestamp, salary decimal) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='BATCH_SORT') + """.stripMargin) + 
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultbatchsort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql("select count(*) from partitiondefaultbatchsort"), Seq(Row(10))) + } + + test("data loading with default partition in static partition table with nosort") { + sql("DROP TABLE IF EXISTS partitiondefaultnosort") + sql( + """ + | CREATE TABLE partitiondefaultnosort (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectenddate Timestamp,attendance int, + | utilization int, doj Timestamp, empname String) + | PARTITIONED BY (projectjoindate Timestamp, salary decimal) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='NO_SORT') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultnosort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql("select count(*) from partitiondefaultnosort"), Seq(Row(10))) + } + + test("data loading with default partition in static partition table with rename") { + sql("DROP TABLE IF EXISTS partitiondefaultrename") + sql("DROP TABLE IF EXISTS partitiondefaultrename_new") + sql( + """ + | CREATE TABLE partitiondefaultrename (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectenddate Timestamp,attendance int, + | utilization int, doj Timestamp, empname String) + | PARTITIONED BY (projectjoindate Timestamp, salary decimal) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrename OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql("select count(*) from partitiondefaultrename"), Seq(Row(10))) + sql(s"alter table partitiondefaultrename rename to partitiondefaultrename_new") 
+ checkAnswer(sql("select count(*) from partitiondefaultrename_new"), Seq(Row(10))) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrename_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql("select count(*) from partitiondefaultrename_new"), Seq(Row(20))) + } + + test("data loading with default partition in static partition table with rename first") { + sql("DROP TABLE IF EXISTS partitiondefaultrenamefirst") + sql("DROP TABLE IF EXISTS partitiondefaultrenamefirst_new") + sql( + """ + | CREATE TABLE partitiondefaultrenamefirst (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectenddate Timestamp,attendance int, + | utilization int, doj Timestamp, empname String) + | PARTITIONED BY (projectjoindate Timestamp, salary decimal) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"alter table partitiondefaultrenamefirst rename to partitiondefaultrenamefirst_new") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrenamefirst_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql("select count(*) from partitiondefaultrenamefirst_new"), Seq(Row(10))) + } + + test("data loading for global partition table for two partition column with no columns in csv") { + sql("DROP TABLE IF EXISTS partitiontwonocolumns") + sql( + """ + | CREATE TABLE partitiontwonocolumns (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int,doj Timestamp, empname String) + | PARTITIONED BY (newcol1 date, newcol2 int) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"""LOAD DATA local inpath 
'$resourcesPath/data.csv' INTO TABLE partitiontwonocolumns partition(newcol1='2016-08-09', newcol2='20') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwonocolumns order by empno"), + sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) + + checkAnswer(sql("select distinct cast(newcol1 as string) from partitiontwonocolumns"), Seq(Row("2016-08-09"))) + } + override def afterAll = { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala index 58eb9f9..163e662 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala @@ -300,6 +300,27 @@ test("Creation of partition table should fail if the colname in table schema and FileFactory.deleteAllCarbonFilesOfDir(file) } + test("set partition location with static column partition with load command") { + 
sql("drop table if exists staticpartitionsetloc") + sql( + """ + | CREATE TABLE staticpartitionsetloc (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int,projectenddate Date,doj Timestamp) + | PARTITIONED BY (empname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + val location = metastoredb +"/" +"ravi1" + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionsetloc partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + intercept[Exception] { + sql(s"""alter table staticpartitionsetloc partition (empname='ravi') set location '$location'""") + } + val file = FileFactory.getCarbonFile(location) + FileFactory.deleteAllCarbonFilesOfDir(file) + } + test("add external partition with static column partition with load command with diffrent schema") { sql( http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index c8f64e1..40b5cfc 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql.execution.command.schema +import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier +import 
org.apache.spark.sql.catalyst.catalog.CatalogTablePartition import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand} import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.util.AlterTableUtil @@ -119,6 +121,12 @@ private[sql] case class CarbonAlterTableRenameCommand( metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] .getClient() + var partitions: Seq[CatalogTablePartition] = Seq.empty + if (carbonTable.isHivePartitionTable) { + partitions = + sparkSession.sessionState.catalog.listPartitions( + TableIdentifier(oldTableName, Some(oldDatabaseName))) + } sparkSession.catalog.refreshTable(TableIdentifier(oldTableName, Some(oldDatabaseName)).quotedString) hiveClient.runSqlHive( @@ -127,6 +135,7 @@ private[sql] case class CarbonAlterTableRenameCommand( s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" + s"('tableName'='$newTableName', " + s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')") + // changed the rename order to deal with situation when carbon table and hive table // will point to the same tablePath if (FileFactory.isFileExist(tableMetadataFile, fileType)) { @@ -138,6 +147,27 @@ private[sql] case class CarbonAlterTableRenameCommand( sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName") } } + val updatedParts = updatePartitionLocations( + partitions, + oldTablePath.getPath, + newTablePath, + sparkSession) + + val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName)) + val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier) + // Update the storage location with new path + sparkSession.sessionState.catalog.alterTable( + catalogTable.copy(storage = sparkSession.sessionState.catalog. 
+ asInstanceOf[CarbonSessionCatalog].updateStorageLocation( + new Path(newTablePath), + catalogTable.storage))) + if (updatedParts.nonEmpty) { + // Update the new updated partitions specs with new location. + sparkSession.sessionState.catalog.alterPartitions( + newIdentifier, + updatedParts) + } + newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier, carbonTable.getCarbonTableIdentifier, tableInfo, @@ -151,8 +181,7 @@ private[sql] case class CarbonAlterTableRenameCommand( sparkSession) OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext) - sparkSession.catalog.refreshTable(TableIdentifier(newTableName, - Some(oldDatabaseName)).quotedString) + sparkSession.catalog.refreshTable(newIdentifier.quotedString) carbonTableLockFilePath = newTablePath LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName") LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") @@ -187,6 +216,31 @@ private[sql] case class CarbonAlterTableRenameCommand( Seq.empty } + /** + * Update partitions with new table location + * + */ + private def updatePartitionLocations( + partitions: Seq[CatalogTablePartition], + oldTablePath: String, + newTablePath: String, + sparkSession: SparkSession): Seq[CatalogTablePartition] = { + partitions.map{ part => + if (part.storage.locationUri.isDefined) { + val path = new Path(part.location) + if (path.toString.contains(oldTablePath)) { + val newPath = new Path(path.toString.replace(oldTablePath, newTablePath)) + part.copy(storage = sparkSession.sessionState.catalog. 
+ asInstanceOf[CarbonSessionCatalog].updateStorageLocation(newPath, part.storage)) + } else { + part + } + } else { + part + } + } + } + private def renameBadRecords( oldTableName: String, newTableName: String, http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index f69ccc1..dcbce84 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -265,6 +265,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { RefreshCarbonTableCommand(tableIdentifier.database, tableIdentifier.table).run(sparkSession) ExecutedCommandExec(RefreshTable(tableIdentifier)) :: Nil + case alterSetLoc@AlterTableSetLocationCommand(tableName, _, _) => + val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + .tableExists(tableName)(sparkSession) + if (isCarbonTable) { + throw new UnsupportedOperationException("Set partition location is not supported") + } else { + ExecutedCommandExec(alterSetLoc) :: Nil + } case _ => Nil } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 1b7f0cb..ba2fe947 100644 --- 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.hive import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface @@ -182,6 +183,15 @@ class CarbonSessionCatalog( allPartitions } } + + /** + * Update the storageformat with new location information + */ + def updateStorageLocation( + path: Path, + storage: CatalogStorageFormat): CatalogStorageFormat = { + storage.copy(locationUri = Some(path.toString)) + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index a119bda..e82b485 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import scala.collection.generic.SeqFactory import 
org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} @@ -171,6 +172,15 @@ class CarbonSessionCatalog( partitionFilters, sparkSession.sessionState.conf.sessionLocalTimeZone) } + + /** + * Update the storageformat with new location information + */ + def updateStorageLocation( + path: Path, + storage: CatalogStorageFormat): CatalogStorageFormat = { + storage.copy(locationUri = Some(path.toUri)) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index f5b29e7..82f4c9b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -57,15 +57,15 @@ public final class DataLoadProcessBuilder { CarbonIterator[] inputIterators) throws Exception { CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration); - if ((!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) - && !loadModel.isPartitionLoad()) { + if (loadModel.isPartitionLoad()) { + return buildInternalForPartitionLoad(inputIterators, configuration, sortScope); + } else if (!configuration.isSortTable() || + sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) { return buildInternalForNoSort(inputIterators, configuration); } else if (configuration.getBucketingInfo() != 
null) { return buildInternalForBucketing(inputIterators, configuration); } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) { return buildInternalForBatchSort(inputIterators, configuration); - } else if (loadModel.isPartitionLoad()) { - return buildInternalForPartitionLoad(inputIterators, configuration, sortScope); } else { return buildInternal(inputIterators, configuration); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 32c72da..23f9aa8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -48,6 +48,7 @@ import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; @@ -57,6 +58,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import 
org.apache.carbondata.processing.merger.NodeBlockRelation; import org.apache.carbondata.processing.merger.NodeMultiBlockRelation; @@ -328,6 +330,60 @@ public final class CarbonLoaderUtil { return status; } + /** + * This API will update the segmentFile of a passed segment. + * + * @return boolean which determines whether status update is done or not. + * @throws IOException + */ + public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile) + throws IOException { + boolean status = false; + String tableStatusPath = CarbonTablePath.getTableStatusPath(tablePath); + String metadataPath = CarbonTablePath.getMetadataPath(tablePath); + AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier.from(tablePath, null, null); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + try { + if (carbonLock.lockWithRetries(retryCount, maxTimeout)) { + LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation"); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = + SegmentStatusManager.readLoadMetadata(metadataPath); + + for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) { + // find the entry whose load name matches the given segment id and set its segment file.
+ if (segmentId.equals(detail.getLoadName())) { + detail.setSegmentFile(segmentFile); + break; + } + } + + SegmentStatusManager + .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); + status = true; + } else { + LOGGER.error( + "Not able to acquire the lock for Table status updation for table path " + tablePath); + } + ; + } finally { + if (carbonLock.unlock()) { + LOGGER.info("Table unlocked successfully after table status updation" + tablePath); + } else { + LOGGER.error( + "Unable to unlock Table lock for table" + tablePath + " during table status updation"); + } + } + return status; + } + private static void addToStaleFolders(CarbonTablePath carbonTablePath, List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException { String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName()); @@ -950,4 +1006,35 @@ public final class CarbonLoaderUtil { loadMetadataDetails.setIndexSize(String.valueOf(indexSize)); return dataSize + indexSize; } + + /** + * Merge index files within the segment of a partitioned table + * @param segmentId + * @param tablePath + * @param uniqueId + * @return + * @throws IOException + */ + public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath, + String uniqueId) throws IOException { + CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus = + new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath); + if (segmentIndexFIleMergeStatus != null) { + uniqueId = System.currentTimeMillis() + ""; + String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT; + String path = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + newSegmentFileName; + SegmentFileStore.writeSegmentFile(segmentIndexFIleMergeStatus.getSegmentFile(), path); + updateSegmentFile(tablePath, segmentId, newSegmentFileName); + 
deleteFiles(segmentIndexFIleMergeStatus.getFilesTobeDeleted()); + } + return uniqueId; + } + + private static void deleteFiles(List<String> filesToBeDeleted) throws IOException { + for (String filePath : filesToBeDeleted) { + FileFactory.deleteFile(filePath, FileFactory.getFileType(filePath)); + } + } }