[CARBONDATA-2310] Refactored code to improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper
This PR has two JIRA fixes [CARBONDATA-2310] Refactored code to improve Distributable interface [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper This closes #2244 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/531ecdf3 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/531ecdf3 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/531ecdf3 Branch: refs/heads/master Commit: 531ecdf3f40c064d4ff6ad36c43fa90a2d423588 Parents: a7926ea Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Fri Apr 27 23:03:52 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Mon May 7 13:11:29 2018 +0530 ---------------------------------------------------------------------- .../org/apache/carbondata/core/cache/Cache.java | 10 + .../dictionary/AbstractDictionaryCache.java | 6 + .../core/constants/CarbonCommonConstants.java | 3 + .../core/datamap/dev/CacheableDataMap.java | 51 ++++ .../core/datastore/SegmentTaskIndexStore.java | 7 + .../filesystem/AbstractDFSCarbonFile.java | 5 +- .../core/datastore/filesystem/CarbonFile.java | 3 +- .../datastore/filesystem/LocalCarbonFile.java | 3 +- .../core/indexstore/AbstractMemoryDMStore.java | 63 +++++ .../indexstore/BlockletDataMapIndexStore.java | 187 ++++++++------- .../indexstore/BlockletDataMapIndexWrapper.java | 52 +++++ .../core/indexstore/BlockletDetailInfo.java | 66 ++++-- .../core/indexstore/SafeMemoryDMStore.java | 105 +++++++++ .../TableBlockIndexUniqueIdentifier.java | 5 +- .../core/indexstore/UnsafeMemoryDMStore.java | 25 +- .../blockletindex/BlockletDataMap.java | 232 ++++++++++++------- .../BlockletDataMapDistributable.java | 12 + .../blockletindex/BlockletDataMapFactory.java | 127 ++++++---- .../blockletindex/BlockletDataMapModel.java | 12 + .../blockletindex/SegmentIndexFileStore.java | 39 +++- .../core/indexstore/row/DataMapRow.java | 13 +- .../core/indexstore/row/UnsafeDataMapRow.java | 7 
+- .../core/indexstore/schema/CarbonRowSchema.java | 10 +- .../core/metadata/SegmentFileStore.java | 29 +++ .../core/metadata/schema/table/TableInfo.java | 24 ++ .../TableStatusReadCommittedScope.java | 4 +- .../core/util/BlockletDataMapUtil.java | 180 ++++++++++++++ .../carbondata/core/util/SessionParams.java | 5 + .../core/util/path/CarbonTablePath.java | 2 +- .../TestBlockletDataMapFactory.java | 126 ++++++++++ .../apache/carbondata/hadoop/CacheClient.java | 49 ++++ .../hadoop/api/AbstractDataMapJob.java | 42 ++++ .../hadoop/api/CarbonFileInputFormat.java | 2 +- .../hadoop/api/CarbonInputFormat.java | 27 ++- .../hadoop/api/CarbonTableInputFormat.java | 2 +- .../carbondata/hadoop/api/DataMapJob.java | 6 + .../hadoop/util/CarbonInputFormatUtil.java | 43 +++- .../lucene/LuceneFineGrainDataMapSuite.scala | 1 + .../carbondata/spark/rdd/CarbonScanRDD.scala | 15 +- .../carbondata/spark/rdd/SparkDataMapJob.scala | 4 +- .../org/apache/spark/sql/CarbonCountStar.scala | 3 + .../execution/command/CarbonHiveCommands.scala | 9 + 42 files changed, 1335 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/cache/Cache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java index 04fa18a..6df36fc 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java @@ -20,6 +20,8 @@ package org.apache.carbondata.core.cache; import java.io.IOException; import java.util.List; +import org.apache.carbondata.core.memory.MemoryException; + /** * A semi-persistent mapping from keys to values. 
Cache entries are manually added using * #get(Key), #getAll(List<Keys>) , and are stored in the cache until @@ -69,6 +71,14 @@ public interface Cache<K, V> { void invalidate(K key); /** + * This method will add the value to the cache for the given key + * + * @param key + * @param value + */ + void put(K key, V value) throws IOException, MemoryException; + + /** * Access count of Cacheable entry will be decremented * * @param keys http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java index fb67208..83c7237 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java @@ -59,6 +59,12 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId initThreadPoolSize(); } + @Override + public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) { + throw new UnsupportedOperationException("Operation not supported"); + } + + /** * This method will initialize the thread pool size to be used for creating the * max number of threads for a job http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index f9bf220..56607b9 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1726,6 +1726,9 @@ public final class CarbonCommonConstants { */ public static final String INDEX_COLUMNS = "INDEX_COLUMNS"; + // Property to enable parallel datamap loading for a table + public static final String CARBON_LOAD_DATAMAPS_PARALLEL = "carbon.load.datamaps.parallel."; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java new file mode 100644 index 0000000..dba0840 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datamap.dev; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.memory.MemoryException; + +/** + * Interface for data map caching + */ +public interface CacheableDataMap { + + /** + * Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier + * + * @param tableBlockIndexUniqueIdentifier + * @param blockletDataMapIndexWrapper + */ + void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, + BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException; + + /** + * Get all the uncached distributables from the list. + * + * @param distributables + * @return + */ + List<DataMapDistributable> getAllUncachedDistributables(List<DataMapDistributable> distributables) + throws IOException; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java index 537c635..d325f21 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java @@ -37,6 +37,7 @@ import org.apache.carbondata.core.datastore.block.SegmentTaskIndex; import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import 
org.apache.carbondata.core.datastore.exception.IndexBuilderException; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -142,6 +143,12 @@ public class SegmentTaskIndexStore lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } + @Override + public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value) + throws IOException, MemoryException { + throw new UnsupportedOperationException("Operation not supported"); + } + /** * returns block timestamp value from the given task * @param taskKey http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index 0419405..7255237 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; @@ -526,7 +527,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { } @Override - public CarbonFile[] locationAwareListFiles() throws IOException { + public CarbonFile[] 
locationAwareListFiles(PathFilter pathFilter) throws IOException { if (null != fileStatus && fileStatus.isDirectory()) { List<FileStatus> listStatus = new ArrayList<>(); Path path = fileStatus.getPath(); @@ -534,7 +535,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path); while (iter.hasNext()) { LocatedFileStatus fileStatus = iter.next(); - if (fileStatus.getLen() > 0) { + if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) { listStatus.add(fileStatus); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java index eb65dfd..a104137 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; public interface CarbonFile { @@ -41,7 +42,7 @@ public interface CarbonFile { * It returns list of files with location details. 
* @return */ - CarbonFile[] locationAwareListFiles() throws IOException; + CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException; String getName(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java index d28e85e..60b7e17 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java @@ -49,6 +49,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; import org.xerial.snappy.SnappyInputStream; import org.xerial.snappy.SnappyOutputStream; @@ -448,7 +449,7 @@ public class LocalCarbonFile implements CarbonFile { return file.createNewFile(); } - @Override public CarbonFile[] locationAwareListFiles() throws IOException { + @Override public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException { return listFiles(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java new file mode 100644 index 0000000..e6bc691 --- /dev/null +++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.indexstore; + +import java.io.Serializable; + +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; + +/** + * Store the data map row @{@link DataMapRow} + */ +public abstract class AbstractMemoryDMStore implements Serializable { + + protected boolean isMemoryFreed; + + protected CarbonRowSchema[] schema; + + protected final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); + + public AbstractMemoryDMStore(CarbonRowSchema[] schema) { + this.schema = schema; + } + + public abstract void addIndexRow(DataMapRow indexRow) throws MemoryException; + + public abstract DataMapRow getDataMapRow(int index); + + public abstract void freeMemory(); + + public abstract int getMemoryUsed(); + + public CarbonRowSchema[] getSchema() { + return schema; + } + + public abstract int getRowCount(); + + public void finishWriting() throws 
MemoryException { + // do nothing in default implementation + } + + public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException { + throw new UnsupportedOperationException("Operation not allowed"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index 167a04e..ba4193e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -18,7 +18,6 @@ package org.apache.carbondata.core.indexstore; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -30,26 +29,19 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CarbonLRUCache; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; -import 
org.apache.carbondata.core.util.DataFileFooterConverter; - -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; +import org.apache.carbondata.core.util.BlockletDataMapUtil; /** * Class to handle loading, unloading,clearing,storing of the table * blocks */ public class BlockletDataMapIndexStore - implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> { + implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> { private static final LogService LOGGER = LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName()); /** @@ -76,106 +68,93 @@ public class BlockletDataMapIndexStore } @Override - public BlockletDataMap get(TableBlockIndexUniqueIdentifier identifier) + public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier) throws IOException { String lruCacheKey = identifier.getUniqueTableSegmentIdentifier(); - BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey); - if (dataMap == null) { + BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = + (BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey); + List<BlockletDataMap> dataMaps = new ArrayList<>(); + if (blockletDataMapIndexWrapper == null) { try { SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); Set<String> filesRead = new HashSet<>(); - Map<String, BlockMetaInfo> blockMetaInfoMap = - getBlockMetaInfoMap(identifier, indexFileStore, filesRead); - dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap); - } catch (MemoryException e) { + long memorySize = 0L; + String segmentFilePath = identifier.getIndexFilePath(); + Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = BlockletDataMapUtil + .createCarbonDataFileBlockMetaInfoMapping(segmentFilePath); + // if the identifier is not a merge file we can directly load the datamaps + if (identifier.getMergeIndexFileName() == null) { + Map<String, 
BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil + .getBlockMetaInfoMap(identifier, indexFileStore, filesRead, + carbonDataFileBlockMetaInfoMapping); + BlockletDataMap blockletDataMap = + loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap); + memorySize += blockletDataMap.getMemorySize(); + dataMaps.add(blockletDataMap); + blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps); + } else { + // if the identifier is a merge file then collect the index files and load the datamaps + List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore); + for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier : + tableBlockIndexUniqueIdentifiers) { + Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil + .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead, + carbonDataFileBlockMetaInfoMapping); + BlockletDataMap blockletDataMap = + loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap); + memorySize += blockletDataMap.getMemorySize(); + dataMaps.add(blockletDataMap); + } + blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps); + } + lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper, + memorySize); + } catch (Throwable e) { + // clear all the memory used by datamaps loaded + for (DataMap dataMap : dataMaps) { + dataMap.clear(); + } LOGGER.error("memory exception when loading datamap: " + e.getMessage()); throw new RuntimeException(e.getMessage(), e); } } - return dataMap; - } - - private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier, - SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException { - if (identifier.getMergeIndexFileName() != null) { - CarbonFile indexMergeFile = FileFactory.getCarbonFile( - identifier.getIndexFilePath() + 
CarbonCommonConstants.FILE_SEPARATOR + identifier - .getMergeIndexFileName()); - if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) { - indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile }); - filesRead.add(indexMergeFile.getPath()); - } - } - if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) { - indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile( - identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getIndexFileName()) }); - } - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); - Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>(); - List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo( - identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName())); - for (DataFileFooter footer : indexInfo) { - String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath(); - if (FileFactory.isFileExist(blockPath)) { - blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath)); - } else { - LOGGER.warn("Skipping invalid block " + footer.getBlockInfo().getBlockUniqueName() - + " The block does not exist. 
The block might be got deleted due to clean up post" - + " update/delete operation over table."); - } - } - return blockMetaInfoMap; - } - - private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException { - CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile); - if (carbonFile instanceof AbstractDFSCarbonFile) { - RemoteIterator<LocatedFileStatus> iter = - ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile)); - LocatedFileStatus fileStatus = iter.next(); - String[] location = fileStatus.getBlockLocations()[0].getHosts(); - long len = fileStatus.getLen(); - return new BlockMetaInfo(location, len); - } else { - return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize()); - } + return blockletDataMapIndexWrapper; } @Override - public List<BlockletDataMap> getAll( + public List<BlockletDataMapIndexWrapper> getAll( List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException { - List<BlockletDataMap> blockletDataMaps = + List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = new ArrayList<>(tableSegmentUniqueIdentifiers.size()); List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>(); + BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null; // Get the datamaps for each indexfile from cache. 
try { for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) { - BlockletDataMap ifPresent = getIfPresent(identifier); - if (ifPresent != null) { - blockletDataMaps.add(ifPresent); + BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier); + if (dataMapIndexWrapper != null) { + blockletDataMapIndexWrappers.add(dataMapIndexWrapper); } else { missedIdentifiers.add(identifier); } } if (missedIdentifiers.size() > 0) { - SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); - Set<String> filesRead = new HashSet<>(); - for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) { - Map<String, BlockMetaInfo> blockMetaInfoMap = - getBlockMetaInfoMap(identifier, indexFileStore, filesRead); - blockletDataMaps.add( - loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap)); + for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) { + blockletDataMapIndexWrapper = get(identifier); + blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper); } } } catch (Throwable e) { - for (BlockletDataMap dataMap : blockletDataMaps) { - dataMap.clear(); + if (null != blockletDataMapIndexWrapper) { + List<BlockletDataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps(); + for (DataMap dataMap : dataMaps) { + dataMap.clear(); + } } throw new IOException("Problem in loading segment blocks.", e); } - return blockletDataMaps; + return blockletDataMapIndexWrappers; } /** @@ -185,9 +164,9 @@ public class BlockletDataMapIndexStore * @return */ @Override - public BlockletDataMap getIfPresent( + public BlockletDataMapIndexWrapper getIfPresent( TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) { - return (BlockletDataMap) lruCache.get( + return (BlockletDataMapIndexWrapper) lruCache.get( tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } @@ -201,6 +180,44 @@ public class BlockletDataMapIndexStore lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } 
+ @Override + public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, + BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException { + String uniqueTableSegmentIdentifier = + tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(); + Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier); + if (lock == null) { + lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier); + } + long memorySize = 0L; + // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry + // as in that case clearing unsafe memory need to be taken card. If at all datamap entry + // in the cache need to be overwritten then use the invalidate interface + // and then use the put interface + if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) { + synchronized (lock) { + if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) { + List<BlockletDataMap> dataMaps = wrapper.getDataMaps(); + try { + for (BlockletDataMap blockletDataMap: dataMaps) { + blockletDataMap.convertToUnsafeDMStore(); + memorySize += blockletDataMap.getMemorySize(); + } + lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper, + memorySize); + } catch (Throwable e) { + // clear all the memory acquired by data map in case of any failure + for (DataMap blockletDataMap : dataMaps) { + blockletDataMap.clear(); + } + throw new IOException("Problem in adding datamap to cache.", e); + } + } + } + } + } + + /** * Below method will be used to load the segment of segments * One segment may have multiple task , so table segment will be loaded @@ -228,8 +245,6 @@ public class BlockletDataMapIndexStore identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()), blockMetaInfoMap, identifier.getSegmentId())); - lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap, - dataMap.getMemorySize()); } return 
dataMap; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java new file mode 100644 index 0000000..d674cb4 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.indexstore; + +import java.io.Serializable; +import java.util.List; + +import org.apache.carbondata.core.cache.Cacheable; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; + +/** + * A cacheable wrapper of datamaps + */ +public class BlockletDataMapIndexWrapper implements Cacheable, Serializable { + + private List<BlockletDataMap> dataMaps; + + public BlockletDataMapIndexWrapper(List<BlockletDataMap> dataMaps) { + this.dataMaps = dataMaps; + } + + @Override public long getFileTimeStamp() { + return 0; + } + + @Override public int getAccessCount() { + return 0; + } + + @Override public long getMemorySize() { + return 0; + } + + public List<BlockletDataMap> getDataMaps() { + return dataMaps; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java index 660f4c1..8bae7fd 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java @@ -22,20 +22,29 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.hadoop.io.Writable; -import 
org.xerial.snappy.Snappy; /** * Blocklet detail information to be sent to each executor */ public class BlockletDetailInfo implements Serializable, Writable { + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockletDetailInfo.class.getName()); + + private static final long serialVersionUID = 7957493757421513808L; + private int rowCount; private short pagesCount; @@ -50,6 +59,8 @@ public class BlockletDetailInfo implements Serializable, Writable { private BlockletInfo blockletInfo; + private byte[] blockletInfoBinary; + private long blockFooterOffset; private List<ColumnSchema> columnSchemas; @@ -83,6 +94,13 @@ public class BlockletDetailInfo implements Serializable, Writable { } public BlockletInfo getBlockletInfo() { + if (null == blockletInfo) { + try { + setBlockletInfoFromBinary(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } return blockletInfo; } @@ -90,6 +108,26 @@ public class BlockletDetailInfo implements Serializable, Writable { this.blockletInfo = blockletInfo; } + private void setBlockletInfoFromBinary() throws IOException { + if (null == this.blockletInfo && null != blockletInfoBinary && blockletInfoBinary.length > 0) { + blockletInfo = new BlockletInfo(); + ByteArrayInputStream stream = new ByteArrayInputStream(blockletInfoBinary); + DataInputStream inputStream = new DataInputStream(stream); + try { + blockletInfo.readFields(inputStream); + } catch (IOException e) { + LOGGER.error("Problem in reading blocklet info"); + throw new IOException("Problem in reading blocklet info." 
+ e.getMessage()); + } finally { + try { + inputStream.close(); + } catch (IOException e) { + LOGGER.error(e, "Problem in closing input stream of reading blocklet info."); + } + } + } + } + public int[] getDimLens() { return dimLens; } @@ -131,6 +169,8 @@ public class BlockletDetailInfo implements Serializable, Writable { out.writeLong(blockFooterOffset); out.writeInt(columnSchemaBinary.length); out.write(columnSchemaBinary); + out.writeInt(blockletInfoBinary.length); + out.write(blockletInfoBinary); out.writeLong(blockSize); } @@ -153,6 +193,10 @@ public class BlockletDetailInfo implements Serializable, Writable { byte[] schemaArray = new byte[bytesSize]; in.readFully(schemaArray); readColumnSchema(schemaArray); + int byteSize = in.readInt(); + blockletInfoBinary = new byte[byteSize]; + in.readFully(blockletInfoBinary); + setBlockletInfoFromBinary(); blockSize = in.readLong(); } @@ -162,17 +206,8 @@ public class BlockletDetailInfo implements Serializable, Writable { * @throws IOException */ public void readColumnSchema(byte[] schemaArray) throws IOException { - // uncompress it. 
- schemaArray = Snappy.uncompress(schemaArray); - ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray); - DataInput schemaInput = new DataInputStream(schemaStream); - columnSchemas = new ArrayList<>(); - int size = schemaInput.readShort(); - for (int i = 0; i < size; i++) { - ColumnSchema columnSchema = new ColumnSchema(); - columnSchema.readFields(schemaInput); - columnSchemas.add(columnSchema); - } + BlockletDataMap blockletDataMap = new BlockletDataMap(); + columnSchemas = blockletDataMap.readColumnSchema(schemaArray); } /** @@ -223,4 +258,9 @@ public class BlockletDetailInfo implements Serializable, Writable { public byte[] getColumnSchemaBinary() { return columnSchemaBinary; } + + public void setBlockletInfoBinary(byte[] blockletInfoBinary) { + this.blockletInfoBinary = blockletInfoBinary; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java new file mode 100644 index 0000000..d7a1b8f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.indexstore; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.util.DataTypeUtil; + +/** + * Store the data map row {@link DataMapRow} data to memory. + */ +public class SafeMemoryDMStore extends AbstractMemoryDMStore { + + /** + * holds all blocklets metadata in memory + */ + private List<DataMapRow> dataMapRows = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + private int runningLength; + + public SafeMemoryDMStore(CarbonRowSchema[] schema) { + super(schema); + } + + /** + * Add the index row to dataMapRows, i.e. keep it in memory. 
+ * + * @param indexRow + * @return + */ + @Override + public void addIndexRow(DataMapRow indexRow) throws MemoryException { + dataMapRows.add(indexRow); + runningLength += indexRow.getTotalSizeInBytes(); + } + + @Override + public DataMapRow getDataMapRow(int index) { + assert (index < dataMapRows.size()); + return dataMapRows.get(index); + } + + @Override + public void freeMemory() { + if (!isMemoryFreed) { + if (null != dataMapRows) { + dataMapRows.clear(); + dataMapRows = null; + } + isMemoryFreed = true; + } + } + + @Override + public int getMemoryUsed() { + return runningLength; + } + + @Override + public int getRowCount() { + return dataMapRows.size(); + } + + @Override + public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException { + setSchemaDataType(); + UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema); + for (DataMapRow dataMapRow : dataMapRows) { + unsafeMemoryDMStore.addIndexRow(dataMapRow); + } + unsafeMemoryDMStore.finishWriting(); + return unsafeMemoryDMStore; + } + + /** + * Set the dataType to the schema. 
Needed in case of serialization / deserialization + */ + private void setSchemaDataType() { + for (CarbonRowSchema carbonRowSchema : schema) { + carbonRowSchema.setDataType(DataTypeUtil.valueOf(carbonRowSchema.getDataType(), 0, 0)); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java index c907fa8..3226ceb 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.indexstore; +import java.io.Serializable; import java.util.Objects; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -24,7 +25,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; /** * Class holds the indexFile information to uniquely identitify the carbon index */ -public class TableBlockIndexUniqueIdentifier { +public class TableBlockIndexUniqueIdentifier implements Serializable { + + private static final long serialVersionUID = 5808112137916196344L; private String indexFilePath; http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 31ecac2..ca5e2dd 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET; import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe; @@ -32,9 +31,11 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe; /** * Store the data map row @{@link DataMapRow} data to unsafe. */ -public class UnsafeMemoryDMStore { +public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { - private MemoryBlock memoryBlock; + private static final long serialVersionUID = -5344592407101055335L; + + private transient MemoryBlock memoryBlock; private static int capacity = 8 * 1024; @@ -42,18 +43,12 @@ public class UnsafeMemoryDMStore { private int runningLength; - private boolean isMemoryFreed; - - private CarbonRowSchema[] schema; - private int[] pointers; private int rowCount; - private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); - public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException { - this.schema = schema; + super(schema); this.allocatedSize = capacity; this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize); this.pointers = new int[1000]; @@ -92,7 +87,7 @@ public class UnsafeMemoryDMStore { * @param indexRow * @return */ - public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException { + public void addIndexRow(DataMapRow indexRow) throws MemoryException { // First calculate the required memory to keep the row in unsafe int rowSize = 
indexRow.getTotalSizeInBytes(); // Check whether allocated memory is sufficient or not. @@ -172,7 +167,7 @@ public class UnsafeMemoryDMStore { } } - public UnsafeDataMapRow getUnsafeRow(int index) { + public DataMapRow getDataMapRow(int index) { assert (index < rowCount); return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]); } @@ -205,12 +200,8 @@ public class UnsafeMemoryDMStore { return runningLength; } - public CarbonRowSchema[] getSchema() { - return schema; - } - public int getRowCount() { return rowCount; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index f72dc06..3ff9cdc 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -18,10 +18,12 @@ package org.apache.carbondata.core.indexstore.blockletindex; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -32,18 +34,19 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.cache.Cacheable; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.dev.DataMapModel; 
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; import org.apache.carbondata.core.datastore.IndexKey; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore; import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.SafeMemoryDMStore; import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; @@ -76,11 +79,13 @@ import org.xerial.snappy.Snappy; /** * Datamap implementation for blocklet. */ -public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { +public class BlockletDataMap extends CoarseGrainDataMap implements Serializable { private static final LogService LOGGER = LogServiceFactory.getLogService(BlockletDataMap.class.getName()); + private static final long serialVersionUID = -2170289352240810993L; + private static int KEY_INDEX = 0; private static int MIN_VALUES_INDEX = 1; @@ -119,14 +124,17 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { private static int SEGMENTID = 5; - private UnsafeMemoryDMStore unsafeMemoryDMStore; + private AbstractMemoryDMStore memoryDMStore; - private UnsafeMemoryDMStore unsafeMemorySummaryDMStore; + private AbstractMemoryDMStore summaryDMStore; - private SegmentProperties segmentProperties; + // As it is a heavy object it is not recommended to serialize this object + private transient SegmentProperties segmentProperties; private int[] columnCardinality; + private long blockletSchemaTime; + 
@Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException { long startTime = System.currentTimeMillis(); @@ -150,11 +158,12 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { if (segmentProperties == null) { List<ColumnSchema> columnInTable = fileFooter.getColumnInTable(); schemaBinary = convertSchemaToBinary(columnInTable); + blockletSchemaTime = fileFooter.getSchemaUpdatedTimeStamp(); columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); segmentProperties = new SegmentProperties(columnInTable, columnCardinality); - createSchema(segmentProperties); + createSchema(segmentProperties, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe()); createSummarySchema(segmentProperties, schemaBinary, filePath, fileName, - segmentId); + segmentId, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe()); } TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); BlockMetaInfo blockMetaInfo = @@ -185,21 +194,23 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { } } } - if (unsafeMemoryDMStore != null) { - unsafeMemoryDMStore.finishWriting(); + if (memoryDMStore != null) { + memoryDMStore.finishWriting(); } - if (null != unsafeMemorySummaryDMStore) { + if (null != summaryDMStore) { addTaskSummaryRowToUnsafeMemoryStore( summaryRow, schemaBinary, filePath, fileName, segmentId); - unsafeMemorySummaryDMStore.finishWriting(); + summaryDMStore.finishWriting(); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is " + + (System.currentTimeMillis() - startTime)); } - LOGGER.info( - "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + ( - System.currentTimeMillis() - startTime)); } private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter, @@ -207,10 +218,10 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable 
{ BlockMetaInfo blockMetaInfo, int relativeBlockletId) { int[] minMaxLen = segmentProperties.getColumnsValueSize(); List<BlockletInfo> blockletList = fileFooter.getBlockletList(); - CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); + CarbonRowSchema[] schema = memoryDMStore.getSchema(); // Add one row to maintain task level min max for segment pruning if (!blockletList.isEmpty() && summaryRow == null) { - summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema()); + summaryRow = new DataMapRowImpl(summaryDMStore.getSchema()); } for (int index = 0; index < blockletList.size(); index++) { DataMapRow row = new DataMapRowImpl(schema); @@ -226,7 +237,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal); // compute and set task level min values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, + summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, TASK_MIN_VALUES_INDEX, true); ordinal++; taskMinMaxOrdinal++; @@ -234,7 +245,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); // compute and set task level max values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, + summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, TASK_MAX_VALUES_INDEX, false); ordinal++; @@ -269,7 +280,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { row.setShort((short) relativeBlockletId++, ordinal++); // Store block size row.setLong(blockMetaInfo.getSize(), ordinal); - unsafeMemoryDMStore.addIndexRowToUnsafe(row); + memoryDMStore.addIndexRow(row); } catch (Exception e) { throw new RuntimeException(e); } @@ -295,10 +306,10 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { 
BlockMetaInfo blockMetaInfo) { int[] minMaxLen = segmentProperties.getColumnsValueSize(); BlockletIndex blockletIndex = fileFooter.getBlockletIndex(); - CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); + CarbonRowSchema[] schema = memoryDMStore.getSchema(); // Add one row to maintain task level min max for segment pruning if (summaryRow == null) { - summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema()); + summaryRow = new DataMapRowImpl(summaryDMStore.getSchema()); } DataMapRow row = new DataMapRowImpl(schema); int ordinal = 0; @@ -317,14 +328,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal); // compute and set task level min values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues, + summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues, TASK_MIN_VALUES_INDEX, true); ordinal++; taskMinMaxOrdinal++; row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal); // compute and set task level max values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues, + summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues, TASK_MAX_VALUES_INDEX, false); ordinal++; @@ -357,7 +368,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { // store block size row.setLong(blockMetaInfo.getSize(), ordinal); - unsafeMemoryDMStore.addIndexRowToUnsafe(row); + memoryDMStore.addIndexRow(row); } catch (Exception e) { throw new RuntimeException(e); } @@ -378,7 +389,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { summaryRow.setByteArray(fileName, INDEX_FILE_NAME); summaryRow.setByteArray(segmentId, SEGMENTID); try { - unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow); + summaryDMStore.addIndexRow(summaryRow); } 
catch (Exception e) { throw new RuntimeException(e); } @@ -516,7 +527,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { taskMinMaxRow.setRow(row, ordinal); } - private void createSchema(SegmentProperties segmentProperties) throws MemoryException { + private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe) + throws MemoryException { List<CarbonRowSchema> indexSchemas = new ArrayList<>(); // Index key @@ -553,8 +565,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { // for storing block length. indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); - unsafeMemoryDMStore = - new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()])); + CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]); + memoryDMStore = getMemoryDMStore(schema, addToUnsafe); } /** @@ -565,7 +577,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { * @throws MemoryException */ private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary, - byte[] filePath, byte[] fileName, byte[] segmentId) + byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe) throws MemoryException { List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(); getMinMaxSchema(segmentProperties, taskMinMaxSchemas); @@ -581,8 +593,9 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { // for storing segmentid taskMinMaxSchemas.add( new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length)); - unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore( - taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()])); + CarbonRowSchema[] schema = + taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]); + summaryDMStore = getMemoryDMStore(schema, addToUnsafe); } private void 
getMinMaxSchema(SegmentProperties segmentProperties, @@ -611,8 +624,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { public boolean isScanRequired(FilterResolverIntf filterExp) { FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); - for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) { - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i); + for (int i = 0; i < summaryDMStore.getRowCount(); i++) { + DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i); boolean isScanRequired = FilterExpressionProcessor.isScanRequired( filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX), getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX)); @@ -624,26 +637,26 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { } private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) { - if (unsafeMemoryDMStore.getRowCount() == 0) { + if (memoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } List<Blocklet> blocklets = new ArrayList<>(); int numBlocklets = 0; if (filterExp == null) { - numBlocklets = unsafeMemoryDMStore.getRowCount(); + numBlocklets = memoryDMStore.getRowCount(); for (int i = 0; i < numBlocklets; i++) { - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow(); + DataMapRow safeRow = memoryDMStore.getDataMapRow(i).convertToSafeRow(); blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX))); } } else { // Remove B-tree jump logic as start and end key prepared is not // correct for old store scenarios int startIndex = 0; - numBlocklets = unsafeMemoryDMStore.getRowCount(); + numBlocklets = memoryDMStore.getRowCount(); FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); while (startIndex < numBlocklets) { - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow(); + 
DataMapRow safeRow = memoryDMStore.getDataMapRow(startIndex).convertToSafeRow(); int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX); String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS); @@ -663,7 +676,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { @Override public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions) { - if (unsafeMemoryDMStore.getRowCount() == 0) { + if (memoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } // if it has partitioned datamap but there is no partitioned information stored, it means @@ -740,10 +753,26 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { public ExtendedBlocklet getDetailedBlocklet(String blockletId) { int index = Integer.parseInt(blockletId); - DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow(); + DataMapRow safeRow = memoryDMStore.getDataMapRow(index).convertToSafeRow(); return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)); } + /** + * Get the index file name of the blocklet data map + * + * @return + */ + public String getIndexFileName() { + DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0); + try { + return new String(unsafeRow.getByteArray(INDEX_FILE_NAME), + CarbonCommonConstants.DEFAULT_CHARSET); + } catch (UnsupportedEncodingException e) { + // should never happen! 
+ throw new IllegalArgumentException("UTF8 encoding is not supported", e); + } + } + private byte[][] getMinMaxValue(DataMapRow row, int index) { DataMapRow minMaxRow = row.getRow(index); byte[][] minMax = new byte[minMaxRow.getColumnCount()][]; @@ -764,23 +793,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { detailInfo.setBlockletId((short) blockletId); detailInfo.setDimLens(columnCardinality); detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX)); - byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX); - BlockletInfo blockletInfo = null; + detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCK_INFO_INDEX)); try { - if (byteArray.length > 0) { - blockletInfo = new BlockletInfo(); - ByteArrayInputStream stream = new ByteArrayInputStream(byteArray); - DataInputStream inputStream = new DataInputStream(stream); - blockletInfo.readFields(inputStream); - inputStream.close(); - } blocklet.setLocation( new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET) .split(",")); } catch (IOException e) { throw new RuntimeException(e); } - detailInfo.setBlockletInfo(blockletInfo); blocklet.setDetailInfo(detailInfo); detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET)); detailInfo.setColumnSchemaBinary(getColumnSchemaBinary()); @@ -791,7 +811,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { private String[] getFileDetails() { try { String[] fileDetails = new String[3]; - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); + DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0); fileDetails[0] = new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET); fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME), @@ -815,14 +835,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) { int 
childNodeIndex; int low = 0; - int high = unsafeMemoryDMStore.getRowCount() - 1; + int high = memoryDMStore.getRowCount() - 1; int mid = 0; int compareRes = -1; // while (low <= high) { mid = (low + high) >>> 1; // compare the entries - compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); + compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid)); if (compareRes < 0) { high = mid - 1; } else if (compareRes > 0) { @@ -831,7 +851,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { // if key is matched then get the first entry int currentPos = mid; while (currentPos - 1 >= 0 - && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) { + && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos - 1)) == 0) { currentPos--; } mid = currentPos; @@ -863,14 +883,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) { int childNodeIndex; int low = 0; - int high = unsafeMemoryDMStore.getRowCount() - 1; + int high = memoryDMStore.getRowCount() - 1; int mid = 0; int compareRes = -1; // while (low <= high) { mid = (low + high) >>> 1; // compare the entries - compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); + compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid)); if (compareRes < 0) { high = mid - 1; } else if (compareRes > 0) { @@ -878,8 +898,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { } else { int currentPos = mid; // if key is matched then get the first entry - while (currentPos + 1 < unsafeMemoryDMStore.getRowCount() - && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) { + while (currentPos + 1 < memoryDMStore.getRowCount() + && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos + 1)) == 0) { currentPos++; } mid = currentPos; @@ -907,13 
+927,13 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { buffer.putInt(key.getNoDictionaryKeys().length); buffer.put(key.getDictionaryKeys()); buffer.put(key.getNoDictionaryKeys()); - DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema()); + DataMapRowImpl dataMapRow = new DataMapRowImpl(memoryDMStore.getSchema()); dataMapRow.setByteArray(buffer.array(), 0); return dataMapRow; } private byte[] getColumnSchemaBinary() { - DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); + DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0); return unsafeRow.getByteArray(SCHEMA); } @@ -937,36 +957,25 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { @Override public void clear() { - if (unsafeMemoryDMStore != null) { - unsafeMemoryDMStore.freeMemory(); - unsafeMemoryDMStore = null; + if (memoryDMStore != null) { + memoryDMStore.freeMemory(); + memoryDMStore = null; segmentProperties = null; } // clear task min/max unsafe memory - if (null != unsafeMemorySummaryDMStore) { - unsafeMemorySummaryDMStore.freeMemory(); - unsafeMemorySummaryDMStore = null; + if (null != summaryDMStore) { + summaryDMStore.freeMemory(); + summaryDMStore = null; } } - @Override - public long getFileTimeStamp() { - return 0; - } - - @Override - public int getAccessCount() { - return 0; - } - - @Override public long getMemorySize() { long memoryUsed = 0L; - if (unsafeMemoryDMStore != null) { - memoryUsed += unsafeMemoryDMStore.getMemoryUsed(); + if (memoryDMStore != null) { + memoryUsed += memoryDMStore.getMemoryUsed(); } - if (null != unsafeMemorySummaryDMStore) { - memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed(); + if (null != summaryDMStore) { + memoryUsed += summaryDMStore.getMemoryUsed(); } return memoryUsed; } @@ -975,4 +984,65 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable { return segmentProperties; } + public void setSegmentProperties(SegmentProperties 
segmentProperties) { + this.segmentProperties = segmentProperties; + } + + public int[] getColumnCardinality() { + return columnCardinality; + } + + private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe) + throws MemoryException { + AbstractMemoryDMStore memoryDMStore; + if (addToUnsafe) { + memoryDMStore = new UnsafeMemoryDMStore(schema); + } else { + memoryDMStore = new SafeMemoryDMStore(schema); + } + return memoryDMStore; + } + + /** + * This method will convert safe to unsafe memory DM store + * + * @throws MemoryException + */ + public void convertToUnsafeDMStore() throws MemoryException { + if (memoryDMStore instanceof SafeMemoryDMStore) { + UnsafeMemoryDMStore unsafeMemoryDMStore = memoryDMStore.convertToUnsafeDMStore(); + memoryDMStore.freeMemory(); + memoryDMStore = unsafeMemoryDMStore; + } + if (summaryDMStore instanceof SafeMemoryDMStore) { + UnsafeMemoryDMStore unsafeSummaryMemoryDMStore = summaryDMStore.convertToUnsafeDMStore(); + summaryDMStore.freeMemory(); + summaryDMStore = unsafeSummaryMemoryDMStore; + } + } + + /** + * Read column schema from binary + * @param schemaArray + * @throws IOException + */ + public List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException { + // uncompress it. 
+ schemaArray = Snappy.uncompress(schemaArray); + ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray); + DataInput schemaInput = new DataInputStream(schemaStream); + List<ColumnSchema> columnSchemas = new ArrayList<>(); + int size = schemaInput.readShort(); + for (int i = 0; i < size; i++) { + ColumnSchema columnSchema = new ColumnSchema(); + columnSchema.readFields(schemaInput); + columnSchemas.add(columnSchema); + } + return columnSchemas; + } + + public long getBlockletSchemaTime() { + return blockletSchemaTime; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java index 99e48a5..7cdf77d 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.indexstore.blockletindex; import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; /** * This class contains required information to make the Blocklet datamap distributable. 
@@ -31,6 +32,8 @@ public class BlockletDataMapDistributable extends DataMapDistributable { */ private String filePath; + private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier; + public BlockletDataMapDistributable(String indexFilePath) { this.filePath = indexFilePath; } @@ -38,4 +41,13 @@ public class BlockletDataMapDistributable extends DataMapDistributable { public String getFilePath() { return filePath; } + + public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() { + return tableBlockIndexUniqueIdentifier; + } + + public void setTableBlockIndexUniqueIdentifier( + TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifiers) { + this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifiers; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index c0bc2a6..c3df721 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -21,13 +21,16 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; +import 
org.apache.carbondata.core.datamap.dev.CacheableDataMap; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapRefresher; import org.apache.carbondata.core.datamap.dev.DataMapWriter; @@ -38,15 +41,17 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.util.BlockletDataMapUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; @@ -59,7 +64,7 @@ import org.apache.hadoop.fs.RemoteIterator; * Table map for blocklet */ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory - implements BlockletDetailsFetcher, SegmentPropertiesFetcher { + implements BlockletDetailsFetcher, SegmentPropertiesFetcher, CacheableDataMap { private static final String NAME = "clustered.btree.blocklet"; @@ -69,9 +74,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory private AbsoluteTableIdentifier identifier; // segmentId -> list of index file - private Map<String, 
List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>(); + private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>(); - private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache; + private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache; public BlockletDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) { super(carbonTable, dataMapSchema); @@ -91,24 +96,27 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory } @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException { - List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + List<CoarseGrainDataMap> dataMaps = new ArrayList<>(); + Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); - return cache.getAll(tableBlockIndexUniqueIdentifiers); + List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + new ArrayList<>(identifiers.size()); + tableBlockIndexUniqueIdentifiers.addAll(identifiers); + List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = + cache.getAll(tableBlockIndexUniqueIdentifiers); + for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) { + dataMaps.addAll(wrapper.getDataMaps()); + } + return dataMaps; } - private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment) + private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment) throws IOException { - List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = segmentMap.get(segment.getSegmentNo()); if (tableBlockIndexUniqueIdentifiers == null) { - tableBlockIndexUniqueIdentifiers = new ArrayList<>(); - Map<String, String> indexFiles = segment.getCommittedIndexFile(); - for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) 
{ - Path indexFile = new Path(indexFileEntry.getKey()); - tableBlockIndexUniqueIdentifiers.add( - new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), - indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo())); - } + tableBlockIndexUniqueIdentifiers = + BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment); segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers); } return tableBlockIndexUniqueIdentifiers; @@ -130,7 +138,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory } return detailedBlocklets; } - List<TableBlockIndexUniqueIdentifier> identifiers = + Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); // Retrieve each blocklets detail information from blocklet datamap for (Blocklet blocklet : blocklets) { @@ -145,17 +153,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory if (blocklet instanceof ExtendedBlocklet) { return (ExtendedBlocklet) blocklet; } - List<TableBlockIndexUniqueIdentifier> identifiers = - getTableBlockIndexUniqueIdentifiers(segment); + Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); return getExtendedBlocklet(identifiers, blocklet); } - private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers, + private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers, Blocklet blocklet) throws IOException { for (TableBlockIndexUniqueIdentifier identifier : identifiers) { - if (identifier.getIndexFileName().startsWith(blocklet.getFilePath())) { - DataMap dataMap = cache.get(identifier); - return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId()); + BlockletDataMapIndexWrapper wrapper = cache.get(identifier); + List<BlockletDataMap> dataMaps = wrapper.getDataMaps(); + for (DataMap dataMap : dataMaps) { + if (((BlockletDataMap) 
dataMap).getIndexFileName().startsWith(blocklet.getFilePath())) { + return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId()); + } } } throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found "); @@ -166,23 +176,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory public List<DataMapDistributable> toDistributable(Segment segment) { List<DataMapDistributable> distributables = new ArrayList<>(); try { - CarbonFile[] carbonIndexFiles; - if (segment.getSegmentFileName() == null) { - carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles( - CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())); - } else { - SegmentFileStore fileStore = - new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName()); - Map<String, String> indexFiles = fileStore.getIndexFiles(); - carbonIndexFiles = new CarbonFile[indexFiles.size()]; - int i = 0; - for (String indexFile : indexFiles.keySet()) { - carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile); - } + Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + getTableBlockIndexUniqueIdentifiers(segment); + CarbonFile[] carbonIndexFiles = new CarbonFile[tableBlockIndexUniqueIdentifiers.size()]; + int identifierCounter = 0; + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : + tableBlockIndexUniqueIdentifiers) { + String indexFilePath = tableBlockIndexUniqueIdentifier.getIndexFilePath(); + String fileName = tableBlockIndexUniqueIdentifier.getIndexFileName(); + carbonIndexFiles[identifierCounter++] = FileFactory + .getCarbonFile(indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + fileName); } for (int i = 0; i < carbonIndexFiles.length; i++) { Path path = new Path(carbonIndexFiles[i].getPath()); - FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); LocatedFileStatus fileStatus = 
iter.next(); @@ -205,13 +211,18 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory @Override public void clear(Segment segment) { - List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo()); + Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo()); if (blockIndexes != null) { for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { - DataMap dataMap = cache.getIfPresent(blockIndex); - if (dataMap != null) { - cache.invalidate(blockIndex); - dataMap.clear(); + BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndex); + if (null != wrapper) { + List<BlockletDataMap> dataMaps = wrapper.getDataMaps(); + for (DataMap dataMap : dataMaps) { + if (dataMap != null) { + cache.invalidate(blockIndex); + dataMap.clear(); + } + } } } } @@ -246,9 +257,12 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory segmentNo)); } } - List<CoarseGrainDataMap> dataMaps; + List<CoarseGrainDataMap> dataMaps = new ArrayList<>(); try { - dataMaps = cache.getAll(identifiers); + List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiers); + for (BlockletDataMapIndexWrapper wrapper : wrappers) { + dataMaps.addAll(wrapper.getDataMaps()); + } } catch (IOException e) { throw new RuntimeException(e); } @@ -289,4 +303,29 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory return false; } + @Override public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, + BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException { + cache.put(tableBlockIndexUniqueIdentifier, blockletDataMapIndexWrapper); + } + + @Override + public List<DataMapDistributable> getAllUncachedDistributables( + List<DataMapDistributable> distributables) throws IOException { + List<DataMapDistributable> distributablesToBeLoaded = new ArrayList<>(distributables.size()); + for (DataMapDistributable distributable : 
distributables) { + Segment segment = distributable.getSegment(); + Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + getTableBlockIndexUniqueIdentifiers(segment); + // filter out the tableBlockIndexUniqueIdentifiers based on distributable + TableBlockIndexUniqueIdentifier validIdentifier = BlockletDataMapUtil + .filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers, + (BlockletDataMapDistributable) distributable); + if (null == cache.getIfPresent(validIdentifier)) { + ((BlockletDataMapDistributable) distributable) + .setTableBlockIndexUniqueIdentifier(validIdentifier); + distributablesToBeLoaded.add(distributable); + } + } + return distributablesToBeLoaded; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java index ebeb278..7443d15 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java @@ -32,6 +32,8 @@ public class BlockletDataMapModel extends DataMapModel { private String segmentId; + private boolean addToUnsafe = true; + public BlockletDataMapModel(String filePath, byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) { super(filePath); @@ -40,6 +42,12 @@ public class BlockletDataMapModel extends DataMapModel { this.segmentId = segmentId; } + public BlockletDataMapModel(String filePath, byte[] fileData, + Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, boolean addToUnsafe) { + this(filePath, fileData, blockMetaInfoMap, segmentId); + 
this.addToUnsafe = addToUnsafe; + } + public byte[] getFileData() { return fileData; } @@ -51,4 +59,8 @@ public class BlockletDataMapModel extends DataMapModel { public String getSegmentId() { return segmentId; } + + public boolean isAddToUnsafe() { + return addToUnsafe; + } }