[CARBONDATA-1232] Datamap implementation for Blocklet This closes #1099
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6d71d9c4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6d71d9c4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6d71d9c4 Branch: refs/heads/datamap Commit: 6d71d9c474e50792fc0fba3a321c2de927b05c84 Parents: 3ecb3ec Author: ravipesala <ravi.pes...@gmail.com> Authored: Sat Jun 17 22:53:57 2017 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Tue Jul 11 13:17:43 2017 +0800 ---------------------------------------------------------------------- .../carbondata/core/cache/CacheProvider.java | 3 + .../apache/carbondata/core/cache/CacheType.java | 6 + .../core/datastore/block/TableBlockInfo.java | 19 + .../core/datastore/block/TaskBlockInfo.java | 4 + .../carbondata/core/indexstore/Blocklet.java | 55 +- .../indexstore/BlockletDataMapIndexStore.java | 180 ++++++ .../core/indexstore/BlockletDetailInfo.java | 117 ++++ .../carbondata/core/indexstore/DataMap.java | 8 +- .../core/indexstore/DataMapFactory.java | 87 +++ .../core/indexstore/DataMapStoreManager.java | 90 ++- .../carbondata/core/indexstore/DataMapType.java | 14 +- .../TableBlockIndexUniqueIdentifier.java | 103 ++++ .../core/indexstore/TableDataMap.java | 97 +++- .../core/indexstore/UnsafeMemoryDMStore.java | 207 +++++++ .../blockletindex/BlockletDMComparator.java | 134 +++++ .../blockletindex/BlockletDataMap.java | 445 +++++++++++++++ .../blockletindex/BlockletDataMapFactory.java | 115 ++++ .../BlockletDataRefNodeWrapper.java | 137 +++++ .../indexstore/blockletindex/IndexWrapper.java | 49 ++ .../core/indexstore/row/DataMapRow.java | 89 +++ .../core/indexstore/row/DataMapRowImpl.java | 106 ++++ .../core/indexstore/row/UnsafeDataMapRow.java | 133 +++++ .../core/indexstore/schema/DataMapSchema.java | 124 ++++ .../core/indexstore/schema/FilterType.java | 24 + .../core/metadata/blocklet/BlockletInfo.java | 53 +- .../core/metadata/index/BlockIndexInfo.java | 
27 + .../executor/impl/AbstractQueryExecutor.java | 52 +- .../executer/IncludeFilterExecuterImpl.java | 2 +- .../executer/RangeValueFilterExecuterImpl.java | 2 +- .../RowLevelRangeGrtThanFiterExecuterImpl.java | 2 +- ...elRangeGrtrThanEquaToFilterExecuterImpl.java | 2 +- ...velRangeLessThanEqualFilterExecuterImpl.java | 2 +- .../RowLevelRangeLessThanFiterExecuterImpl.java | 2 +- .../processor/AbstractDataBlockIterator.java | 3 + .../AbstractDetailQueryResultIterator.java | 34 +- .../util/AbstractDataFileFooterConverter.java | 53 ++ .../apache/carbondata/core/util/CarbonUtil.java | 40 +- .../core/util/DataFileFooterConverter.java | 4 + .../core/util/DataFileFooterConverter2.java | 3 + .../core/util/DataFileFooterConverterV3.java | 11 + format/src/main/thrift/carbondata_index.thrift | 1 + .../carbondata/hadoop/CarbonInputFormat.java | 14 +- .../carbondata/hadoop/CarbonInputSplit.java | 39 +- .../hadoop/api/CarbonTableInputFormat.java | 562 ++++++++++++++++--- .../hadoop/util/CarbonInputFormatUtil.java | 7 +- .../presto/impl/CarbonTableReader.java | 56 +- .../spark/rdd/CarbonIUDMergerRDD.scala | 5 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 9 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 20 +- .../carbondata/spark/util/QueryPlanUtil.scala | 10 +- .../sql/CarbonDatasourceHadoopRelation.scala | 13 +- .../sql/execution/command/IUDCommands.scala | 7 - .../carbondata/spark/util/QueryPlanUtil.scala | 10 +- .../apache/spark/sql/hive/CarbonMetastore.scala | 10 +- .../processing/merger/CarbonCompactionUtil.java | 32 ++ 55 files changed, 3172 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java 
b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java index 25a8976..5c4b265 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java @@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.BlockIndexStore; import org.apache.carbondata.core.datastore.SegmentTaskIndexStore; import org.apache.carbondata.core.datastore.block.AbstractIndex; import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier; +import org.apache.carbondata.core.indexstore.BlockletDataMapIndexStore; import org.apache.carbondata.core.util.CarbonProperties; /** @@ -126,6 +127,8 @@ public class CacheProvider { } else if (cacheType.equals(cacheType.DRIVER_BTREE)) { cacheObject = new SegmentTaskIndexStore(carbonStorePath, carbonLRUCache); + } else if (cacheType.equals(cacheType.DRIVER_BLOCKLET_DATAMAP)) { + cacheObject = new BlockletDataMapIndexStore(carbonStorePath, carbonLRUCache); } cacheTypeToCacheMap.put(cacheType, cacheObject); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java index 2d6570d..ab51ff2 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java @@ -56,6 +56,12 @@ public class CacheType<K, V> { DRIVER_BTREE = new CacheType("driver_btree"); /** + * Executor BTree cache which maintains size of BTree metadata + */ + public static final CacheType<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> + DRIVER_BLOCKLET_DATAMAP = new CacheType("driver_blocklet_datamap"); + + /** * cacheName which is unique name for a cache */ private String cacheName; 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index 44347cf..f003882 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -77,6 +78,8 @@ public class TableBlockInfo implements Distributable, Serializable { */ private String[] deletedDeltaFilePath; + private BlockletDetailInfo detailInfo; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) { this.filePath = FileFactory.getUpdatedFilePath(filePath); @@ -88,6 +91,10 @@ public class TableBlockInfo implements Distributable, Serializable { this.deletedDeltaFilePath = deletedDeltaFilePath; } + public TableBlockInfo() { + + } + /** * constructor to initialize the TbaleBlockInfo with BlockletInfos * @@ -319,4 +326,16 @@ public class TableBlockInfo implements Distributable, Serializable { public String[] getDeletedDeltaFilePath() { return deletedDeltaFilePath; } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public BlockletDetailInfo getDetailInfo() { + return detailInfo; + } + + public 
void setDetailInfo(BlockletDetailInfo detailInfo) { + this.detailInfo = detailInfo; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java index eb707c2..4fcec87 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.datastore.block; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,6 +46,9 @@ public class TaskBlockInfo { return taskBlockInfoMapping.keySet(); } + public Collection<List<TableBlockInfo>> getAllTableBlockInfoList() { + return taskBlockInfoMapping.values(); + } /** * returns TableBlockInfoList of given task http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java index 597c46c..66da4d0 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java @@ -16,27 +16,76 @@ */ package org.apache.carbondata.core.indexstore; +import java.io.IOException; import java.io.Serializable; +import org.apache.carbondata.core.datastore.impl.FileFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.fs.RemoteIterator; + /** * Blocklet */ public class Blocklet implements Serializable { - private String path; + private Path path; + + private String segmentId; private String blockletId; + private BlockletDetailInfo detailInfo; + + private long length; + + private String[] location; + public Blocklet(String path, String blockletId) { - this.path = path; + this.path = new Path(path); this.blockletId = blockletId; } - public String getPath() { + public Path getPath() { return path; } public String getBlockletId() { return blockletId; } + + public BlockletDetailInfo getDetailInfo() { + return detailInfo; + } + + public void setDetailInfo(BlockletDetailInfo detailInfo) { + this.detailInfo = detailInfo; + } + + public void updateLocations() throws IOException { + FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); + RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); + LocatedFileStatus fileStatus = iter.next(); + location = fileStatus.getBlockLocations()[0].getHosts(); + length = fileStatus.getLen(); + } + + public String[] getLocations() throws IOException { + return location; + } + + public long getLength() throws IOException { + return length; + } + + public String getSegmentId() { + return segmentId; + } + + public void setSegmentId(String segmentId) { + this.segmentId = segmentId; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java new file mode 100644 index 0000000..fc8c273 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CarbonLRUCache; +import org.apache.carbondata.core.datastore.exception.IndexBuilderException; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; + +/** + * Class to handle loading, unloading,clearing,storing of the table + * blocks + */ +public class BlockletDataMapIndexStore + implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> { + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName()); + /** + * carbon store path + */ + protected String carbonStorePath; + /** + * CarbonLRU cache + */ + protected CarbonLRUCache lruCache; + + /** + * map of block info to lock object map, while loading the btree this will be filled + * and removed after loading the tree for that particular block info, this will be useful + * 
while loading the tree concurrently so only block level lock will be applied another + * block can be loaded concurrently + */ + private Map<String, Object> segmentLockMap; + + /** + * constructor to initialize the SegmentTaskIndexStore + * + * @param carbonStorePath + * @param lruCache + */ + public BlockletDataMapIndexStore(String carbonStorePath, CarbonLRUCache lruCache) { + this.carbonStorePath = carbonStorePath; + this.lruCache = lruCache; + segmentLockMap = new ConcurrentHashMap<String, Object>(); + } + + @Override public BlockletDataMap get(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) + throws IOException { + String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(); + BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey); + if (dataMap == null) { + try { + dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier); + } catch (IndexBuilderException e) { + throw new IOException(e.getMessage(), e); + } catch (Throwable e) { + throw new IOException("Problem in loading segment block.", e); + } + } + return dataMap; + } + + @Override public List<BlockletDataMap> getAll( + List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException { + List<BlockletDataMap> blockletDataMaps = new ArrayList<>(tableSegmentUniqueIdentifiers.size()); + try { + for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) { + blockletDataMaps.add(get(identifier)); + } + } catch (Throwable e) { + for (BlockletDataMap dataMap : blockletDataMaps) { + dataMap.clear(); + } + throw new IOException("Problem in loading segment blocks.", e); + } + return blockletDataMaps; + } + + /** + * returns the SegmentTaskIndexWrapper + * + * @param tableSegmentUniqueIdentifier + * @return + */ + @Override public BlockletDataMap getIfPresent( + TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) { + BlockletDataMap dataMap = (BlockletDataMap) lruCache + 
.get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + return dataMap; + } + + /** + * method invalidate the segment cache for segment + * + * @param tableSegmentUniqueIdentifier + */ + @Override public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) { + lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + } + + /** + * Below method will be used to load the segment of segments + * One segment may have multiple task , so table segment will be loaded + * based on task id and will return the map of taksId to table segment + * map + * + * @return map of taks id to segment mapping + * @throws IOException + */ + private BlockletDataMap loadAndGetDataMap( + TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException { + String uniqueTableSegmentIdentifier = + tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(); + Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier); + if (lock == null) { + lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier); + } + BlockletDataMap dataMap = null; + synchronized (lock) { + dataMap = new BlockletDataMap(); + dataMap.init(tableSegmentUniqueIdentifier.getFilePath()); + lruCache.put(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(), dataMap, + dataMap.getMemorySize()); + } + return dataMap; + } + + /** + * Below method will be used to get the segment level lock object + * + * @param uniqueIdentifier + * @return lock object + */ + private synchronized Object addAndGetSegmentLock(String uniqueIdentifier) { + // get the segment lock object if it is present then return + // otherwise add the new lock and return + Object segmentLoderLockObject = segmentLockMap.get(uniqueIdentifier); + if (null == segmentLoderLockObject) { + segmentLoderLockObject = new Object(); + segmentLockMap.put(uniqueIdentifier, segmentLoderLockObject); + } + return segmentLoderLockObject; + } + + /** + * The method clears the access count of 
table segments + * + * @param tableSegmentUniqueIdentifiers + */ + @Override public void clearAccessCount( + List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) { + for (TableBlockIndexUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) { + BlockletDataMap cacheable = + (BlockletDataMap) lruCache.get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + cacheable.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java new file mode 100644 index 0000000..68dedd8 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.indexstore; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; + +import org.apache.hadoop.io.Writable; + +/** + * Blocklet detail information to be sent to each executor + */ +public class BlockletDetailInfo implements Serializable, Writable { + + private int rowCount; + + private short pagesCount; + + private short versionNumber; + + private int[] dimLens; + + private long schemaUpdatedTimeStamp; + + private BlockletInfo blockletInfo; + + public int getRowCount() { + return rowCount; + } + + public void setRowCount(int rowCount) { + this.rowCount = rowCount; + } + + public int getPagesCount() { + return pagesCount; + } + + public void setPagesCount(short pagesCount) { + this.pagesCount = pagesCount; + } + + public short getVersionNumber() { + return versionNumber; + } + + public void setVersionNumber(short versionNumber) { + this.versionNumber = versionNumber; + } + + public BlockletInfo getBlockletInfo() { + return blockletInfo; + } + + public void setBlockletInfo(BlockletInfo blockletInfo) { + this.blockletInfo = blockletInfo; + } + + public int[] getDimLens() { + return dimLens; + } + + public void setDimLens(int[] dimLens) { + this.dimLens = dimLens; + } + + public long getSchemaUpdatedTimeStamp() { + return schemaUpdatedTimeStamp; + } + + public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) { + this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp; + } + + @Override public void write(DataOutput out) throws IOException { + out.writeInt(rowCount); + out.writeShort(pagesCount); + out.writeShort(versionNumber); + out.writeShort(dimLens.length); + for (int i = 0; i < dimLens.length; i++) { + out.writeInt(dimLens[i]); + } + out.writeLong(schemaUpdatedTimeStamp); + blockletInfo.write(out); + } + + @Override public void readFields(DataInput in) throws IOException { + rowCount = 
in.readInt(); + pagesCount = in.readShort(); + versionNumber = in.readShort(); + dimLens = new int[in.readShort()]; + for (int i = 0; i < dimLens.length; i++) { + dimLens[i] = in.readInt(); + } + schemaUpdatedTimeStamp = in.readLong(); + blockletInfo = new BlockletInfo(); + blockletInfo.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java index 2651f15..1276494 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; /** - * Interface for adding and retrieving index data. + * Datamap is an entity which can store and retrieve index data. */ public interface DataMap { @@ -47,6 +47,12 @@ public interface DataMap { List<Blocklet> prune(FilterResolverIntf filterExp); /** + * Convert datamap to distributable object + * @return + */ + DataMapDistributable toDistributable(); + + /** * Clear complete index table and release memory. 
*/ void clear(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java new file mode 100644 index 0000000..72f714f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore; + +import java.util.List; + +import org.apache.carbondata.core.events.ChangeEvent; +import org.apache.carbondata.core.indexstore.schema.FilterType; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; + +/** + * Interface for datamap factory, it is responsible for creating the datamap. + */ +public interface DataMapFactory { + + /** + * Initialization of Datamap factory + * @param identifier + * @param dataMapName + */ + void init(AbsoluteTableIdentifier identifier, String dataMapName); + /** + * Get the datamap writer for each segmentid. 
+ * + * @param identifier + * @param segmentId + * @return + */ + DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, + String segmentId); + + /** + * Get the datamap for segmentid + * + * @param segmentId + * @return + */ + List<DataMap> getDataMaps(String segmentId); + + /** + * Get datamap for distributable object. + * + * @param distributable + * @return + */ + DataMap getDataMap(DataMapDistributable distributable); + + /** + * This method checks whether the columns and the type of filters supported + * for this datamap or not + * + * @param filterType + * @return + */ + boolean isFiltersSupported(FilterType filterType); + + /** + * + * @param event + */ + void fireEvent(ChangeEvent event); + + /** + * Clears datamap of the segment + */ + void clear(String segmentId); + + /** + * Clear all datamaps from memory + */ + void clear(); + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java index 06638ad..1a36187 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java @@ -16,7 +16,9 @@ */ package org.apache.carbondata.core.indexstore; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.carbondata.common.logging.LogService; @@ -24,13 +26,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; /** - * It maintains all the index tables in it. + * It maintains all the DataMaps in it. 
*/ -public class DataMapStoreManager { +public final class DataMapStoreManager { private static DataMapStoreManager instance = new DataMapStoreManager(); - private Map<DataMapType, Map<String, TableDataMap>> dataMapMappping = new HashMap<>(); + /** + * Contains the list of datamaps for each table. + */ + private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>(); private static final LogService LOGGER = LogServiceFactory.getLogService(DataMapStoreManager.class.getName()); @@ -48,56 +53,85 @@ public class DataMapStoreManager { */ public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName, DataMapType mapType) { - Map<String, TableDataMap> map = dataMapMappping.get(mapType); - TableDataMap dataMap = null; - if (map == null) { + List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier); + TableDataMap dataMap; + if (tableDataMaps == null) { + createTableDataMap(identifier, mapType, dataMapName); + tableDataMaps = dataMapMappping.get(identifier); + } + dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps); + if (dataMap == null) { throw new RuntimeException("Datamap does not exist"); - } else { - dataMap = map.get(dataMapName); - if (dataMap == null) { - throw new RuntimeException("Datamap does not exist"); - } } - // Initialize datamap - dataMap.init(identifier, dataMapName); return dataMap; } /** - * Create new datamap instance using datamap type and path + * Create new datamap instance using datamap name, datamap type and table identifier * * @param mapType * @return */ - public TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, DataMapType mapType, - String dataMapName) { - Map<String, TableDataMap> map = dataMapMappping.get(mapType); - if (map == null) { - map = new HashMap<>(); - dataMapMappping.put(mapType, map); + private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, + DataMapType mapType, String dataMapName) { + List<TableDataMap> tableDataMaps = 
dataMapMappping.get(identifier); + if (tableDataMaps == null) { + tableDataMaps = new ArrayList<>(); + dataMapMappping.put(identifier, tableDataMaps); } - TableDataMap dataMap = map.get(dataMapName); + TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps); if (dataMap != null) { throw new RuntimeException("Already datamap exists in that path with type " + mapType); } try { - //TODO create datamap using @mapType.getClassName()) + DataMapFactory dataMapFactory = mapType.getClassObject().newInstance(); + dataMapFactory.init(identifier, dataMapName); + dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory); } catch (Exception e) { LOGGER.error(e); + throw new RuntimeException(e); + } + tableDataMaps.add(dataMap); + return dataMap; + } + + private TableDataMap getAbstractTableDataMap(String dataMapName, + List<TableDataMap> tableDataMaps) { + TableDataMap dataMap = null; + for (TableDataMap tableDataMap: tableDataMaps) { + if (tableDataMap.getDataMapName().equals(dataMapName)) { + dataMap = tableDataMap; + break; + } } - dataMap.init(identifier, dataMapName); - map.put(dataMapName, dataMap); return dataMap; } - public void clearDataMap(String dataMapName, DataMapType mapType) { - Map<String, TableDataMap> map = dataMapMappping.get(mapType); - if (map != null && map.get(dataMapName) != null) { - map.remove(dataMapName).clear(); + /** + * Clear the datamap/datamaps of a mentioned datamap name and table from memory + * @param identifier + * @param dataMapName + */ + public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) { + List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier); + if (tableDataMaps != null) { + int i = 0; + for (TableDataMap tableDataMap: tableDataMaps) { + if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) { + tableDataMap.clear(new ArrayList<String>()); + tableDataMaps.remove(i); + break; + } + i++; + } } } + /** + * Returns the singleton instance + * 
@return + */ public static DataMapStoreManager getInstance() { return instance; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java index b6a0f5b..0059b29 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java @@ -16,19 +16,21 @@ */ package org.apache.carbondata.core.indexstore; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; + /** * Datamap type */ public enum DataMapType { - BLOCKLET("org.apache.carbondata.datamap.BlockletDataMap"); + BLOCKLET(BlockletDataMapFactory.class); - private String className; + private Class<? extends DataMapFactory> classObject; - DataMapType(String className) { - this.className = className; + DataMapType(Class<? extends DataMapFactory> classObject) { + this.classObject = classObject; } - public String getClassName() { - return className; + public Class<? 
extends DataMapFactory> getClassObject() { + return classObject; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java new file mode 100644 index 0000000..7e2bc0e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.indexstore; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; + +/** + * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment + */ +public class TableBlockIndexUniqueIdentifier { + /** + * table fully qualified identifier + */ + private AbsoluteTableIdentifier absoluteTableIdentifier; + + private String segmentId; + + private String carbonIndexFileName; + + /** + * Constructor to initialize the class instance + * + * @param absoluteTableIdentifier + * @param segmentId + */ + public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier, + String segmentId, String carbonIndexFileName) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + this.segmentId = segmentId; + this.carbonIndexFileName = carbonIndexFileName; + } + + /** + * returns AbsoluteTableIdentifier + * + * @return + */ + public AbsoluteTableIdentifier getAbsoluteTableIdentifier() { + return absoluteTableIdentifier; + } + + public String getSegmentId() { + return segmentId; + } + + /** + * method returns the id to uniquely identify a key + * + * @return + */ + public String getUniqueTableSegmentIdentifier() { + CarbonTableIdentifier carbonTableIdentifier = + absoluteTableIdentifier.getCarbonTableIdentifier(); + return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE + + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId + + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName; + } + + public String getFilePath() { + return absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + "/" + + carbonIndexFileName; + } + + @Override public boolean equals(Object o) { + if (this == o) 
return true; + if (o == null || getClass() != o.getClass()) return false; + + TableBlockIndexUniqueIdentifier that = (TableBlockIndexUniqueIdentifier) o; + + if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) { + return false; + } + if (!segmentId.equals(that.segmentId)) { + return false; + } + return carbonIndexFileName.equals(that.carbonIndexFileName); + } + + @Override public int hashCode() { + int result = absoluteTableIdentifier.hashCode(); + result = 31 * result + segmentId.hashCode(); + result = 31 * result + carbonIndexFileName.hashCode(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java index e1532c8..39ca4c5 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java @@ -16,38 +16,34 @@ */ package org.apache.carbondata.core.indexstore; +import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.events.ChangeEvent; import org.apache.carbondata.core.events.EventListener; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; - /** * DataMap at the table level, user can add any number of datamaps for one table. Depends * on the filter condition it can prune the blocklets. */ -public interface TableDataMap extends EventListener { +public final class TableDataMap implements EventListener { - /** - * It is called to initialize and load the required table datamap metadata. 
- */ - void init(AbsoluteTableIdentifier identifier, String dataMapName); + private AbsoluteTableIdentifier identifier; - /** - * Gives the writer to write the metadata information of this datamap at table level. - * - * @return - */ - DataMapWriter getWriter(); + private String dataMapName; + + private DataMapFactory dataMapFactory; /** - * Create the datamap using the segmentid and name. - * - * @param identifier - * @param segmentId - * @return + * It is called to initialize and load the required table datamap metadata. */ - DataMap createDataMap(AbsoluteTableIdentifier identifier, String segmentId); + public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName, + DataMapFactory dataMapFactory) { + this.identifier = identifier; + this.dataMapName = dataMapName; + this.dataMapFactory = dataMapFactory; + } /** * Pass the valid segments and prune the datamap using filter expression @@ -56,7 +52,24 @@ public interface TableDataMap extends EventListener { * @param filterExp * @return */ - List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp); + public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) { + List<Blocklet> blocklets = new ArrayList<>(); + for (String segmentId : segmentIds) { + List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId); + for (DataMap dataMap : dataMaps) { + List<Blocklet> pruneBlocklets = dataMap.prune(filterExp); + blocklets.addAll(addSegmentId(pruneBlocklets, segmentId)); + } + } + return blocklets; + } + + private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) { + for (Blocklet blocklet : pruneBlocklets) { + blocklet.setSegmentId(segmentId); + } + return pruneBlocklets; + } /** * This is used for making the datamap distributable. 
@@ -65,7 +78,16 @@ public interface TableDataMap extends EventListener { * * @return */ - List<DataMapDistributable> toDistributable(List<String> segmentIds); + public List<DataMapDistributable> toDistributable(List<String> segmentIds) { + List<DataMapDistributable> distributables = new ArrayList<>(); + for (String segmentsId : segmentIds) { + List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId); + for (DataMap dataMap : dataMaps) { + distributables.add(dataMap.toDistributable()); + } + } + return distributables; + } /** * This method is used from any machine after it is distributed. It takes the distributable object @@ -75,20 +97,37 @@ public interface TableDataMap extends EventListener { * @param filterExp * @return */ - List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp); + public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) { + return dataMapFactory.getDataMap(distributable).prune(filterExp); + } + + @Override public void fireEvent(ChangeEvent event) { + dataMapFactory.fireEvent(event); + } /** - * This method checks whether the columns and the type of filters supported - * for this datamap or not - * - * @param filterExp - * @return + * Clear only the datamaps of the segments + * @param segmentIds */ - boolean isFiltersSupported(FilterResolverIntf filterExp); + public void clear(List<String> segmentIds) { + for (String segmentId: segmentIds) { + dataMapFactory.clear(segmentId); + } + } /** - * Clears table level datamap + * Clears all datamap + */ + public void clear() { + dataMapFactory.clear(); + } + /** + * Get the unique name of datamap + * + * @return */ - void clear(); + public String getDataMapName() { + return dataMapName; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java new file mode 100644 index 0000000..8246f99 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore; + +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow; +import org.apache.carbondata.core.indexstore.schema.DataMapSchema; +import org.apache.carbondata.core.memory.MemoryAllocator; +import org.apache.carbondata.core.memory.MemoryAllocatorFactory; +import org.apache.carbondata.core.memory.MemoryBlock; + +import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET; +import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe; + +/** + * Store the data map row @{@link DataMapRow} data to unsafe. 
+ */ +public class UnsafeMemoryDMStore { + + private MemoryBlock memoryBlock; + + private static int capacity = 8 * 1024 * 1024; + + private int allocatedSize; + + private int runningLength; + + private MemoryAllocator memoryAllocator; + + private boolean isMemoryFreed; + + private DataMapSchema[] schema; + + private int[] pointers; + + private int rowCount; + + public UnsafeMemoryDMStore(DataMapSchema[] schema) { + this.schema = schema; + this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator(); + this.allocatedSize = capacity; + this.memoryBlock = memoryAllocator.allocate(allocatedSize); + this.pointers = new int[1000]; + } + + /** + * Check memory is sufficient or not, if not sufficient allocate more memory and copy old data to + * new one. + * + * @param rowSize + */ + private void ensureSize(int rowSize) { + if (runningLength + rowSize >= allocatedSize) { + MemoryBlock allocate = + MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity); + unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), + allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); + memoryAllocator.free(memoryBlock); + allocatedSize = allocatedSize + capacity; + memoryBlock = allocate; + } + if (this.pointers.length <= rowCount + 1) { + int[] newPointer = new int[pointers.length + 1000]; + System.arraycopy(pointers, 0, newPointer, 0, pointers.length); + this.pointers = newPointer; + } + } + + /** + * Add the index row to unsafe. + * + * @param indexRow + * @return + */ + public void addIndexRowToUnsafe(DataMapRow indexRow) { + // First calculate the required memory to keep the row in unsafe + int rowSize = indexRow.getTotalSizeInBytes(); + // Check whether allocated memory is sufficient or not. 
+ ensureSize(rowSize); + int pointer = runningLength; + + for (int i = 0; i < schema.length; i++) { + addToUnsafe(schema[i], indexRow, i); + } + pointers[rowCount++] = pointer; + } + + private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) { + switch (schema.getSchemaType()) { + case FIXED: + switch (schema.getDataType()) { + case BYTE: + unsafe.putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, + row.getByte(index)); + runningLength += row.getSizeInBytes(index); + break; + case SHORT: + unsafe + .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, + row.getShort(index)); + runningLength += row.getSizeInBytes(index); + break; + case INT: + unsafe.putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, + row.getInt(index)); + runningLength += row.getSizeInBytes(index); + break; + case LONG: + unsafe.putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, + row.getLong(index)); + runningLength += row.getSizeInBytes(index); + break; + case FLOAT: + unsafe + .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, + row.getFloat(index)); + runningLength += row.getSizeInBytes(index); + break; + case DOUBLE: + unsafe + .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, + row.getDouble(index)); + runningLength += row.getSizeInBytes(index); + break; + case BYTE_ARRAY: + byte[] data = row.getByteArray(index); + unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + runningLength, data.length); + runningLength += row.getSizeInBytes(index); + break; + } + break; + case VARIABLE: + byte[] data = row.getByteArray(index); + unsafe.putShort(memoryBlock.getBaseOffset() + runningLength, (short) data.length); + runningLength += 2; + unsafe.copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + 
runningLength, data.length); + runningLength += data.length; + break; + case STRUCT: + DataMapSchema[] childSchemas = + ((DataMapSchema.StructDataMapSchema) schema).getChildSchemas(); + DataMapRow struct = row.getRow(index); + for (int i = 0; i < childSchemas.length; i++) { + addToUnsafe(childSchemas[i], struct, i); + } + break; + } + } + + public DataMapRow getUnsafeRow(int index) { + assert (index < rowCount); + return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]); + } + + public void finishWriting() { + if (runningLength < allocatedSize) { + MemoryBlock allocate = + MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength); + unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), + allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); + memoryAllocator.free(memoryBlock); + memoryBlock = allocate; + } + // Compact pointers. + if (rowCount < pointers.length) { + int[] newPointer = new int[rowCount]; + System.arraycopy(pointers, 0, newPointer, 0, rowCount); + this.pointers = newPointer; + } + } + + public void freeMemory() { + if (!isMemoryFreed) { + memoryAllocator.free(memoryBlock); + isMemoryFreed = true; + } + } + + public int getMemoryUsed() { + return runningLength; + } + + public DataMapSchema[] getSchema() { + return schema; + } + + public int getRowCount() { + return rowCount; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java new file mode 100644 index 0000000..9a50600 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java @@ -0,0 
+1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.nio.ByteBuffer; +import java.util.Comparator; + +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.util.ByteUtil; + +/** + * Data map comparator + */ +public class BlockletDMComparator implements Comparator<DataMapRow> { + + /** + * no dictionary column value is of variable length so in each column value + * it will -1 + */ + private static final int NO_DCITIONARY_COLUMN_VALUE = -1; + + /** + * sized of the short value in bytes + */ + private static final short SHORT_SIZE_IN_BYTES = 2; + + private int[] eachColumnValueSize; + + /** + * the number of no dictionary columns in SORT_COLUMNS + */ + private int numberOfNoDictSortColumns; + + /** + * the number of columns in SORT_COLUMNS + */ + private int numberOfSortColumns; + + public BlockletDMComparator(int[] eachColumnValueSize, int numberOfSortColumns, + int numberOfNoDictSortColumns) { + this.eachColumnValueSize = eachColumnValueSize; + this.numberOfNoDictSortColumns = numberOfNoDictSortColumns; + this.numberOfSortColumns = numberOfSortColumns; + } + + @Override public int 
compare(DataMapRow first, DataMapRow second) { + int dictionaryKeyOffset = 0; + int nonDictionaryKeyOffset = 0; + int compareResult = 0; + int processedNoDictionaryColumn = numberOfNoDictSortColumns; + byte[][] firstBytes = splitKey(first.getByteArray(0)); + byte[][] secondBytes = splitKey(first.getByteArray(0)); + byte[] firstNoDictionaryKeys = firstBytes[1]; + ByteBuffer firstNoDictionaryKeyBuffer = ByteBuffer.wrap(firstNoDictionaryKeys); + byte[] secondNoDictionaryKeys = secondBytes[1]; + ByteBuffer secondNoDictionaryKeyBuffer = ByteBuffer.wrap(secondNoDictionaryKeys); + int actualOffset = 0; + int actualOffset1 = 0; + int firstNoDcitionaryLength = 0; + int secondNodeDictionaryLength = 0; + + for (int i = 0; i < numberOfSortColumns; i++) { + + if (eachColumnValueSize[i] != NO_DCITIONARY_COLUMN_VALUE) { + byte[] firstDictionaryKeys = firstBytes[0]; + byte[] secondDictionaryKeys = secondBytes[0]; + compareResult = ByteUtil.UnsafeComparer.INSTANCE + .compareTo(firstDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i], + secondDictionaryKeys, dictionaryKeyOffset, eachColumnValueSize[i]); + dictionaryKeyOffset += eachColumnValueSize[i]; + } else { + if (processedNoDictionaryColumn > 1) { + actualOffset = firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset); + firstNoDcitionaryLength = + firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES) + - actualOffset; + actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset); + secondNodeDictionaryLength = + secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset + SHORT_SIZE_IN_BYTES) + - actualOffset1; + compareResult = ByteUtil.UnsafeComparer.INSTANCE + .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength, + secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength); + nonDictionaryKeyOffset += SHORT_SIZE_IN_BYTES; + processedNoDictionaryColumn--; + } else { + actualOffset = 
firstNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset); + actualOffset1 = secondNoDictionaryKeyBuffer.getShort(nonDictionaryKeyOffset); + firstNoDcitionaryLength = firstNoDictionaryKeys.length - actualOffset; + secondNodeDictionaryLength = secondNoDictionaryKeys.length - actualOffset1; + compareResult = ByteUtil.UnsafeComparer.INSTANCE + .compareTo(firstNoDictionaryKeys, actualOffset, firstNoDcitionaryLength, + secondNoDictionaryKeys, actualOffset1, secondNodeDictionaryLength); + } + } + if (compareResult != 0) { + return compareResult; + } + } + + return 0; + } + + /** + * Split the index key to dictionary and no dictionary. + * @param startKey + * @return + */ + private byte[][] splitKey(byte[] startKey) { + ByteBuffer buffer = ByteBuffer.wrap(startKey); + buffer.rewind(); + int dictonaryKeySize = buffer.getInt(); + int nonDictonaryKeySize = buffer.getInt(); + byte[] dictionaryKey = new byte[dictonaryKeySize]; + buffer.get(dictionaryKey); + byte[] nonDictionaryKey = new byte[nonDictonaryKeySize]; + buffer.get(nonDictionaryKey); + return new byte[][] {dictionaryKey, nonDictionaryKey}; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java new file mode 100644 index 0000000..79aa091 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Comparator; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cacheable; +import org.apache.carbondata.core.datastore.IndexKey; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; +import org.apache.carbondata.core.indexstore.DataMap; +import org.apache.carbondata.core.indexstore.DataMapDistributable; +import org.apache.carbondata.core.indexstore.DataMapWriter; +import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; +import org.apache.carbondata.core.indexstore.schema.DataMapSchema; +import org.apache.carbondata.core.keygenerator.KeyGenException; 
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; + +/** + * Datamap implementation for blocklet. + */ +public class BlockletDataMap implements DataMap, Cacheable { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockletDataMap.class.getName()); + + private static int KEY_INDEX = 0; + + private static int MIN_VALUES_INDEX = 1; + + private static int MAX_VALUES_INDEX = 2; + + private static int ROW_COUNT_INDEX = 3; + + private static int FILE_PATH_INDEX = 4; + + private static int PAGE_COUNT_INDEX = 5; + + private static int VERSION_INDEX = 6; + + private static int SCHEMA_UPADATED_TIME_INDEX = 7; + + private static int BLOCK_INFO_INDEX = 8; + + private UnsafeMemoryDMStore unsafeMemoryDMStore; + + private SegmentProperties segmentProperties; + + private int[] columnCardinality; + + @Override public DataMapWriter getWriter() { + return null; + } + + @Override public void init(String path) { + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + try { + List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path); + for (DataFileFooter fileFooter : indexInfo) { + List<ColumnSchema> columnInTable = fileFooter.getColumnInTable(); + if (segmentProperties == null) { + columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); + segmentProperties = new 
SegmentProperties(columnInTable, columnCardinality); + createSchema(segmentProperties); + } + TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); + fileFooter = CarbonUtil.readMetadatFile(blockInfo); + + loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath()); + } + if (unsafeMemoryDMStore != null) { + unsafeMemoryDMStore.finishWriting(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties, + String filePath) { + int[] minMaxLen = segmentProperties.getEachDimColumnValueSize(); + List<BlockletInfo> blockletList = fileFooter.getBlockletList(); + DataMapSchema[] schema = unsafeMemoryDMStore.getSchema(); + for (int index = 0; index < blockletList.size(); index++) { + DataMapRow row = new DataMapRowImpl(schema); + int ordinal = 0; + BlockletInfo blockletInfo = blockletList.get(index); + + // add start key as index key + row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++); + + BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex(); + row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal); + ordinal++; + row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal); + ordinal++; + + row.setInt(blockletInfo.getNumberOfRows(), ordinal++); + + // add file path + byte[] filePathBytes = filePath.getBytes(); + row.setByteArray(filePathBytes, ordinal++); + + // add pages + row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++); + + // add version number + row.setShort(fileFooter.getVersionId().number(), ordinal++); + + // add schema updated time + row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++); + + // add blocklet info + byte[] serializedData; + try { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(stream); + 
blockletInfo.write(dataOutput);
+      serializedData = stream.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    // store serialized BlockletInfo as the last column of the index row
+    row.setByteArray(serializedData, ordinal);
+    unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+    }
+  }
+
+  /**
+   * Builds a struct row holding one min (or max) byte[] value per dimension column.
+   * The same helper serves both min and max since the struct layout is identical.
+   */
+  private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) {
+    DataMapSchema[] minSchemas =
+        ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas();
+    DataMapRow minRow = new DataMapRowImpl(minSchemas);
+    int minOrdinal = 0;
+    // min value adding
+    for (int i = 0; i < minMaxLen.length; i++) {
+      minRow.setByteArray(minValues[i], minOrdinal++);
+    }
+    return minRow;
+  }
+
+  /**
+   * Defines the fixed column layout of each unsafe index row:
+   * [0] index key, [1] min-value struct, [2] max-value struct, [3] row count,
+   * [4] file path, [5] page count, [6] version, [7] schema-updated time,
+   * [8] serialized BlockletInfo.
+   * NOTE(review): this ordering must stay in sync with the *_INDEX constants
+   * used by createBlocklet/getMinMaxValue — declared above this window.
+   */
+  private void createSchema(SegmentProperties segmentProperties) {
+    List<DataMapSchema> indexSchemas = new ArrayList<>();
+
+    // Index key
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    // do it 2 times, one for min and one for max.
+    for (int k = 0; k < 2; k++) {
+      DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
+      for (int i = 0; i < minMaxLen.length; i++) {
+        // non-positive length means variable-size (e.g. no-dictionary) column value
+        if (minMaxLen[i] <= 0) {
+          mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY);
+        } else {
+          mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataType.BYTE_ARRAY, minMaxLen[i]);
+        }
+      }
+      DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(DataType.STRUCT, mapSchemas);
+      indexSchemas.add(mapSchema);
+    }
+
+    // for number of rows.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.INT));
+
+    // for table block path
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    // for number of pages.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+    // for version number.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.SHORT));
+
+    // for schema updated time.
+    indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataType.LONG));
+
+    //for blocklet info
+    indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
+
+    unsafeMemoryDMStore =
+        new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));
+  }
+
+  /**
+   * Prunes blocklets for the given filter. With no filter every stored row becomes
+   * a Blocklet; otherwise a binary search over the sorted index rows bounds the scan
+   * range and each candidate is min/max-checked via the filter executer.
+   * NOTE(review): returns null (not empty list) when default start/end key
+   * generation fails — callers must tolerate that.
+   */
+  @Override public List<Blocklet> prune(FilterResolverIntf filterExp) {
+
+    // getting the start and end index key based on filter for hitting the
+    // selected block reference nodes based on filter resolver tree.
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("preparing the start and end key for finding"
+          + "start and end block as per filter resolver");
+    }
+    List<Blocklet> blocklets = new ArrayList<>();
+    Comparator<DataMapRow> comparator =
+        new BlockletDMComparator(segmentProperties.getEachDimColumnValueSize(),
+            segmentProperties.getNumberOfSortColumns(),
+            segmentProperties.getNumberOfNoDictSortColumns());
+    List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
+    FilterUtil
+        .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys);
+    // reading the first value from list which has start key
+    IndexKey searchStartKey = listOfStartEndKeys.get(0);
+    // reading the last value from list which has end key
+    IndexKey searchEndKey = listOfStartEndKeys.get(1);
+    // NOTE(review): condition requires BOTH keys null; if exactly one is null the
+    // debug log / convertToRow below would NPE — presumably the resolver sets both
+    // or neither. TODO confirm.
+    if (null == searchStartKey && null == searchEndKey) {
+      try {
+        // TODO need to handle for no dictionary dimensions
+        searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+        // TODO need to handle for no dictionary dimensions
+        searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+      } catch (KeyGenException e) {
+        return null;
+      }
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Successfully retrieved the start and end key" + "Dictionary Start Key: " + searchStartKey
+              .getDictionaryKeys() + "No Dictionary Start Key " + searchStartKey
+              .getNoDictionaryKeys() + "Dictionary End Key: " + searchEndKey.getDictionaryKeys() +
+              "No Dictionary End Key " + searchEndKey.getNoDictionaryKeys());
+    }
+    if (filterExp == null) {
+      // no filter: every indexed blocklet qualifies
+      int rowCount = unsafeMemoryDMStore.getRowCount();
+      for (int i = 0; i < rowCount; i++) {
+        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i);
+        blocklets.add(createBlocklet(unsafeRow, i));
+      }
+    } else {
+      // narrow the scan window with binary search, then min/max-prune each row
+      int startIndex = findStartIndex(convertToRow(searchStartKey), comparator);
+      int endIndex = findEndIndex(convertToRow(searchEndKey), comparator);
+      FilterExecuter filterExecuter =
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+      while (startIndex <= endIndex) {
+        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex);
+        BitSet bitSet = filterExecuter.isScanRequired(getMinMaxValue(unsafeRow, MAX_VALUES_INDEX),
+            getMinMaxValue(unsafeRow, MIN_VALUES_INDEX));
+        if (!bitSet.isEmpty()) {
+          blocklets.add(createBlocklet(unsafeRow, startIndex));
+        }
+        startIndex++;
+      }
+    }
+
+    return blocklets;
+  }
+
+  /** Extracts the per-column min or max byte[] values from the nested struct row. */
+  private byte[][] getMinMaxValue(DataMapRow row, int index) {
+    DataMapRow minMaxRow = row.getRow(index);
+    byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
+    for (int i = 0; i < minMax.length; i++) {
+      minMax[i] = minMaxRow.getByteArray(i);
+    }
+    return minMax;
+  }
+
+  /**
+   * Materializes a Blocklet (path + detail info) from one stored index row.
+   * The BlockletInfo column is deserialized back from its byte[] form.
+   * NOTE(review): new String(byte[]) uses the platform default charset here —
+   * assumes the path was encoded the same way at write time; verify.
+   */
+  private Blocklet createBlocklet(DataMapRow row, int blockletId) {
+    Blocklet blocklet =
+        new Blocklet(new String(row.getByteArray(FILE_PATH_INDEX)), blockletId + "");
+    BlockletDetailInfo detailInfo = new BlockletDetailInfo();
+    detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
+    detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
+    detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
+    detailInfo.setDimLens(columnCardinality);
+    detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
+    BlockletInfo blockletInfo = new BlockletInfo();
+    try {
+      byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
+      ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
+      DataInputStream inputStream = new DataInputStream(stream);
+      blockletInfo.readFields(inputStream);
+      inputStream.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    detailInfo.setBlockletInfo(blockletInfo);
+    blocklet.setDetailInfo(detailInfo);
+    return blocklet;
+  }
+
+  /**
+   * Binary search used to get the first tentative index row based on
+   * search key
+   *
+   * @param key search key
+   * @return first tentative block
+   */
+  private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+    int childNodeIndex;
+    int low = 0;
+    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int mid = 0;
+    int compareRes = -1;
+    //
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      // compare the entries
+      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      if (compareRes < 0) {
+        high = mid - 1;
+      } else if (compareRes > 0) {
+        low = mid + 1;
+      } else {
+        // if key is matched then get the first entry
+        int currentPos = mid;
+        while (currentPos - 1 >= 0
+            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+          currentPos--;
+        }
+        mid = currentPos;
+        break;
+      }
+    }
+    // if compare result is less than zero then we
+    // and mid is more than 0 then we need to previous block as duplicates
+    // record can be present
+    if (compareRes < 0) {
+      if (mid > 0) {
+        mid--;
+      }
+      childNodeIndex = mid;
+    } else {
+      childNodeIndex = mid;
+    }
+    // get the leaf child
+    return childNodeIndex;
+  }
+
+  /**
+   * Binary search used to get the last tentative block based on
+   * search key
+   *
+   * @param key search key
+   * @return first tentative block
+   */
+  private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
+    int childNodeIndex;
+    int low = 0;
+    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int mid = 0;
+    int compareRes = -1;
+    //
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      // compare the entries
+      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      if (compareRes < 0) {
+        high = mid - 1;
+      } else if (compareRes > 0) {
+        low = mid + 1;
+      } else {
+        int currentPos = mid;
+        // if key is matched then get the first entry
+        // NOTE(review): comment says "first" but this walks FORWARD to the last
+        // duplicate, which matches this method's purpose (end index).
+        while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
+            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+          currentPos++;
+        }
+        mid = currentPos;
+        break;
+      }
+    }
+    // if compare result is less than zero then we
+    // and mid is more than 0 then we need to previous block as duplicates
+    // record can be present
+    if (compareRes < 0) {
+      if (mid > 0) {
+        mid--;
+      }
+      childNodeIndex = mid;
+    } else {
+      childNodeIndex = mid;
+    }
+    return childNodeIndex;
+  }
+
+  /**
+   * Packs an IndexKey into a one-column DataMapRow so it can be compared against
+   * stored rows: [int dictLen][int noDictLen][dict bytes][noDict bytes].
+   */
+  private DataMapRow convertToRow(IndexKey key) {
+    ByteBuffer buffer =
+        ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
+    buffer.putInt(key.getDictionaryKeys().length);
+    buffer.putInt(key.getNoDictionaryKeys().length);
+    buffer.put(key.getDictionaryKeys());
+    buffer.put(key.getNoDictionaryKeys());
+    DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+    dataMapRow.setByteArray(buffer.array(), 0);
+    return dataMapRow;
+  }
+
+  // Releases the off-heap memory backing this data map; the map is unusable afterwards.
+  @Override public void clear() {
+    unsafeMemoryDMStore.freeMemory();
+    unsafeMemoryDMStore = null;
+    segmentProperties = null;
+  }
+
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  @Override public int getAccessCount() {
+    return 0;
+  }
+
+  @Override public long getMemorySize() {
+    return unsafeMemoryDMStore.getMemoryUsed();
+  }
+
+  @Override public DataMapDistributable toDistributable() {
+    // TODO
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java new file mode 100644 index 0000000..2fe6643 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.events.ChangeEvent; +import org.apache.carbondata.core.indexstore.DataMap; +import org.apache.carbondata.core.indexstore.DataMapDistributable; +import org.apache.carbondata.core.indexstore.DataMapFactory; +import org.apache.carbondata.core.indexstore.DataMapWriter; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.indexstore.schema.FilterType; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; + +/** + * Table map for blocklet + */ +public class BlockletDataMapFactory implements DataMapFactory { + + private AbsoluteTableIdentifier identifier; + + private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>(); + + private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache; + + public void init(AbsoluteTableIdentifier identifier, String dataMapName) { + this.identifier = identifier; + cache = CacheProvider.getInstance() + .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath()); + } + + public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) { + return null; + } + + public List<DataMap> getDataMaps(String segmentId) { + List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + segmentMap.get(segmentId); + if (tableBlockIndexUniqueIdentifiers == null) { + tableBlockIndexUniqueIdentifiers = new 
ArrayList<>(); + String path = identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId; + FileFactory.FileType fileType = FileFactory.getFileType(path); + CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType); + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(".carbonindex"); + } + }); + for (int i = 0; i < listFiles.length; i++) { + tableBlockIndexUniqueIdentifiers.add( + new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName())); + } + } + + try { + return cache.getAll(tableBlockIndexUniqueIdentifiers); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override public boolean isFiltersSupported(FilterType filterType) { + return true; + } + + public void clear(String segmentId) { + List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId); + if (blockIndexes != null) { + for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { + DataMap dataMap = cache.getIfPresent(blockIndex); + dataMap.clear(); + cache.invalidate(blockIndex); + } + } + } + + @Override public void clear() { + for (String segmentId: segmentMap.keySet()) { + clear(segmentId); + } + } + + @Override public DataMap getDataMap(DataMapDistributable distributable) { + return null; + } + + @Override public void fireEvent(ChangeEvent event) { + + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java new file mode 100644 index 0000000..5509c75 --- /dev/null +++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; +import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; +import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory; +import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader; +import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; + +/** + * wrapper for blocklet data map data + */ +public class BlockletDataRefNodeWrapper implements DataRefNode { + + private List<TableBlockInfo> blockInfos; + + private int index; + + private 
int[] dimensionLens; + + private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache; + + public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index, + int[] dimensionLens) { + this.blockInfos = blockInfos; + this.index = index; + this.dimensionLens = dimensionLens; + } + + @Override public DataRefNode getNextDataRefNode() { + if (index + 1 < blockInfos.size()) { + new BlockletDataRefNodeWrapper(blockInfos, index + 1, dimensionLens); + } + return null; + } + + @Override public int nodeSize() { + return blockInfos.get(index).getDetailInfo().getRowCount(); + } + + @Override public long nodeNumber() { + return index; + } + + @Override public byte[][] getColumnsMaxValue() { + return null; + } + + @Override public byte[][] getColumnsMinValue() { + return null; + } + + @Override + public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes) + throws IOException { + DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader(); + return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes); + } + + @Override + public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndexes) + throws IOException { + DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader(); + return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndexes); + } + + @Override + public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes) + throws IOException { + MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader(); + return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes); + } + + @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex) + throws IOException { + MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader(); + return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex); + } + + 
private DimensionColumnChunkReader getDimensionColumnChunkReader() throws IOException { + ColumnarFormatVersion version = + ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber()); + DimensionColumnChunkReader dimensionColumnChunkReader = CarbonDataReaderFactory.getInstance() + .getDimensionColumnChunkReader(version, + blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens, + blockInfos.get(index).getFilePath()); + return dimensionColumnChunkReader; + } + + private MeasureColumnChunkReader getMeasureColumnChunkReader() throws IOException { + ColumnarFormatVersion version = + ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber()); + return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version, + blockInfos.get(index).getDetailInfo().getBlockletInfo(), + blockInfos.get(index).getFilePath()); + } + + @Override + public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) { + this.deleteDeltaDataCache = deleteDeltaDataCache; + } + + @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() { + return deleteDeltaDataCache; + } + + @Override public int numberOfPages() { + return blockInfos.get(index).getDetailInfo().getPagesCount(); + } + + public int numberOfNodes() { + return blockInfos.size(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d71d9c4/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java new file mode 100644 index 0000000..b8cffc6 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.datastore.block.AbstractIndex; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.util.CarbonUtil; + +/** + * Wrapper of abstract index + * TODO it could be removed after refactor + */ +public class IndexWrapper extends AbstractIndex { + + public IndexWrapper(List<TableBlockInfo> blockInfos) { + DataFileFooter fileFooter = null; + try { + fileFooter = CarbonUtil.readMetadatFile(blockInfos.get(0)); + } catch (IOException e) { + throw new RuntimeException(e); + } + segmentProperties = new SegmentProperties(fileFooter.getColumnInTable(), + fileFooter.getSegmentInfo().getColumnCardinality()); + dataRefNode = new BlockletDataRefNodeWrapper(blockInfos, 0, + segmentProperties.getDimensionColumnsValueSize()); + } + + @Override public void buildIndex(List<DataFileFooter> footerList) { + } +}