[CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature
This closes #1984 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1997ca23 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1997ca23 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1997ca23 Branch: refs/heads/branch-1.3 Commit: 1997ca235f90b5746262c9654b685b9b6bd3f16a Parents: 758d03e Author: ravipesala <ravi.pes...@gmail.com> Authored: Thu Feb 15 00:31:56 2018 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Tue Feb 27 13:19:35 2018 +0530 ---------------------------------------------------------------------- .../core/datamap/DataMapDistributable.java | 10 +- .../apache/carbondata/core/datamap/Segment.java | 97 +++ .../carbondata/core/datamap/TableDataMap.java | 45 +- .../carbondata/core/datamap/dev/DataMap.java | 3 +- .../core/datamap/dev/DataMapFactory.java | 9 +- .../filesystem/AbstractDFSCarbonFile.java | 27 +- .../indexstore/BlockletDataMapIndexStore.java | 107 +-- .../core/indexstore/BlockletDetailsFetcher.java | 10 +- .../core/indexstore/PartitionSpec.java | 92 +++ .../TableBlockIndexUniqueIdentifier.java | 87 +-- .../blockletindex/BlockletDataMap.java | 134 ++-- .../BlockletDataMapDistributable.java | 4 +- .../blockletindex/BlockletDataMapFactory.java | 104 ++- .../blockletindex/BlockletDataMapModel.java | 27 +- .../blockletindex/SegmentIndexFileStore.java | 94 ++- .../carbondata/core/locks/HdfsFileLock.java | 16 - .../carbondata/core/locks/LocalFileLock.java | 16 +- .../core/metadata/PartitionMapFileStore.java | 484 ------------ .../core/metadata/SegmentFileStore.java | 744 +++++++++++++++++++ .../core/mutate/CarbonUpdateUtil.java | 50 +- .../executor/impl/AbstractQueryExecutor.java | 15 +- .../core/statusmanager/LoadMetadataDetails.java | 18 + .../statusmanager/SegmentStatusManager.java | 87 ++- .../SegmentUpdateStatusManager.java | 41 +- .../apache/carbondata/core/util/CarbonUtil.java | 95 ++- 
.../core/util/DataTypeConverterImpl.java | 18 +- .../carbondata/core/util/DataTypeUtil.java | 34 +- .../core/util/path/CarbonTablePath.java | 33 +- .../carbondata/core/util/CarbonUtilTest.java | 9 +- .../carbondata/hadoop/CarbonInputSplit.java | 9 +- .../hadoop/api/CarbonOutputCommitter.java | 191 +++-- .../hadoop/api/CarbonTableInputFormat.java | 128 ++-- .../hadoop/api/CarbonTableOutputFormat.java | 2 + .../hadoop/api/DistributableDataMapFormat.java | 10 +- .../MajorCompactionIgnoreInMinorTest.scala | 6 +- .../MajorCompactionStopsAfterCompaction.scala | 2 +- .../testsuite/datamap/DataMapWriterSuite.scala | 10 +- .../TestInsertAndOtherCommandConcurrent.scala | 10 +- .../StandardPartitionBadRecordLoggerTest.scala | 17 - .../StandardPartitionGlobalSortTestCase.scala | 388 ++++++++-- .../StandardPartitionTableCleanTestCase.scala | 72 +- ...andardPartitionTableCompactionTestCase.scala | 36 +- .../StandardPartitionTableDropTestCase.scala | 1 + .../StandardPartitionTableLoadingTestCase.scala | 62 +- ...tandardPartitionTableOverwriteTestCase.scala | 76 +- .../StandardPartitionTableQueryTestCase.scala | 79 +- .../spark/util/SparkDataTypeConverterImpl.java | 12 + .../org/apache/carbondata/api/CarbonStore.scala | 12 +- .../spark/rdd/CarbonDropPartitionRDD.scala | 118 +-- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 52 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 3 +- .../spark/rdd/CarbonSparkPartition.scala | 3 +- .../carbondata/spark/rdd/PartitionDropper.scala | 2 +- .../carbondata/spark/util/CarbonScalaUtil.scala | 207 ++++-- .../carbondata/spark/util/CommonUtil.scala | 27 +- .../carbondata/spark/util/DataLoadingUtil.scala | 88 ++- .../command/carbonTableSchemaCommon.scala | 17 +- .../spark/rdd/CarbonDataRDDFactory.scala | 49 +- .../spark/rdd/CarbonTableCompactor.scala | 72 +- .../org/apache/spark/sql/CarbonCountStar.scala | 4 +- .../sql/CarbonDatasourceHadoopRelation.scala | 3 +- .../CarbonAlterTableCompactionCommand.scala | 4 +- 
.../management/CarbonCleanFilesCommand.scala | 13 +- .../management/CarbonLoadDataCommand.scala | 432 +++++++---- .../management/RefreshCarbonTableCommand.scala | 27 +- .../CarbonProjectForUpdateCommand.scala | 3 +- .../command/mutation/DeleteExecution.scala | 20 +- .../command/mutation/HorizontalCompaction.scala | 5 +- ...arbonAlterTableAddHivePartitionCommand.scala | 117 +++ ...rbonAlterTableDropHivePartitionCommand.scala | 102 +-- .../CarbonAlterTableDropPartitionCommand.scala | 10 +- .../CarbonAlterTableSplitPartitionCommand.scala | 4 +- .../CreatePreAggregateTableCommand.scala | 7 +- .../preaaggregate/PreAggregateListeners.scala | 13 +- .../table/CarbonCreateTableCommand.scala | 13 + .../datasources/CarbonFileFormat.scala | 222 +++--- .../strategy/CarbonLateDecodeStrategy.scala | 15 +- .../sql/execution/strategy/DDLStrategy.scala | 15 +- .../apache/spark/sql/hive/CarbonRelation.scala | 18 +- .../spark/sql/optimizer/CarbonFilters.scala | 59 +- .../spark/sql/hive/CarbonSessionState.scala | 12 +- .../spark/sql/hive/CarbonSessionState.scala | 12 +- .../datamap/DataMapWriterListener.java | 3 +- .../loading/CarbonDataLoadConfiguration.java | 13 + .../loading/DataLoadProcessBuilder.java | 4 +- .../impl/MeasureFieldConverterImpl.java | 2 +- .../loading/model/CarbonLoadModel.java | 13 + .../processing/merger/CarbonDataMergerUtil.java | 77 +- .../merger/CompactionResultSortProcessor.java | 35 +- .../merger/RowResultMergerProcessor.java | 34 +- .../partition/spliter/RowResultProcessor.java | 5 +- .../store/CarbonFactDataHandlerModel.java | 10 +- .../util/CarbonDataProcessorUtil.java | 8 +- .../processing/util/CarbonLoaderUtil.java | 39 +- .../processing/util/DeleteLoadFolders.java | 80 +- .../carbon/datastore/BlockIndexStoreTest.java | 6 +- .../carbondata/streaming/StreamHandoffRDD.scala | 5 +- 97 files changed, 3741 insertions(+), 1994 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java index 50af789..edd724a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java @@ -31,7 +31,7 @@ public abstract class DataMapDistributable extends InputSplit private String tablePath; - private String segmentId; + private Segment segment; private String dataMapName; @@ -47,12 +47,12 @@ public abstract class DataMapDistributable extends InputSplit this.tablePath = tablePath; } - public String getSegmentId() { - return segmentId; + public Segment getSegment() { + return segment; } - public void setSegmentId(String segmentId) { - this.segmentId = segmentId; + public void setSegment(Segment segment) { + this.segment = segment; } public String getDataMapName() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java new file mode 100644 index 0000000..c47f16c --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Represents one load of carbondata + */ +public class Segment implements Serializable { + + private static final long serialVersionUID = 7044555408162234064L; + + private String segmentNo; + + private String segmentFileName; + + public Segment(String segmentNo, String segmentFileName) { + this.segmentNo = segmentNo; + this.segmentFileName = segmentFileName; + } + + public String getSegmentNo() { + return segmentNo; + } + + public String getSegmentFileName() { + return segmentFileName; + } + + public static List<Segment> toSegmentList(String[] segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.length); + for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + public static List<Segment> toSegmentList(List<String> segmentIds) { + List<Segment> list = new ArrayList<>(segmentIds.size()); + for (String segmentId : segmentIds) { + list.add(toSegment(segmentId)); + } + return list; + } + + /** + * SegmentId can be combination of segmentNo and segmentFileName + * @param segmentId + * @return + */ + public static Segment toSegment(String segmentId) { + String[] split = segmentId.split("#"); + if (split.length > 1) { + return new Segment(split[0], split[1]); + } else if 
(split.length > 0) { + return new Segment(split[0], null); + } + return new Segment(segmentId, null); + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Segment segment = (Segment) o; + return Objects.equals(segmentNo, segment.segmentNo); + } + + @Override public int hashCode() { + return Objects.hash(segmentNo); + } + + @Override public String toString() { + if (segmentFileName != null) { + return segmentNo + "#" + segmentFileName; + } else { + return segmentNo; + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 9c84891..6555d6c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.events.Event; @@ -60,21 +61,21 @@ public final class TableDataMap extends OperationEventListener { /** * Pass the valid segments and prune the datamap using filter expression * - * @param segmentIds + * @param segments * @param filterExp * @return */ - public List<ExtendedBlocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp, - List<String> partitions) 
throws IOException { + public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp, + List<PartitionSpec> partitions) throws IOException { List<ExtendedBlocklet> blocklets = new ArrayList<>(); - for (String segmentId : segmentIds) { + for (Segment segment : segments) { List<Blocklet> pruneBlocklets = new ArrayList<>(); - List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId); + List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment); for (DataMap dataMap : dataMaps) { pruneBlocklets.addAll(dataMap.prune(filterExp, partitions)); } blocklets.addAll(addSegmentId(blockletDetailsFetcher - .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId)); + .getExtendedBlocklets(pruneBlocklets, segment), segment.getSegmentNo())); } return blocklets; } @@ -94,13 +95,13 @@ public final class TableDataMap extends OperationEventListener { * * @return */ - public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException { + public List<DataMapDistributable> toDistributable(List<Segment> segments) throws IOException { List<DataMapDistributable> distributables = new ArrayList<>(); - for (String segmentsId : segmentIds) { - List<DataMapDistributable> list = dataMapFactory.toDistributable(segmentsId); + for (Segment segment : segments) { + List<DataMapDistributable> list = dataMapFactory.toDistributable(segment); for (DataMapDistributable distributable: list) { distributable.setDataMapName(dataMapName); - distributable.setSegmentId(segmentsId); + distributable.setSegment(segment); distributable.setTablePath(identifier.getTablePath()); distributable.setDataMapFactoryClass(dataMapFactory.getClass().getName()); } @@ -118,7 +119,7 @@ public final class TableDataMap extends OperationEventListener { * @return */ public List<ExtendedBlocklet> prune(DataMapDistributable distributable, - FilterResolverIntf filterExp, List<String> partitions) throws IOException { + FilterResolverIntf filterExp, List<PartitionSpec> 
partitions) throws IOException { List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>(); List<Blocklet> blocklets = new ArrayList<>(); List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable); @@ -127,8 +128,8 @@ public final class TableDataMap extends OperationEventListener { } for (Blocklet blocklet: blocklets) { ExtendedBlocklet detailedBlocklet = - blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegmentId()); - detailedBlocklet.setSegmentId(distributable.getSegmentId()); + blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegment()); + detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo()); detailedBlocklets.add(detailedBlocklet); } return detailedBlocklets; @@ -136,11 +137,11 @@ public final class TableDataMap extends OperationEventListener { /** * Clear only the datamaps of the segments - * @param segmentIds + * @param segments */ - public void clear(List<String> segmentIds) { - for (String segmentId: segmentIds) { - dataMapFactory.clear(segmentId); + public void clear(List<Segment> segments) { + for (Segment segment: segments) { + dataMapFactory.clear(segment); } } @@ -170,21 +171,21 @@ public final class TableDataMap extends OperationEventListener { /** * Method to prune the segments based on task min/max values * - * @param segmentIds + * @param segments * @param filterExp * @return * @throws IOException */ - public List<String> pruneSegments(List<String> segmentIds, FilterResolverIntf filterExp) + public List<String> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp) throws IOException { List<String> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - for (String segmentId : segmentIds) { - List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId); + for (Segment segment : segments) { + List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment); for (DataMap dataMap : dataMaps) { if (dataMap.isScanRequired(filterExp)) { // If 
any one task in a given segment contains the data that means the segment need to // be scanned and we need to validate further data maps in the same segment - prunedSegments.add(segmentId); + prunedSegments.add(segment.getSegmentNo()); break; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index 16be1ac..f3642d6 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; @@ -50,7 +51,7 @@ public interface DataMap { * @param filterExp * @return */ - List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions); + List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions); // TODO Move this method to Abstract class /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index f5a7404..40cd436 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -21,6 +21,7 
@@ import java.util.List; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.events.Event; @@ -37,12 +38,12 @@ public interface DataMapFactory { /** * Return a new write for this datamap */ - DataMapWriter createWriter(String segmentId); + DataMapWriter createWriter(Segment segment); /** * Get the datamap for segmentid */ - List<DataMap> getDataMaps(String segmentId) throws IOException; + List<DataMap> getDataMaps(Segment segment) throws IOException; /** * Get datamaps for distributable object. @@ -53,7 +54,7 @@ public interface DataMapFactory { * Get all distributable objects of a segmentid * @return */ - List<DataMapDistributable> toDistributable(String segmentId); + List<DataMapDistributable> toDistributable(Segment segment); /** * @@ -64,7 +65,7 @@ public interface DataMapFactory { /** * Clears datamap of the segment */ - void clear(String segmentId); + void clear(Segment segment); /** * Clear all datamaps from memory http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index 927cef5..68eaa21 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; 
+import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; @@ -412,15 +413,21 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { @Override public boolean createNewFile(String filePath, FileFactory.FileType fileType, boolean doAs, - final FsPermission permission) throws IOException { + FsPermission permission) throws IOException { filePath = filePath.replace("\\", "/"); Path path = new Path(filePath); FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); - boolean result = fs.createNewFile(path); - if (null != permission) { - fs.setPermission(path, permission); + if (fs.exists(path)) { + return false; + } else { + if (permission == null) { + permission = FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(fs.getConf())); + } + // Pass the permissions during file creation itself + fs.create(path, permission, false, fs.getConf().getInt("io.file.buffer.size", 4096), + fs.getDefaultReplication(path), fs.getDefaultBlockSize(path), null).close(); + return true; } - return result; } @Override public boolean deleteFile(String filePath, FileFactory.FileType fileType) @@ -453,11 +460,15 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { filePath = filePath.replace("\\", "/"); Path path = new Path(filePath); FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); - if (fs.createNewFile(path)) { - fs.deleteOnExit(path); + if (fs.exists(path)) { + return false; + } else { + // Pass the permissions during file creation itself + fs.create(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), false, + fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(path), + fs.getDefaultBlockSize(path), null).close(); + return true; } - return false; } @Override 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index 111a7a2..c6073d5 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -28,14 +28,20 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CarbonLRUCache; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.PartitionMapFileStore; -import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.util.DataFileFooterConverter; + +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; /** * Class to handle loading, unloading,clearing,storing of the table @@ -75,22 +81,10 @@ public class BlockletDataMapIndexStore BlockletDataMap dataMap = 
(BlockletDataMap) lruCache.get(lruCacheKey); if (dataMap == null) { try { - String segmentPath = CarbonTablePath.getSegmentPath( - identifier.getAbsoluteTableIdentifier().getTablePath(), - identifier.getSegmentId()); - Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>(); - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); - CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(); SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); - indexFileStore.readAllIIndexOfSegment(carbonFiles); - PartitionMapFileStore partitionFileStore = new PartitionMapFileStore(); - partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath); - for (CarbonFile file : carbonFiles) { - blockMetaInfoMap - .put(file.getAbsolutePath(), new BlockMetaInfo(file.getLocations(), file.getSize())); - } - dataMap = - loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, blockMetaInfoMap); + Map<String, BlockMetaInfo> blockMetaInfoMap = + getBlockMetaInfoMap(identifier, indexFileStore); + dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap); } catch (MemoryException e) { LOGGER.error("memory exception when loading datamap: " + e.getMessage()); throw new RuntimeException(e.getMessage(), e); @@ -99,6 +93,47 @@ public class BlockletDataMapIndexStore return dataMap; } + private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier, + SegmentIndexFileStore indexFileStore) throws IOException { + if (identifier.getMergeIndexFileName() != null) { + CarbonFile indexMergeFile = FileFactory.getCarbonFile( + identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getMergeIndexFileName()); + if (indexMergeFile.exists()) { + indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile }); + } + } + if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) { + indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { 
FileFactory.getCarbonFile( + identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getIndexFileName()) }); + } + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>(); + List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo( + identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName())); + for (DataFileFooter footer : indexInfo) { + String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath(); + blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath)); + } + return blockMetaInfoMap; + } + + private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException { + CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile); + if (carbonFile instanceof AbstractDFSCarbonFile) { + RemoteIterator<LocatedFileStatus> iter = + ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile)); + LocatedFileStatus fileStatus = iter.next(); + String[] location = fileStatus.getBlockLocations()[0].getHosts(); + long len = fileStatus.getLen(); + return new BlockMetaInfo(location, len); + } else { + return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize()); + } + } + @Override public List<BlockletDataMap> getAll( List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException { @@ -116,34 +151,13 @@ public class BlockletDataMapIndexStore } } if (missedIdentifiers.size() > 0) { - Map<String, SegmentIndexFileStore> segmentIndexFileStoreMap = new HashMap<>(); - Map<String, PartitionMapFileStore> partitionFileStoreMap = new HashMap<>(); - Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>(); + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) { - 
SegmentIndexFileStore indexFileStore = - segmentIndexFileStoreMap.get(identifier.getSegmentId()); - PartitionMapFileStore partitionFileStore = - partitionFileStoreMap.get(identifier.getSegmentId()); - String segmentPath = CarbonTablePath.getSegmentPath( - identifier.getAbsoluteTableIdentifier().getTablePath(), - identifier.getSegmentId()); - if (indexFileStore == null) { - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); - CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(); - indexFileStore = new SegmentIndexFileStore(); - indexFileStore.readAllIIndexOfSegment(carbonFiles); - segmentIndexFileStoreMap.put(identifier.getSegmentId(), indexFileStore); - partitionFileStore = new PartitionMapFileStore(); - partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath); - partitionFileStoreMap.put(identifier.getSegmentId(), partitionFileStore); - for (CarbonFile file : carbonFiles) { - blockMetaInfoMap.put(FileFactory.getUpdatedFilePath(file.getAbsolutePath()), - new BlockMetaInfo(file.getLocations(), file.getSize())); - } - } + Map<String, BlockMetaInfo> blockMetaInfoMap = + getBlockMetaInfoMap(identifier, indexFileStore); blockletDataMaps.add( - loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, blockMetaInfoMap)); + loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap)); } } } catch (Throwable e) { @@ -194,7 +208,6 @@ public class BlockletDataMapIndexStore private BlockletDataMap loadAndGetDataMap( TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore, - PartitionMapFileStore partitionFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap) throws IOException, MemoryException { String uniqueTableSegmentIdentifier = @@ -206,10 +219,10 @@ public class BlockletDataMapIndexStore BlockletDataMap dataMap; synchronized (lock) { dataMap = new BlockletDataMap(); - dataMap.init(new BlockletDataMapModel(identifier.getFilePath(), - indexFileStore.getFileData(identifier.getCarbonIndexFileName()), 
- partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()), - partitionFileStore.isPartionedSegment(), blockMetaInfoMap)); + dataMap.init(new BlockletDataMapModel( + identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()), + blockMetaInfoMap, identifier.getSegmentId())); lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap, dataMap.getMemorySize()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java index 21ecba1..b4d6db2 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.indexstore; import java.io.IOException; import java.util.List; +import org.apache.carbondata.core.datamap.Segment; + /** * Fetches the detailed blocklet which has more information to execute the query */ @@ -28,20 +30,20 @@ public interface BlockletDetailsFetcher { * Get the blocklet detail information based on blockletid, blockid and segmentid. * * @param blocklets - * @param segmentId + * @param segment * @return * @throws IOException */ - List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId) + List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment) throws IOException; /** * Get the blocklet detail information based on blockletid, blockid and segmentid. 
* * @param blocklet - * @param segmentId + * @param segment * @return * @throws IOException */ - ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) throws IOException; + ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java new file mode 100644 index 0000000..87c875e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore; + +import java.io.Serializable; +import java.net.URI; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.core.datastore.impl.FileFactory; + +import org.apache.hadoop.fs.Path; + +/** + * Holds partition information. 
+ */ +public class PartitionSpec implements Serializable { + + private static final long serialVersionUID = 4828007433384867678L; + + /** + * It holds the partition information in columnName=partitionValue combination. + */ + private List<String> partitions; + + private transient Path locationPath; + + private String location; + + private String uuid; + + public PartitionSpec(List<String> partitions, String location) { + this.partitions = partitions; + this.locationPath = new Path(FileFactory.getUpdatedFilePath(location)); + this.location = locationPath.toString(); + } + + public PartitionSpec(List<String> partitions, URI location) { + this.partitions = partitions; + this.locationPath = new Path(FileFactory.getUpdatedFilePath(new Path(location).toString())); + this.location = locationPath.toString(); + } + + public List<String> getPartitions() { + return partitions; + } + + public Path getLocation() { + if (locationPath == null) { + locationPath = new Path(location); + } + return locationPath; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PartitionSpec spec = (PartitionSpec) o; + return Objects.equals(getLocation(), spec.getLocation()); + } + + @Override public int hashCode() { + return Objects.hash(locationPath); + } + + @Override public String toString() { + return "PartitionSpec{" + "partitions=" + partitions + ", locationPath=" + locationPath + + ", location='" + location + '\'' + '}'; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java index 18357ac..c907fa8 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java @@ -17,91 +17,66 @@ package org.apache.carbondata.core.indexstore; +import java.util.Objects; + import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; /** - * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment + * Class holds the indexFile information to uniquely identify the carbon index */ public class TableBlockIndexUniqueIdentifier { - /** - * table fully qualified identifier - */ - private AbsoluteTableIdentifier absoluteTableIdentifier; - private String segmentId; + private String indexFilePath; - private String carbonIndexFileName; + private String indexFileName; - /** - * Constructor to initialize the class instance - * - * @param absoluteTableIdentifier - * @param segmentId - */ - public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier, - String segmentId, String carbonIndexFileName) { - this.absoluteTableIdentifier = absoluteTableIdentifier; + private String mergeIndexFileName; + + private String segmentId; + + public TableBlockIndexUniqueIdentifier(String indexFilePath, String indexFileName, + String mergeIndexFileName, String segmentId) { + this.indexFilePath = indexFilePath; + this.indexFileName = indexFileName; + this.mergeIndexFileName = mergeIndexFileName; this.segmentId = segmentId; - this.carbonIndexFileName = carbonIndexFileName; } /** - * returns AbsoluteTableIdentifier + * method returns the id to uniquely identify a key * * @return */ - public AbsoluteTableIdentifier getAbsoluteTableIdentifier() { - return
absoluteTableIdentifier; + public String getUniqueTableSegmentIdentifier() { + return indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + indexFileName; } - public String getSegmentId() { - return segmentId; + public String getIndexFilePath() { + return indexFilePath; } - /** - * method returns the id to uniquely identify a key - * - * @return - */ - public String getUniqueTableSegmentIdentifier() { - CarbonTableIdentifier carbonTableIdentifier = - absoluteTableIdentifier.getCarbonTableIdentifier(); - return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR - + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE - + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId - + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName; + public String getIndexFileName() { + return indexFileName; + } + + public String getMergeIndexFileName() { + return mergeIndexFileName; } - public String getFilePath() { - return absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + "/" - + carbonIndexFileName; + public String getSegmentId() { + return segmentId; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - TableBlockIndexUniqueIdentifier that = (TableBlockIndexUniqueIdentifier) o; - - if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) { - return false; - } - if (!segmentId.equals(that.segmentId)) { - return false; - } - return carbonIndexFileName.equals(that.carbonIndexFileName); + return Objects.equals(indexFilePath, that.indexFilePath) && Objects + .equals(indexFileName, that.indexFileName) && Objects + .equals(mergeIndexFileName, that.mergeIndexFileName); } @Override public int hashCode() { - int result = absoluteTableIdentifier.hashCode(); - result = 31 * result + segmentId.hashCode(); - result = 31 * result + carbonIndexFileName.hashCode(); - return result; - } - - 
public String getCarbonIndexFileName() { - return carbonIndexFileName; + return Objects.hash(indexFilePath, indexFileName, mergeIndexFileName); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 699f9e1..9ec7a46 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -43,6 +43,7 @@ import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; @@ -65,8 +66,10 @@ import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; import org.xerial.snappy.Snappy; /** @@ -111,7 +114,11 @@ public class BlockletDataMap implements DataMap, Cacheable { private static int SCHEMA = 2; - private static int PARTITION_INFO = 3; + private static int INDEX_PATH = 3; + + private static int INDEX_FILE_NAME = 4; + + private static int SEGMENTID = 
5; private UnsafeMemoryDMStore unsafeMemoryDMStore; @@ -121,8 +128,6 @@ public class BlockletDataMap implements DataMap, Cacheable { private int[] columnCardinality; - private boolean isPartitionedSegment; - @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException { long startTime = System.currentTimeMillis(); @@ -131,7 +136,11 @@ public class BlockletDataMap implements DataMap, Cacheable { DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); List<DataFileFooter> indexInfo = fileFooterConverter .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData()); - isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment(); + Path path = new Path(blockletDataMapInfo.getFilePath()); + byte[] filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + byte[] fileName = path.getName().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + byte[] segmentId = + blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); DataMapRowImpl summaryRow = null; byte[] schemaBinary = null; // below 2 variables will be used for fetching the relative blocklet id. 
Relative blocklet ID @@ -145,7 +154,8 @@ public class BlockletDataMap implements DataMap, Cacheable { columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); segmentProperties = new SegmentProperties(columnInTable, columnCardinality); createSchema(segmentProperties); - createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary); + createSummarySchema(segmentProperties, schemaBinary, filePath, fileName, + segmentId); } TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); BlockMetaInfo blockMetaInfo = @@ -182,8 +192,10 @@ public class BlockletDataMap implements DataMap, Cacheable { if (null != unsafeMemorySummaryDMStore) { addTaskSummaryRowToUnsafeMemoryStore( summaryRow, - blockletDataMapInfo.getPartitions(), - schemaBinary); + schemaBinary, + filePath, + fileName, + segmentId); unsafeMemorySummaryDMStore.finishWriting(); } LOGGER.info( @@ -354,8 +366,8 @@ public class BlockletDataMap implements DataMap, Cacheable { return summaryRow; } - private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, - List<String> partitions, byte[] schemaBinary) throws IOException { + private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, byte[] schemaBinary, + byte[] filePath, byte[] fileName, byte[] segmentId) { // write the task summary info to unsafe memory store if (null != summaryRow) { // Add column schema , it is useful to generate segment properties in executor. 
@@ -363,18 +375,9 @@ public class BlockletDataMap implements DataMap, Cacheable { if (schemaBinary != null) { summaryRow.setByteArray(schemaBinary, SCHEMA); } - if (partitions != null && partitions.size() > 0) { - CarbonRowSchema[] minSchemas = - ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore - .getSchema()[PARTITION_INFO]).getChildSchemas(); - DataMapRow partitionRow = new DataMapRowImpl(minSchemas); - for (int i = 0; i < partitions.size(); i++) { - partitionRow - .setByteArray(partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS), - i); - } - summaryRow.setRow(partitionRow, PARTITION_INFO); - } + summaryRow.setByteArray(filePath, INDEX_PATH); + summaryRow.setByteArray(fileName, INDEX_FILE_NAME); + summaryRow.setByteArray(segmentId, SEGMENTID); try { unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow); } catch (Exception e) { @@ -560,27 +563,25 @@ public class BlockletDataMap implements DataMap, Cacheable { * once per datamap. It stores datamap level max/min of each column and partition information of * datamap * @param segmentProperties - * @param partitions * @throws MemoryException */ - private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions, - byte[] schemaBinary) + private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary, + byte[] filePath, byte[] fileName, byte[] segmentId) throws MemoryException { List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(); getMinMaxSchema(segmentProperties, taskMinMaxSchemas); // for storing column schema taskMinMaxSchemas.add( new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length)); - if (partitions != null && partitions.size() > 0) { - CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()]; - for (int i = 0; i < mapSchemas.length; i++) { - mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY); - } - CarbonRowSchema mapSchema = - new 
CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(), - mapSchemas); - taskMinMaxSchemas.add(mapSchema); - } + // for storing file path + taskMinMaxSchemas.add( + new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, filePath.length)); + // for storing file name + taskMinMaxSchemas.add( + new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, fileName.length)); + // for storing segmentid + taskMinMaxSchemas.add( + new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length)); unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore( taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()])); } @@ -660,22 +661,24 @@ public class BlockletDataMap implements DataMap, Cacheable { return blocklets; } - @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) { + @Override + public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) { if (unsafeMemoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } - // First get the partitions which are stored inside datamap. - List<String> storedPartitions = getPartitions(); // if it has partitioned datamap but there is no partitioned information stored, it means // partitions are dropped so return empty list. - if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) { - return new ArrayList<>(); - } - if (storedPartitions != null && storedPartitions.size() > 0) { + if (partitions != null) { + // First get the partitions which are stored inside datamap. + String[] fileDetails = getFileDetails(); // Check the exact match of partition information inside the stored partitions. 
boolean found = false; - if (partitions != null && partitions.size() > 0) { - found = partitions.containsAll(storedPartitions); + Path folderPath = new Path(fileDetails[0]); + for (PartitionSpec spec : partitions) { + if (folderPath.equals(spec.getLocation()) && isCorrectUUID(fileDetails, spec)) { + found = true; + break; + } } if (!found) { return new ArrayList<>(); @@ -685,6 +688,20 @@ public class BlockletDataMap implements DataMap, Cacheable { return prune(filterExp); } + private boolean isCorrectUUID(String[] fileDetails, PartitionSpec spec) { + boolean needToScan = false; + if (spec.getUuid() != null) { + String[] split = spec.getUuid().split("_"); + if (split[0].equals(fileDetails[2]) && CarbonTablePath.DataFileUtil + .getTimeStampFromFileName(fileDetails[1]).equals(split[1])) { + needToScan = true; + } + } else { + needToScan = true; + } + return needToScan; + } + /** * select the blocks based on column min and max value * @@ -767,6 +784,23 @@ public class BlockletDataMap implements DataMap, Cacheable { return blocklet; } + private String[] getFileDetails() { + try { + String[] fileDetails = new String[3]; + DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); + fileDetails[0] = + new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET); + fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME), + CarbonCommonConstants.DEFAULT_CHARSET); + fileDetails[2] = new String(unsafeRow.getByteArray(SEGMENTID), + CarbonCommonConstants.DEFAULT_CHARSET); + return fileDetails; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** * Binary search used to get the first tentative index row based on * search key @@ -874,20 +908,6 @@ public class BlockletDataMap implements DataMap, Cacheable { return dataMapRow; } - private List<String> getPartitions() { - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); - if (unsafeRow.getColumnCount() > PARTITION_INFO) { - List<String> partitions = 
new ArrayList<>(); - DataMapRow row = unsafeRow.getRow(PARTITION_INFO); - for (int i = 0; i < row.getColumnCount(); i++) { - partitions.add( - new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); - } - return partitions; - } - return null; - } - private byte[] getColumnSchemaBinary() { DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); return unsafeRow.getByteArray(SCHEMA); http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java index 63f45b5..99e48a5 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java @@ -31,8 +31,8 @@ public class BlockletDataMapDistributable extends DataMapDistributable { */ private String filePath; - public BlockletDataMapDistributable(String indexFileName) { - this.filePath = indexFileName; + public BlockletDataMapDistributable(String indexFilePath) { + this.filePath = indexFilePath; } public String getFilePath() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 2e2cab5..5eb077f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -27,6 +27,7 @@ import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.datamap.dev.DataMapWriter; @@ -37,6 +38,7 @@ import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; @@ -65,30 +67,40 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe } @Override - public DataMapWriter createWriter(String segmentId) { + public DataMapWriter createWriter(Segment segment) { throw new UnsupportedOperationException("not implemented"); } @Override - public List<DataMap> getDataMaps(String segmentId) throws IOException { + public List<DataMap> getDataMaps(Segment segment) throws IOException { List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = - getTableBlockIndexUniqueIdentifiers(segmentId); + getTableBlockIndexUniqueIdentifiers(segment); return cache.getAll(tableBlockIndexUniqueIdentifiers); } private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers( - String segmentId) throws IOException { + Segment segment) throws IOException { 
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = - segmentMap.get(segmentId); + segmentMap.get(segment.getSegmentNo()); if (tableBlockIndexUniqueIdentifiers == null) { tableBlockIndexUniqueIdentifiers = new ArrayList<>(); - String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); - List<String> indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path); - for (int i = 0; i < indexFiles.size(); i++) { + Map<String, String> indexFiles; + if (segment.getSegmentFileName() == null) { + String path = + CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()); + indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path); + } else { + SegmentFileStore fileStore = + new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName()); + indexFiles = fileStore.getIndexFiles(); + } + for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) { + Path indexFile = new Path(indexFileEntry.getKey()); tableBlockIndexUniqueIdentifiers.add( - new TableBlockIndexUniqueIdentifier(identifier, segmentId, indexFiles.get(i))); + new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), + indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo())); } - segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers); + segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers); } return tableBlockIndexUniqueIdentifiers; } @@ -99,7 +111,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe * datamap. 
*/ @Override - public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId) + public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment) throws IOException { List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>(); // If it is already detailed blocklet then type cast and return same @@ -110,7 +122,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe return detailedBlocklets; } List<TableBlockIndexUniqueIdentifier> identifiers = - getTableBlockIndexUniqueIdentifiers(segmentId); + getTableBlockIndexUniqueIdentifiers(segment); // Retrieve each blocklets detail information from blocklet datamap for (Blocklet blocklet : blocklets) { detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet)); @@ -119,13 +131,13 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe } @Override - public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) + public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException { if (blocklet instanceof ExtendedBlocklet) { return (ExtendedBlocklet) blocklet; } List<TableBlockIndexUniqueIdentifier> identifiers = - getTableBlockIndexUniqueIdentifiers(segmentId); + getTableBlockIndexUniqueIdentifiers(segment); return getExtendedBlocklet(identifiers, blocklet); } @@ -133,7 +145,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe Blocklet blocklet) throws IOException { String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath()); for (TableBlockIndexUniqueIdentifier identifier : identifiers) { - if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) { + if (identifier.getIndexFilePath().equals(carbonIndexFileName)) { DataMap dataMap = cache.get(identifier); return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId()); } @@ -141,36 +153,51 @@ public class 
BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found "); } - @Override - public List<DataMapDistributable> toDistributable(String segmentId) { - CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId); + public List<DataMapDistributable> toDistributable(Segment segment) { List<DataMapDistributable> distributables = new ArrayList<>(); - for (int i = 0; i < carbonIndexFiles.length; i++) { - Path path = new Path(carbonIndexFiles[i].getPath()); - try { + try { + CarbonFile[] carbonIndexFiles; + if (segment.getSegmentFileName() == null) { + carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles( + CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())); + } else { + SegmentFileStore fileStore = + new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName()); + Map<String, String> indexFiles = fileStore.getIndexFiles(); + carbonIndexFiles = new CarbonFile[indexFiles.size()]; + int i = 0; + for (String indexFile : indexFiles.keySet()) { + carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile); + } + } + for (int i = 0; i < carbonIndexFiles.length; i++) { + Path path = new Path(carbonIndexFiles[i].getPath()); + FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); LocatedFileStatus fileStatus = iter.next(); String[] location = fileStatus.getBlockLocations()[0].getHosts(); BlockletDataMapDistributable distributable = - new BlockletDataMapDistributable(path.getName()); + new BlockletDataMapDistributable(path.toString()); distributable.setLocations(location); distributables.add(distributable); - } catch (IOException e) { - throw new RuntimeException(e); + } + } catch (IOException e) { + throw new RuntimeException(e); } return distributables; } - @Override public void fireEvent(Event event) { + @Override + public void 
fireEvent(Event event) { } @Override - public void clear(String segmentId) { - List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId); + public void clear(Segment segment) { + List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo()); if (blockIndexes != null) { for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { DataMap dataMap = cache.getIfPresent(blockIndex); @@ -185,7 +212,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe @Override public void clear() { for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) { - clear(segmentId); + clear(new Segment(segmentId, null)); } } @@ -193,18 +220,21 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException { BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable; List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>(); - if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { - identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(), - mapDistributable.getFilePath())); - } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + Path indexPath = new Path(mapDistributable.getFilePath()); + String segmentNo = mapDistributable.getSegment().getSegmentNo(); + if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + String parent = indexPath.getParent().toString(); + identifiers + .add(new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo)); + } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); - List<String> indexFiles = fileStore.getIndexFilesFromMergeFile( - 
CarbonTablePath.getSegmentPath(identifier.getTablePath(), mapDistributable.getSegmentId()) - + "/" + mapDistributable.getFilePath()); + CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString()); + String parentPath = carbonFile.getParentFile().getAbsolutePath(); + List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath()); for (String indexFile : indexFiles) { identifiers.add( - new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(), - indexFile)); + new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(), + segmentNo)); } } List<DataMap> dataMaps; http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java index b3a7f8c..ebeb278 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java @@ -16,7 +16,6 @@ */ package org.apache.carbondata.core.indexstore.blockletindex; -import java.util.List; import java.util.Map; import org.apache.carbondata.core.datamap.dev.DataMapModel; @@ -29,35 +28,27 @@ public class BlockletDataMapModel extends DataMapModel { private byte[] fileData; - private List<String> partitions; + private Map<String, BlockMetaInfo> blockMetaInfoMap; - private boolean partitionedSegment; + private String segmentId; - Map<String, BlockMetaInfo> blockMetaInfoMap; - - public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions, - boolean partitionedSegment, - Map<String, BlockMetaInfo> blockMetaInfoMap) { + public 
BlockletDataMapModel(String filePath, byte[] fileData, + Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) { super(filePath); this.fileData = fileData; - this.partitions = partitions; - this.partitionedSegment = partitionedSegment; this.blockMetaInfoMap = blockMetaInfoMap; + this.segmentId = segmentId; } public byte[] getFileData() { return fileData; } - public List<String> getPartitions() { - return partitions; - } - - public boolean isPartitionedSegment() { - return partitionedSegment; - } - public Map<String, BlockMetaInfo> getBlockMetaInfoMap() { return blockMetaInfoMap; } + + public String getSegmentId() { + return segmentId; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index a30b04c..b88c1f4 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -28,10 +28,12 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.reader.CarbonIndexFileReader; import org.apache.carbondata.core.reader.ThriftReader; +import 
org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; @@ -58,8 +60,14 @@ public class SegmentIndexFileStore { */ private Map<String, byte[]> carbonIndexMap; + /** + * Stores the indexfile name and related binary file data in it. + */ + private Map<String, byte[]> carbonIndexMapWithFullPath; + public SegmentIndexFileStore() { carbonIndexMap = new HashMap<>(); + carbonIndexMapWithFullPath = new HashMap<>(); } /** @@ -80,6 +88,45 @@ public class SegmentIndexFileStore { } /** + * Read all index files and keep the cache in it. + * + * @param segmentFileStore + * @throws IOException + */ + public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status, + boolean ignoreStatus) throws IOException { + List<CarbonFile> carbonIndexFiles = new ArrayList<>(); + if (segmentFileStore.getLocationMap() == null) { + return; + } + for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore + .getLocationMap().entrySet()) { + String location = locations.getKey(); + + if (locations.getValue().getStatus().equals(status.getMessage()) || ignoreStatus) { + if (locations.getValue().isRelative()) { + location = + segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location; + } + for (String indexFile : locations.getValue().getFiles()) { + CarbonFile carbonFile = FileFactory + .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile); + if (carbonFile.exists()) { + carbonIndexFiles.add(carbonFile); + } + } + } + } + for (int i = 0; i < carbonIndexFiles.size(); i++) { + if (carbonIndexFiles.get(i).getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + readMergeFile(carbonIndexFiles.get(i).getCanonicalPath()); + } else if (carbonIndexFiles.get(i).getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + 
readIndexFile(carbonIndexFiles.get(i)); + } + } + } + + /** * read index file and fill the blocklet information * * @param segmentPath @@ -120,17 +167,22 @@ public class SegmentIndexFileStore { * @return * @throws IOException */ - public List<String> getIndexFilesFromSegment(String segmentPath) throws IOException { + public Map<String, String> getIndexFilesFromSegment(String segmentPath) throws IOException { CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); - Set<String> indexFiles = new HashSet<>(); + Map<String, String> indexFiles = new HashMap<>(); for (int i = 0; i < carbonIndexFiles.length; i++) { if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { - indexFiles.addAll(getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath())); + List<String> indexFilesFromMergeFile = + getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath()); + for (String file: indexFilesFromMergeFile) { + indexFiles.put(carbonIndexFiles[i].getParentFile().getAbsolutePath() + + CarbonCommonConstants.FILE_SEPARATOR + file, carbonIndexFiles[i].getName()); + } } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { - indexFiles.add(carbonIndexFiles[i].getName()); + indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null); } } - return new ArrayList<>(indexFiles); + return indexFiles; } /** @@ -156,16 +208,23 @@ public class SegmentIndexFileStore { */ private void readMergeFile(String mergeFilePath) throws IOException { ThriftReader thriftReader = new ThriftReader(mergeFilePath); - thriftReader.open(); - MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader); - MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader); - List<String> file_names = indexHeader.getFile_names(); - List<ByteBuffer> fileData = mergedBlockIndex.getFileData(); - assert (file_names.size() == fileData.size()); - for (int i = 0; i < file_names.size(); i++) { - 
carbonIndexMap.put(file_names.get(i), fileData.get(i).array()); + try { + thriftReader.open(); + MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader); + MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader); + List<String> file_names = indexHeader.getFile_names(); + List<ByteBuffer> fileData = mergedBlockIndex.getFileData(); + CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath); + assert (file_names.size() == fileData.size()); + for (int i = 0; i < file_names.size(); i++) { + carbonIndexMap.put(file_names.get(i), fileData.get(i).array()); + carbonIndexMapWithFullPath.put( + mergeFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR + + file_names.get(i), fileData.get(i).array()); + } + } finally { + thriftReader.close(); } - thriftReader.close(); } /** @@ -181,6 +240,9 @@ public class SegmentIndexFileStore { byte[] bytes = new byte[(int) indexFile.getSize()]; dataInputStream.readFully(bytes); carbonIndexMap.put(indexFile.getName(), bytes); + carbonIndexMapWithFullPath.put( + indexFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR + + indexFile.getName(), bytes); dataInputStream.close(); } @@ -253,6 +315,10 @@ public class SegmentIndexFileStore { return carbonIndexMap; } + public Map<String, byte[]> getCarbonIndexMapWithFullPath() { + return carbonIndexMapWithFullPath; + } + /** * This method will read the index information from carbon index file * http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java index cc98b03..be98f7d 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java +++ 
b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java @@ -23,7 +23,6 @@ import java.io.IOException; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -102,21 +101,6 @@ public class HdfsFileLock extends AbstractCarbonLock { status = true; } catch (IOException e) { status = false; - } finally { - CarbonFile carbonFile = - FileFactory.getCarbonFile(location, FileFactory.getFileType(location)); - if (carbonFile.exists()) { - if (carbonFile.delete()) { - LOGGER.info("Deleted the lock file " + location); - } else { - LOGGER.error("Not able to delete the lock file " + location); - status = false; - } - } else { - LOGGER.error("Not able to delete the lock file because " - + "it is not existed in location " + location); - status = false; - } } } return status; http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java index 4fee4c4..75ea074 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java @@ -26,7 +26,6 @@ import java.nio.channels.OverlappingFileLockException; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import 
org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -121,7 +120,6 @@ public class LocalFileLock extends AbstractCarbonLock { LOGGER.info(e.getMessage()); return false; } - } /** @@ -130,27 +128,18 @@ public class LocalFileLock extends AbstractCarbonLock { * @return */ @Override public boolean unlock() { - boolean status; + boolean status = false; try { if (null != fileLock) { fileLock.release(); + status = true; } - status = true; } catch (IOException e) { status = false; } finally { if (null != fileOutputStream) { try { fileOutputStream.close(); - // deleting the lock file after releasing the lock. - CarbonFile lockFile = FileFactory - .getCarbonFile(lockFilePath, FileFactory.getFileType(lockFilePath)); - if (!lockFile.exists() || lockFile.delete()) { - LOGGER.info("Successfully deleted the lock file " + lockFilePath); - } else { - LOGGER.error("Not able to delete the lock file " + lockFilePath); - status = false; - } } catch (IOException e) { LOGGER.error(e.getMessage()); } @@ -158,5 +147,4 @@ public class LocalFileLock extends AbstractCarbonLock { } return status; } - }