http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java new file mode 100644 index 0000000..ef169af --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Comparator; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cacheable; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap; +import org.apache.carbondata.core.datastore.IndexKey; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.BlockMetaInfo; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; +import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; +import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.metadata.datatype.DataType; +import 
org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.DataTypeUtil; + +import org.apache.commons.lang3.StringUtils; +import org.xerial.snappy.Snappy; + +/** + * Datamap implementation for blocklet. + */ +public class BlockletIndexDataMap extends AbstractCoarseGrainIndexDataMap implements Cacheable { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockletIndexDataMap.class.getName()); + + private static int KEY_INDEX = 0; + + private static int MIN_VALUES_INDEX = 1; + + private static int MAX_VALUES_INDEX = 2; + + private static int ROW_COUNT_INDEX = 3; + + private static int FILE_PATH_INDEX = 4; + + private static int PAGE_COUNT_INDEX = 5; + + private static int VERSION_INDEX = 6; + + private static int SCHEMA_UPADATED_TIME_INDEX = 7; + + private static int BLOCK_INFO_INDEX = 8; + + private static int BLOCK_FOOTER_OFFSET = 9; + + private static int LOCATIONS = 10; + + private static int BLOCKLET_ID_INDEX = 11; + + private static int BLOCK_LENGTH = 12; + + private static int TASK_MIN_VALUES_INDEX = 0; + + private static int TASK_MAX_VALUES_INDEX = 1; + + private static int SCHEMA = 2; + + private static int PARTITION_INFO = 3; + + private UnsafeMemoryDMStore unsafeMemoryDMStore; + + private UnsafeMemoryDMStore unsafeMemorySummaryDMStore; + + private SegmentProperties segmentProperties; + + private int[] columnCardinality; + + private boolean isPartitionedSegment; + + @Override + public void init(DataMapModel dataMapModel) throws IOException, MemoryException { + long startTime = System.currentTimeMillis(); + assert (dataMapModel instanceof BlockletDataMapModel); + BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel; + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + List<DataFileFooter> indexInfo = fileFooterConverter + .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData()); + isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment(); + DataMapRowImpl summaryRow = null; + byte[] schemaBinary = null; + // below 2 variables will be used for fetching the relative blocklet id. 
Relative blocklet ID + // is the id assigned to a blocklet within a part file + String tempFilePath = null; + int relativeBlockletId = 0; + for (DataFileFooter fileFooter : indexInfo) { + if (segmentProperties == null) { + List<ColumnSchema> columnInTable = fileFooter.getColumnInTable(); + schemaBinary = convertSchemaToBinary(columnInTable); + columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); + segmentProperties = new SegmentProperties(columnInTable, columnCardinality); + createSchema(segmentProperties); + createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary); + } + TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); + BlockMetaInfo blockMetaInfo = + blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath()); + // Here it loads info about all blocklets of the index. + // Only add if the file exists physically. There are scenarios in which the index file exists + // inside a merge index but the related carbondata files are deleted. In that case we first + // check whether the file exists physically or not + if (blockMetaInfo != null) { + if (fileFooter.getBlockletList() == null) { + // This is the old store scenario, where blocklet information is not available in the index + // file, so load only block info + summaryRow = + loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow, + blockMetaInfo); + } else { + // blocklet ID will start from 0 again only when the part file path changes + if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) { + tempFilePath = blockInfo.getFilePath(); + relativeBlockletId = 0; + } + summaryRow = + loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow, + blockMetaInfo, relativeBlockletId); + // this is done because the relative blocklet id needs to be incremented based on the + // total number of blocklets + relativeBlockletId += fileFooter.getBlockletList().size(); + } + } + } + if (unsafeMemoryDMStore != null) { + unsafeMemoryDMStore.finishWriting(); + } + if (null != unsafeMemorySummaryDMStore) { + addTaskSummaryRowToUnsafeMemoryStore( + summaryRow, + blockletDataMapInfo.getPartitions(), + schemaBinary); + unsafeMemorySummaryDMStore.finishWriting(); + } + LOGGER.info( + "Time taken to load blocklet datamap from file: " + dataMapModel.getFilePath() + " is " + ( + System.currentTimeMillis() - startTime)); + } + + private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter, + SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow, + BlockMetaInfo blockMetaInfo, int relativeBlockletId) { + int[] minMaxLen = segmentProperties.getColumnsValueSize(); + List<BlockletInfo> blockletList = fileFooter.getBlockletList(); + CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); + // Add one row to maintain task level min max for segment pruning + if (!blockletList.isEmpty() && summaryRow == null) { + summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema()); + } + for (int index = 0; index < blockletList.size(); index++) { + DataMapRow row = new DataMapRowImpl(schema); + int ordinal = 0; + int taskMinMaxOrdinal = 0; + BlockletInfo blockletInfo = blockletList.get(index); + + // add start key as index key + row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++); + + BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex(); + byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen); + row.setRow(addMinMax(minMaxLen,
schema[ordinal], minValues), ordinal); + // compute and set task level min values + addTaskMinMaxValues(summaryRow, minMaxLen, + unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, + TASK_MIN_VALUES_INDEX, true); + ordinal++; + taskMinMaxOrdinal++; + byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); + row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); + // compute and set task level max values + addTaskMinMaxValues(summaryRow, minMaxLen, + unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, + TASK_MAX_VALUES_INDEX, false); + ordinal++; + + row.setInt(blockletInfo.getNumberOfRows(), ordinal++); + + // add file path + byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); + row.setByteArray(filePathBytes, ordinal++); + + // add pages + row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++); + + // add version number + row.setShort(fileFooter.getVersionId().number(), ordinal++); + + // add schema updated time + row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++); + + // add blocklet info + byte[] serializedData; + try { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(stream); + blockletInfo.write(dataOutput); + serializedData = stream.toByteArray(); + row.setByteArray(serializedData, ordinal++); + // Add block footer offset, it is used if we need to read the footer of the block + row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); + setLocations(blockMetaInfo.getLocationInfo(), row, ordinal); + ordinal++; + // for the relative blocklet id, i.e. the blocklet id that belongs to a particular part file + row.setShort((short) relativeBlockletId++, ordinal++); + // Store block size + row.setLong(blockMetaInfo.getSize(), ordinal); + unsafeMemoryDMStore.addIndexRowToUnsafe(row); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + return summaryRow; + } + + private void setLocations(String[] locations, DataMapRow row, int ordinal) + throws UnsupportedEncodingException { + // Add location info + String locationStr = StringUtils.join(locations, ','); + row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal); + } + + /** + * Load information for the block. This case can happen only for old stores, + * where blocklet information is not available in the index file, so load only block + * information and read blocklet information in the executor.
+ */ + private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter, + SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow, + BlockMetaInfo blockMetaInfo) { + int[] minMaxLen = segmentProperties.getColumnsValueSize(); + BlockletIndex blockletIndex = fileFooter.getBlockletIndex(); + CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); + // Add one row to maintain task level min max for segment pruning + if (summaryRow == null) { + summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema()); + } + DataMapRow row = new DataMapRowImpl(schema); + int ordinal = 0; + int taskMinMaxOrdinal = 0; + // add start key as index key + row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++); + + BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex(); + byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen); + row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal); + // compute and set task level min values + addTaskMinMaxValues(summaryRow, minMaxLen, + unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, + TASK_MIN_VALUES_INDEX, true); + ordinal++; + taskMinMaxOrdinal++; + byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); + row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); + // compute and set task level max values + addTaskMinMaxValues(summaryRow, minMaxLen, + unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, + TASK_MAX_VALUES_INDEX, false); + ordinal++; + + row.setInt((int) fileFooter.getNumberOfRows(), ordinal++); + + // add file path + byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); + row.setByteArray(filePathBytes, ordinal++); + + // add pages + row.setShort((short) 0, ordinal++); + + // add version number + row.setShort(fileFooter.getVersionId().number(), ordinal++); + + // add schema updated time + row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++); + + // add blocklet info + row.setByteArray(new byte[0], ordinal++); + + row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); + try { + setLocations(blockMetaInfo.getLocationInfo(), row, ordinal); + ordinal++; + // for relative blocklet id. Value is -1 because in case of an old store blocklet info will + // not be present in the index file and in that case we will not know the total number of + // blocklets + row.setShort((short) -1, ordinal++); + + // store block size + row.setLong(blockMetaInfo.getSize(), ordinal); + unsafeMemoryDMStore.addIndexRowToUnsafe(row); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return summaryRow; + } + + private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, + List<String> partitions, byte[] schemaBinary) throws IOException { + // write the task summary info to unsafe memory store + if (null != summaryRow) { + // Add column schema, it is useful to generate segment properties in the executor, + // so we need not read the footer again there.
+ if (schemaBinary != null) { + summaryRow.setByteArray(schemaBinary, SCHEMA); + } + if (partitions != null && partitions.size() > 0) { + CarbonRowSchema[] minSchemas = + ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore + .getSchema()[PARTITION_INFO]).getChildSchemas(); + DataMapRow partitionRow = new DataMapRowImpl(minSchemas); + for (int i = 0; i < partitions.size(); i++) { + partitionRow + .setByteArray(partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS), + i); + } + summaryRow.setRow(partitionRow, PARTITION_INFO); + } + try { + unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Fill the measures min values with the minimum; this is needed for backward version + * compatibility as older versions don't store min values for measures + */ + private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) { + byte[][] updatedValues = minValues; + if (minValues.length < minMaxLen.length) { + updatedValues = new byte[minMaxLen.length][]; + System.arraycopy(minValues, 0, updatedValues, 0, minValues.length); + List<CarbonMeasure> measures = segmentProperties.getMeasures(); + ByteBuffer buffer = ByteBuffer.allocate(8); + for (int i = 0; i < measures.size(); i++) { + buffer.rewind(); + DataType dataType = measures.get(i).getDataType(); + if (dataType == DataTypes.BYTE) { + buffer.putLong(Byte.MIN_VALUE); + updatedValues[minValues.length + i] = buffer.array().clone(); + } else if (dataType == DataTypes.SHORT) { + buffer.putLong(Short.MIN_VALUE); + updatedValues[minValues.length + i] = buffer.array().clone(); + } else if (dataType == DataTypes.INT) { + buffer.putLong(Integer.MIN_VALUE); + updatedValues[minValues.length + i] = buffer.array().clone(); + } else if (dataType == DataTypes.LONG) { + buffer.putLong(Long.MIN_VALUE); + updatedValues[minValues.length + i] = buffer.array().clone(); + } else if (DataTypes.isDecimal(dataType)) { + updatedValues[minValues.length + i] = + DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE)); + } else { + // use the most negative double as the default minimum; Double.MIN_VALUE is the + // smallest positive double, not the lowest value + buffer.putDouble(-Double.MAX_VALUE); + updatedValues[minValues.length + i] = buffer.array().clone(); + } + } + } + return updatedValues; + } + + /** + * Fill the measures max values with the maximum; this is needed for backward version + * compatibility as older versions don't store max values for measures + */ + private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) { + byte[][] updatedValues = maxValues; + if (maxValues.length < minMaxLen.length) { + updatedValues = new byte[minMaxLen.length][]; + System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length); + List<CarbonMeasure> measures = segmentProperties.getMeasures(); + ByteBuffer buffer = ByteBuffer.allocate(8); + for (int i = 0; i < measures.size(); i++) { + buffer.rewind(); + DataType dataType = measures.get(i).getDataType(); + if (dataType == DataTypes.BYTE) { + buffer.putLong(Byte.MAX_VALUE); + updatedValues[maxValues.length + i] = buffer.array().clone(); + } else if (dataType == DataTypes.SHORT) { + buffer.putLong(Short.MAX_VALUE); + updatedValues[maxValues.length + i] = buffer.array().clone(); + } else if (dataType == DataTypes.INT) { + buffer.putLong(Integer.MAX_VALUE); + updatedValues[maxValues.length + i] = buffer.array().clone(); + } else if (dataType == DataTypes.LONG) { + buffer.putLong(Long.MAX_VALUE); + updatedValues[maxValues.length + i] = buffer.array().clone(); + } else if (DataTypes.isDecimal(dataType)) { +
updatedValues[maxValues.length + i] = + DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE)); + } else { + buffer.putDouble(Double.MAX_VALUE); + updatedValues[maxValues.length + i] = buffer.array().clone(); + } + } + } + return updatedValues; + } + + private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema, + byte[][] minValues) { + CarbonRowSchema[] minSchemas = + ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas(); + DataMapRow minRow = new DataMapRowImpl(minSchemas); + int minOrdinal = 0; + // min value adding + for (int i = 0; i < minMaxLen.length; i++) { + minRow.setByteArray(minValues[i], minOrdinal++); + } + return minRow; + } + + /** + * This method will compute min/max values at task level + * + * @param taskMinMaxRow + * @param minMaxLen + * @param carbonRowSchema + * @param minMaxValue + * @param ordinal + * @param isMinValueComparison + */ + private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen, + CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal, + boolean isMinValueComparison) { + DataMapRow row = taskMinMaxRow.getRow(ordinal); + byte[][] updatedMinMaxValues = minMaxValue; + if (null == row) { + CarbonRowSchema[] minSchemas = + ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas(); + row = new DataMapRowImpl(minSchemas); + } else { + byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal); + // Compare and update min max values + for (int i = 0; i < minMaxLen.length; i++) { + int compare = + ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]); + if (isMinValueComparison) { + if (compare < 0) { + updatedMinMaxValues[i] = existingMinMaxValues[i]; + } + } else if (compare > 0) { + updatedMinMaxValues[i] = existingMinMaxValues[i]; + } + } + } + int minMaxOrdinal = 0; + // min/max value adding + for (int i = 0; i < minMaxLen.length; i++) { + row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++); + } + taskMinMaxRow.setRow(row, ordinal); + } + + private void createSchema(SegmentProperties segmentProperties) throws MemoryException { + List<CarbonRowSchema> indexSchemas = new ArrayList<>(); + + // Index key + indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); + getMinMaxSchema(segmentProperties, indexSchemas); + + // for number of rows. + indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT)); + + // for table block path + indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); + + // for number of pages. + indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); + + // for version number. + indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); + + // for schema updated time. + indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); + + //for blocklet info + indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); + + // for block footer offset. + indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); + + // for locations + indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); + + // for relative blocklet id i.e. blocklet id that belongs to a particular part file. + indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); + + // for storing block length. 
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); + + unsafeMemoryDMStore = + new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()])); + } + + /** + * Creates the schema to store summary information or the information which can be stored only + * once per datamap. It stores datamap level max/min of each column and partition information of + * datamap + * @param segmentProperties + * @param partitions + * @throws MemoryException + */ + private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions, + byte[] schemaBinary) + throws MemoryException { + List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(); + getMinMaxSchema(segmentProperties, taskMinMaxSchemas); + // for storing column schema + taskMinMaxSchemas.add( + new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length)); + if (partitions != null && partitions.size() > 0) { + CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()]; + for (int i = 0; i < mapSchemas.length; i++) { + mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY); + } + CarbonRowSchema mapSchema = + new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(), + mapSchemas); + taskMinMaxSchemas.add(mapSchema); + } + unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore( + taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()])); + } + + private void getMinMaxSchema(SegmentProperties segmentProperties, + List<CarbonRowSchema> minMaxSchemas) { + // Index key + int[] minMaxLen = segmentProperties.getColumnsValueSize(); + // do it 2 times, one for min and one for max. + for (int k = 0; k < 2; k++) { + CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length]; + for (int i = 0; i < minMaxLen.length; i++) { + if (minMaxLen[i] <= 0) { + mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY); + } else { + mapSchemas[i] = + new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]); + } + } + CarbonRowSchema mapSchema = + new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(), + mapSchemas); + minMaxSchemas.add(mapSchema); + } + } + + @Override + public boolean isScanRequired(FilterResolverIntf filterExp) { + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); + for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) { + DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i); + boolean isScanRequired = FilterExpressionProcessor.isScanRequired( + filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX), + getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX)); + if (isScanRequired) { + return true; + } + } + return false; + } + + private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) { + if (unsafeMemoryDMStore.getRowCount() == 0) { + return new ArrayList<>(); + } + // getting the start and end index key based on filter for hitting the + // selected block reference nodes based on filter resolver tree. 
+ if (LOGGER.isDebugEnabled()) { + LOGGER.debug("preparing the start and end key for finding " + "start and end block as per filter resolver"); + } + List<Blocklet> blocklets = new ArrayList<>(); + Comparator<DataMapRow> comparator = + new BlockletDMComparator(segmentProperties.getColumnsValueSize(), + segmentProperties.getNumberOfSortColumns(), + segmentProperties.getNumberOfNoDictSortColumns()); + List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2); + FilterUtil + .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys); + // reading the first value from list which has start key + IndexKey searchStartKey = listOfStartEndKeys.get(0); + // reading the last value from list which has end key + IndexKey searchEndKey = listOfStartEndKeys.get(1); + if (null == searchStartKey && null == searchEndKey) { + try { + // TODO need to handle for no dictionary dimensions + searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties); + // TODO need to handle for no dictionary dimensions + searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties); + } catch (KeyGenException e) { + return null; + } + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Successfully retrieved the start and end key." + " Dictionary Start Key: " + Arrays.toString(searchStartKey.getDictionaryKeys()) + " No Dictionary Start Key: " + Arrays.toString(searchStartKey.getNoDictionaryKeys()) + " Dictionary End Key: " + Arrays.toString(searchEndKey.getDictionaryKeys()) + " No Dictionary End Key: " + Arrays.toString(searchEndKey.getNoDictionaryKeys())); + } + if (filterExp == null) { + int rowCount = unsafeMemoryDMStore.getRowCount(); + for (int i = 0; i < rowCount; i++) { + DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow(); + blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX))); + } + } else { + int startIndex = findStartIndex(convertToRow(searchStartKey), comparator); + int endIndex = findEndIndex(convertToRow(searchEndKey), comparator); + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); + while (startIndex <= endIndex) { + DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow(); + int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX); + String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX), + CarbonCommonConstants.DEFAULT_CHARSET_CLASS); + boolean isValid = + addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX), + getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, blockletId); + if (isValid) { + blocklets.add(createBlocklet(safeRow, blockletId)); + } + startIndex++; + } + } + return blocklets; + } + + @Override + public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, + List<String> partitions) { + if (unsafeMemoryDMStore.getRowCount() == 0) { + return new ArrayList<>(); + } + // First get the partitions which are stored inside the datamap. + List<String> storedPartitions = getPartitions(); + // if it is a partitioned segment but no partition information is stored, it means the + // partitions were dropped, so return an empty list. + if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) { + return new ArrayList<>(); + } + if (storedPartitions != null && storedPartitions.size() > 0) { + // Check the exact match of partition information inside the stored partitions.
+ boolean found = false; + if (partitions != null && partitions.size() > 0) { + found = partitions.containsAll(storedPartitions); + } + if (!found) { + return new ArrayList<>(); + } + } + // Prune with filters if the partitions exist in this datamap + return prune(filterExp, segmentProperties); + } + + /** + * select the blocks based on the column min and max values + * + * @param filterExecuter + * @param maxValue + * @param minValue + * @param filePath + * @param blockletId + * @return + */ + private boolean addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, byte[][] maxValue, + byte[][] minValue, String filePath, int blockletId) { + BitSet bitSet = null; + if (filterExecuter instanceof ImplicitColumnFilterExecutor) { + String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1); + // this case will come in case of an old store where the index file does not contain the + // blocklet information + if (blockletId != -1) { + uniqueBlockPath = uniqueBlockPath + CarbonCommonConstants.FILE_SEPARATOR + blockletId; + } + bitSet = ((ImplicitColumnFilterExecutor) filterExecuter) + .isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath); + } else { + bitSet = filterExecuter.isScanRequired(maxValue, minValue); + } + return !bitSet.isEmpty(); + } + + public ExtendedBlocklet getDetailedBlocklet(String blockletId) { + int index = Integer.parseInt(blockletId); + DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow(); + return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)); + } + + private byte[][] getMinMaxValue(DataMapRow row, int index) { + DataMapRow minMaxRow = row.getRow(index); + byte[][] minMax = new byte[minMaxRow.getColumnCount()][]; + for (int i = 0; i < minMax.length; i++) { + minMax[i] = minMaxRow.getByteArray(i); + } + return minMax; + } + + private ExtendedBlocklet createBlocklet(DataMapRow row, int blockletId) { + ExtendedBlocklet blocklet = new ExtendedBlocklet( + new String(row.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS), + blockletId + ""); + BlockletDetailInfo detailInfo = new BlockletDetailInfo(); + detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX)); + detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX)); + detailInfo.setVersionNumber(row.getShort(VERSION_INDEX)); + detailInfo.setBlockletId((short) blockletId); + detailInfo.setDimLens(columnCardinality); + detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX)); + byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX); + BlockletInfo blockletInfo = null; + try { + if (byteArray.length > 0) { + blockletInfo = new BlockletInfo(); + ByteArrayInputStream stream = new ByteArrayInputStream(byteArray); + DataInputStream inputStream = new DataInputStream(stream); + blockletInfo.readFields(inputStream); + inputStream.close(); + } + blocklet.setLocation( + new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET) + .split(",")); + } catch (IOException e) { + throw new RuntimeException(e); + } + detailInfo.setBlockletInfo(blockletInfo); + blocklet.setDetailInfo(detailInfo); + detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET)); + detailInfo.setColumnSchemaBinary(getColumnSchemaBinary()); + detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH)); + return blocklet; + } + + /** + * Binary search used to get the first tentative index row based on + * search key + * + * @param key search key + * @return first tentative block + */ + private
int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) { + int childNodeIndex; + int low = 0; + int high = unsafeMemoryDMStore.getRowCount() - 1; + int mid = 0; + int compareRes = -1; + // + while (low <= high) { + mid = (low + high) >>> 1; + // compare the entries + compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); + if (compareRes < 0) { + high = mid - 1; + } else if (compareRes > 0) { + low = mid + 1; + } else { + // if key is matched then get the first entry + int currentPos = mid; + while (currentPos - 1 >= 0 + && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) { + currentPos--; + } + mid = currentPos; + break; + } + } + // if the compare result is less than zero and mid is greater than 0, then move to the + // previous entry, as duplicate records can be present + if (compareRes < 0) { + if (mid > 0) { + mid--; + } + childNodeIndex = mid; + } else { + childNodeIndex = mid; + } + // get the leaf child + return childNodeIndex; + } + + /** + * Binary search used to get the last tentative block based on + * search key + * + * @param key search key + * @return last tentative block + */ + private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) { + int childNodeIndex; + int low = 0; + int high = unsafeMemoryDMStore.getRowCount() - 1; + int mid = 0; + int compareRes = -1; + // + while (low <= high) { + mid = (low + high) >>> 1; + // compare the entries + compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid)); + if (compareRes < 0) { + high = mid - 1; + } else if (compareRes > 0) { + low = mid + 1; + } else { + int currentPos = mid; + // if key is matched then get the last entry + while (currentPos + 1 < unsafeMemoryDMStore.getRowCount() + && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) { + currentPos++; + } + mid = currentPos; + break; + } + } + // if the compare result is less than zero and mid is greater than 0, then move to the + // previous entry, as duplicate records can be present + if (compareRes < 0) { + if (mid > 0) { + mid--; + } + childNodeIndex = mid; + } else { + childNodeIndex = mid; + } + return childNodeIndex; + } + + private DataMapRow convertToRow(IndexKey key) { + ByteBuffer buffer = + ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8); + buffer.putInt(key.getDictionaryKeys().length); + buffer.putInt(key.getNoDictionaryKeys().length); + buffer.put(key.getDictionaryKeys()); + buffer.put(key.getNoDictionaryKeys()); + DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema()); + dataMapRow.setByteArray(buffer.array(), 0); + return dataMapRow; + } + + private List<String> getPartitions() { + DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); + if (unsafeRow.getColumnCount() > PARTITION_INFO) { + List<String> partitions = new ArrayList<>(); + DataMapRow row = unsafeRow.getRow(PARTITION_INFO); + for (int i = 0; i < row.getColumnCount(); i++) { + partitions.add( + new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); + } + return partitions; + } + return null; + } + + private byte[] getColumnSchemaBinary() { + DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); + return unsafeRow.getByteArray(SCHEMA); + } + + /** + * Convert schema to binary + */ + private byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); +
DataOutput dataOutput = new DataOutputStream(stream); + dataOutput.writeShort(columnSchemas.size()); + for (ColumnSchema columnSchema : columnSchemas) { + if (columnSchema.getColumnReferenceId() == null) { + columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId()); + } + columnSchema.write(dataOutput); + } + byte[] byteArray = stream.toByteArray(); + // Compress with snappy to reduce the size of schema + return Snappy.rawCompress(byteArray, byteArray.length); + } + + @Override + public void clear() { + if (unsafeMemoryDMStore != null) { + unsafeMemoryDMStore.freeMemory(); + unsafeMemoryDMStore = null; + segmentProperties = null; + } + // clear task min/max unsafe memory + if (null != unsafeMemorySummaryDMStore) { + unsafeMemorySummaryDMStore.freeMemory(); + unsafeMemorySummaryDMStore = null; + } + } + + @Override + public long getFileTimeStamp() { + return 0; + } + + @Override + public int getAccessCount() { + return 0; + } + + @Override + public long getMemorySize() { + long memoryUsed = 0L; + if (unsafeMemoryDMStore != null) { + memoryUsed += unsafeMemoryDMStore.getMemoryUsed(); + } + if (null != unsafeMemorySummaryDMStore) { + memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed(); + } + return memoryUsed; + } + + public SegmentProperties getSegmentProperties() { + return segmentProperties; + } + +}
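The updateMinValues/updateMaxValues methods in the diff above pad min/max arrays written by older stores, which lack per-measure entries, so that every index row carries one value per column. The padding idea in isolation, as a JDK-only sketch (all names illustrative, not CarbonData API):

import java.nio.ByteBuffer;
import java.util.Arrays;

public class MinPaddingSketch {
  // Extend a short min-value array to expectedLen, encoding a LONG minimum for each missing slot.
  static byte[][] padMins(byte[][] mins, int expectedLen) {
    if (mins.length >= expectedLen) {
      return mins;
    }
    byte[][] padded = Arrays.copyOf(mins, expectedLen);
    for (int i = mins.length; i < expectedLen; i++) {
      padded[i] = ByteBuffer.allocate(8).putLong(Long.MIN_VALUE).array();
    }
    return padded;
  }

  public static void main(String[] args) {
    byte[][] mins = { { 1, 2 } };     // only one column's min present in the old footer
    byte[][] out = padMins(mins, 3);  // two measure minimums appended
    System.out.println(out.length + " " + out[1].length);  // prints: 3 8
  }
}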
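Similarly, addTaskMinMaxValues folds each blocklet's min/max byte arrays into a single task-level summary row, comparing column by column in unsigned lexicographic byte order (the ordering ByteUtil.UnsafeComparer provides). A self-contained sketch of that merge rule, assuming plain byte[][] inputs rather than DataMapRow:

import java.util.Arrays;

public class MinMaxMergeSketch {
  // Unsigned lexicographic comparison of two byte arrays.
  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int x = a[i] & 0xFF, y = b[i] & 0xFF;
      if (x != y) {
        return x - y;
      }
    }
    return a.length - b.length;
  }

  // Fold one blocklet's min values into the running task-level min, column by column.
  static byte[][] mergeMin(byte[][] taskMin, byte[][] blockletMin) {
    if (taskMin == null) {
      return blockletMin.clone();
    }
    byte[][] merged = new byte[taskMin.length][];
    for (int i = 0; i < taskMin.length; i++) {
      merged[i] = compareUnsigned(taskMin[i], blockletMin[i]) <= 0 ? taskMin[i] : blockletMin[i];
    }
    return merged;
  }

  public static void main(String[] args) {
    byte[][] task = { { 0x05 }, { 0x10 } };
    byte[][] blocklet = { { 0x03 }, { 0x20 } };
    System.out.println(Arrays.deepToString(mergeMin(task, blocklet)));  // [[3], [16]]
  }
}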
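The findStartIndex/findEndIndex methods are ordinary binary searches that, on a match, walk to the first or last duplicate of the key. The same first-occurrence pattern on a sorted long[] (illustrative; the real code compares DataMapRow entries through a comparator):

public class DuplicateBoundsSketch {
  // Index of the first occurrence of key in sorted, or -1 if absent.
  static int findFirst(long[] sorted, long key) {
    int low = 0, high = sorted.length - 1;
    while (low <= high) {
      int mid = (low + high) >>> 1;
      if (sorted[mid] < key) {
        low = mid + 1;
      } else if (sorted[mid] > key) {
        high = mid - 1;
      } else {
        // matched: walk back to the first duplicate
        while (mid - 1 >= 0 && sorted[mid - 1] == key) {
          mid--;
        }
        return mid;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    long[] keys = { 1, 3, 3, 3, 7, 9 };
    System.out.println(findFirst(keys, 3));  // 1
  }
}

findEndIndex mirrors this, walking forward to the last duplicate instead.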
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java new file mode 100644 index 0000000..a2c65ba --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMapFactory.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; +import org.apache.carbondata.core.datamap.dev.IndexDataMap; +import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap; +import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; +import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.events.Event; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +/** + * Table map for blocklet + */ +public class BlockletIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory + implements BlockletDetailsFetcher, + SegmentPropertiesFetcher { + + private static final String NAME = "clustered.btree.blocklet"; + + public static final DataMapSchema DATA_MAP_SCHEMA = + new 
DataMapSchema(NAME, BlockletIndexDataMapFactory.class.getName()); + + private AbsoluteTableIdentifier identifier; + + // segmentId -> list of index file + private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>(); + + private Cache<TableBlockIndexUniqueIdentifier, AbstractCoarseGrainIndexDataMap> cache; + + @Override + public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) { + this.identifier = identifier; + cache = CacheProvider.getInstance() + .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP); + } + + @Override + public AbstractDataMapWriter createWriter(String segmentId, String dataWriterPath) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String segmentId) throws IOException { + List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + getTableBlockIndexUniqueIdentifiers(segmentId); + return cache.getAll(tableBlockIndexUniqueIdentifiers); + } + + private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers( + String segmentId) throws IOException { + List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + segmentMap.get(segmentId); + if (tableBlockIndexUniqueIdentifiers == null) { + tableBlockIndexUniqueIdentifiers = new ArrayList<>(); + String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); + List<String> indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path); + for (int i = 0; i < indexFiles.size(); i++) { + tableBlockIndexUniqueIdentifiers.add( + new TableBlockIndexUniqueIdentifier(identifier, segmentId, indexFiles.get(i))); + } + segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers); + } + return tableBlockIndexUniqueIdentifiers; + } + + /** + * Get the blocklet detail information based on blockletid, blockid and segmentid. This method is + * exclusively for BlockletIndexDataMapFactory as detail information is only available in this + * default datamap. 
+ */ + @Override + public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId) + throws IOException { + List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>(); + // If it is already detailed blocklet then type cast and return same + if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) { + for (Blocklet blocklet : blocklets) { + detailedBlocklets.add((ExtendedBlocklet) blocklet); + } + return detailedBlocklets; + } + List<TableBlockIndexUniqueIdentifier> identifiers = + getTableBlockIndexUniqueIdentifiers(segmentId); + // Retrieve each blocklets detail information from blocklet datamap + for (Blocklet blocklet : blocklets) { + detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet)); + } + return detailedBlocklets; + } + + @Override + public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) + throws IOException { + if (blocklet instanceof ExtendedBlocklet) { + return (ExtendedBlocklet) blocklet; + } + List<TableBlockIndexUniqueIdentifier> identifiers = + getTableBlockIndexUniqueIdentifiers(segmentId); + return getExtendedBlocklet(identifiers, blocklet); + } + + private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers, + Blocklet blocklet) throws IOException { + String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId()); + for (TableBlockIndexUniqueIdentifier identifier : identifiers) { + if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) { + IndexDataMap indexDataMap = cache.get(identifier); + return ((BlockletIndexDataMap) indexDataMap).getDetailedBlocklet(blocklet.getBlockletId()); + } + } + throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found "); + } + + + + @Override + public List<DataMapDistributable> toDistributable(String segmentId) { + CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId); + List<DataMapDistributable> distributables = new ArrayList<>(); + for (int i = 0; i < carbonIndexFiles.length; i++) { + Path path = new Path(carbonIndexFiles[i].getPath()); + try { + FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); + RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); + LocatedFileStatus fileStatus = iter.next(); + String[] location = fileStatus.getBlockLocations()[0].getHosts(); + BlockletDataMapDistributable distributable = + new BlockletDataMapDistributable(path.getName()); + distributable.setLocations(location); + distributables.add(distributable); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return distributables; + } + + @Override public void fireEvent(Event event) { + + } + + @Override + public void clear(String segmentId) { + List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId); + if (blockIndexes != null) { + for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { + IndexDataMap indexDataMap = cache.getIfPresent(blockIndex); + if (indexDataMap != null) { + cache.invalidate(blockIndex); + indexDataMap.clear(); + } + } + } + } + + @Override + public void clear() { + for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) { + clear(segmentId); + } + } + + @Override + public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable) + throws IOException { + BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable; + 
List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>(); + if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { + identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(), + mapDistributable.getFilePath())); + } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); + List<String> indexFiles = fileStore.getIndexFilesFromMergeFile( + CarbonTablePath.getSegmentPath(identifier.getTablePath(), mapDistributable.getSegmentId()) + + "/" + mapDistributable.getFilePath()); + for (String indexFile : indexFiles) { + identifiers.add( + new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(), + indexFile)); + } + } + List<AbstractCoarseGrainIndexDataMap> dataMaps; + try { + dataMaps = cache.getAll(identifiers); + } catch (IOException e) { + throw new RuntimeException(e); + } + return dataMaps; + } + + @Override + public DataMapMeta getMeta() { + // TODO: pass SORT_COLUMNS into this class + return null; + } + + @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException { + List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segmentId); + assert (dataMaps.size() > 0); + AbstractCoarseGrainIndexDataMap coarseGrainDataMap = dataMaps.get(0); + assert (coarseGrainDataMap instanceof BlockletIndexDataMap); + BlockletIndexDataMap dataMap = (BlockletIndexDataMap) coarseGrainDataMap; + return dataMap.getSegmentProperties(); + } + + @Override public List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions) + throws IOException { + List<Blocklet> blocklets = new ArrayList<>(); + List<AbstractCoarseGrainIndexDataMap> dataMaps = getDataMaps(segmentId); + for (AbstractCoarseGrainIndexDataMap dataMap : dataMaps) { + blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions)); + } + return blocklets; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 5b919d0..2393c54 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -768,7 +768,15 @@ public class CarbonTable implements Serializable { } /** - * whether this table has aggregation DataMap or not + * Return true if 'autoRefreshDataMap' is enabled, by default it is enabled + */ + public boolean isAutoRefreshDataMap() { + String refresh = getTableInfo().getFactTable().getTableProperties().get("autoRefreshDataMap"); + return refresh == null || refresh.equalsIgnoreCase("true"); + } + + /** + * whether this table has aggregation IndexDataMap or not */ public boolean hasAggregationDataMap() { List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java index 5a9017b..ae49467 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java @@ -32,6 +32,7 @@ public class DataMapSchema implements Serializable, Writable { protected String dataMapName; + // this name can be class name of the DataMapProvider implementation or short name of it private String className; protected RelationIdentifier relationIdentifier; http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java index 5729959..1c6ebad 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java @@ -16,7 +16,7 @@ */ package org.apache.carbondata.core.metadata.schema.table; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA; +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider; public class DataMapSchemaFactory { public static final DataMapSchemaFactory INSTANCE = new DataMapSchemaFactory(); @@ -24,15 +24,16 @@ public class DataMapSchemaFactory { /** * Below class will be used to get data map schema object * based on class name - * @param className + * @param providerName * @return data map schema */ - public DataMapSchema getDataMapSchema(String dataMapName, String className) { - switch (className) { - case AGGREGATIONDATAMAPSCHEMA: - return new AggregationDataMapSchema(dataMapName, className); - default: - return new DataMapSchema(dataMapName, className); + public DataMapSchema getDataMapSchema(String dataMapName, String providerName) { + if (providerName.equalsIgnoreCase(DataMapProvider.PREAGGREGATE.toString())) { + return new AggregationDataMapSchema(dataMapName, providerName); + } else if (providerName.equalsIgnoreCase(DataMapProvider.TIMESERIES.toString())) { + return new AggregationDataMapSchema(dataMapName, providerName); + } else { + return new DataMapSchema(dataMapName, providerName); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java index fff1a74..5d79abc 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java @@ -285,8 +285,7 @@ public class TableSchema implements Serializable, Writable { // only = is allowed as special character , so replace with & CarbonCommonConstants.DEFAULT_CHARSET)).replace("=","&")); properties.put("QUERYTYPE", queryType); - DataMapSchema dataMapSchema = - new DataMapSchema(dataMapName, className); + DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className); 
dataMapSchema.setProperties(properties); dataMapSchema.setChildSchema(this); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java deleted file mode 100644 index 88ac3ed..0000000 --- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.apache.carbondata.core.indexstore.blockletindex; - -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; -import java.util.UUID; - -import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; -import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl; -import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; -import org.apache.carbondata.core.util.ByteUtil; - -import mockit.Mock; -import mockit.MockUp; -import org.junit.Before; -import org.junit.Test; - -public class TestBlockletDataMap extends AbstractDictionaryCacheTest { - - ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor; - @Before public void setUp() throws Exception { - CarbonImplicitDimension carbonImplicitDimension = - new CarbonImplicitDimension(0, CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID); - DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new DimColumnResolvedFilterInfo(); - dimColumnEvaluatorInfo.setColumnIndex(0); - dimColumnEvaluatorInfo.setRowIndex(0); - dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension); - dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false); - implicitIncludeFilterExecutor = - new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo); - } - - @Test public void testaddBlockBasedOnMinMaxValue() throws Exception { - - new MockUp<ImplicitIncludeFilterExecutorImpl>() { - @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue, - String uniqueBlockPath) { - BitSet bitSet = new BitSet(1); - bitSet.set(8); - return bitSet; - } - }; - - BlockletDataMap blockletDataMap = new BlockletDataMap(); - Method method = BlockletDataMap.class - .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class, - byte[][].class, String.class, int.class); - method.setAccessible(true); - - byte[][] minValue = { ByteUtil.toBytes("sfds") }; - byte[][] maxValue = { ByteUtil.toBytes("resa") }; - Object result = method - .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, maxValue, - "/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata", - 0); - assert ((boolean) result); - } -} 
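Both the deleted test above and its replacement below reach a private method through reflection before invoking it. The pattern in isolation, with hypothetical class and method names:

import java.lang.reflect.Method;

public class PrivateMethodTestSketch {
  static class Target {
    private boolean isInRange(int value, int min, int max) {
      return value >= min && value <= max;
    }
  }

  public static void main(String[] args) throws Exception {
    Target target = new Target();
    // Look up the private method by name and exact parameter types, then make it accessible.
    Method method = Target.class.getDeclaredMethod("isInRange", int.class, int.class, int.class);
    method.setAccessible(true);
    Object result = method.invoke(target, 5, 1, 10);
    System.out.println(result);  // true
  }
}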
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java new file mode 100644 index 0000000..16048db --- /dev/null +++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndexDataMap.java @@ -0,0 +1,59 @@ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.lang.reflect.Method; +import java.util.BitSet; + +import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl; +import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; +import org.apache.carbondata.core.util.ByteUtil; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.Before; +import org.junit.Test; + +public class TestBlockletIndexDataMap extends AbstractDictionaryCacheTest { + + ImplicitIncludeFilterExecutorImpl implicitIncludeFilterExecutor; + @Before public void setUp() throws Exception { + CarbonImplicitDimension carbonImplicitDimension = + new CarbonImplicitDimension(0, CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_POSITIONID); + DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = new DimColumnResolvedFilterInfo(); + dimColumnEvaluatorInfo.setColumnIndex(0); + dimColumnEvaluatorInfo.setRowIndex(0); + dimColumnEvaluatorInfo.setDimension(carbonImplicitDimension); + dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false); + implicitIncludeFilterExecutor = + new ImplicitIncludeFilterExecutorImpl(dimColumnEvaluatorInfo); + } + + @Test public void testaddBlockBasedOnMinMaxValue() throws Exception { + + new MockUp<ImplicitIncludeFilterExecutorImpl>() { + @Mock BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue, + String uniqueBlockPath) { + BitSet bitSet = new BitSet(1); + bitSet.set(8); + return bitSet; + } + }; + + BlockletIndexDataMap blockletDataMap = new BlockletIndexDataMap(); + Method method = BlockletIndexDataMap.class + .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class, + byte[][].class, String.class, int.class); + method.setAccessible(true); + + byte[][] minValue = { ByteUtil.toBytes("sfds") }; + byte[][] maxValue = { ByteUtil.toBytes("resa") }; + Object result = method + .invoke(blockletDataMap, implicitIncludeFilterExecutor, minValue, maxValue, + "/opt/store/default/carbon_table/Fact/Part0/Segment_0/part-0-0_batchno0-0-1514989110586.carbondata", + 0); + assert ((boolean) result); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java deleted file mode 100644 index 8002e57..0000000 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.datamap.examples; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.dev.DataMapModel; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.fileoperations.AtomicFileOperations; -import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; -import org.apache.carbondata.core.indexstore.Blocklet; -import org.apache.carbondata.core.indexstore.row.DataMapRow; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; -import org.apache.carbondata.core.scan.filter.FilterUtil; -import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.util.CarbonUtil; - -import com.google.gson.Gson; - -/** - * Datamap implementation for min max blocklet. 
- */ -public class MinMaxDataMap extends AbstractCoarseGrainDataMap { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(MinMaxDataMap.class.getName()); - - public static final String NAME = "clustered.minmax.btree.blocklet"; - - private String filePath; - - private MinMaxIndexBlockDetails[] readMinMaxDataMap; - - @Override - public void init(DataMapModel model) throws MemoryException, IOException { - this.filePath = model.getFilePath(); - CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0"); - for (int i = 0; i < listFiles.length; i++) { - readMinMaxDataMap = readJson(listFiles[i].getPath()); - } - } - - private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String segmentId) { - String path = filePath.substring(0, filePath.lastIndexOf("/") + 1); - CarbonFile carbonFile = FileFactory.getCarbonFile(path); - return carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { - return file.getName().endsWith(".minmaxindex"); - } - }); - } - - private MinMaxIndexBlockDetails[] readJson(String filePath) { - Gson gsonObjectToRead = new Gson(); - DataInputStream dataInputStream = null; - BufferedReader buffReader = null; - InputStreamReader inStream = null; - MinMaxIndexBlockDetails[] readMinMax = null; - AtomicFileOperations fileOperation = - new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath)); - - try { - if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) { - return null; - } - dataInputStream = fileOperation.openForRead(); - inStream = new InputStreamReader(dataInputStream, "UTF-8"); - buffReader = new BufferedReader(inStream); - readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class); - } catch (IOException e) { - return null; - } finally { - CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); - } - return readMinMax; - } - - /** - * Block Prunning logic for Min Max DataMap. 
- * - * @param filterExp - * @param segmentProperties - * @return - */ - @Override - public List<Blocklet> prune(FilterResolverIntf filterExp, - SegmentProperties segmentProperties, List<String> partitions) { - List<Blocklet> blocklets = new ArrayList<>(); - - if (filterExp == null) { - for (int i = 0; i < readMinMaxDataMap.length; i++) { - blocklets.add(new Blocklet(filePath, String.valueOf(readMinMaxDataMap[i].getBlockletId()))); - } - } else { - FilterExecuter filterExecuter = - FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); - int startIndex = 0; - while (startIndex < readMinMaxDataMap.length) { - BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(), - readMinMaxDataMap[startIndex].getMinValues()); - if (!bitSet.isEmpty()) { - blocklets.add(new Blocklet(filePath, - String.valueOf(readMinMaxDataMap[startIndex].getBlockletId()))); - } - startIndex++; - } - } - return blocklets; - } - - @Override - public boolean isScanRequired(FilterResolverIntf filterExp) { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - readMinMaxDataMap = null; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java deleted file mode 100644 index 925731a..0000000 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.datamap.examples; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.carbondata.core.datamap.DataMapDistributable; -import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; -import org.apache.carbondata.core.datamap.dev.DataMapModel; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap; -import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMapFactory; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; -import org.apache.carbondata.core.scan.filter.intf.ExpressionType; -import org.apache.carbondata.events.Event; - -/** - * Min Max DataMap Factory - */ -public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory { - - private AbsoluteTableIdentifier identifier; - - @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) { - this.identifier = identifier; - } - - /** - * createWriter will return the MinMaxDataWriter. - * - * @param segmentId - * @return - */ - @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) { - return new MinMaxDataWriter(identifier, segmentId, dataWritePath); - } - - /** - * getDataMaps Factory method Initializes the Min Max Data Map and returns. - * - * @param segmentId - * @return - * @throws IOException - */ - @Override public List<AbstractCoarseGrainDataMap> getDataMaps(String segmentId) - throws IOException { - List<AbstractCoarseGrainDataMap> dataMapList = new ArrayList<>(); - // Form a dataMap of Type MinMaxDataMap. - MinMaxDataMap dataMap = new MinMaxDataMap(); - try { - dataMap.init(new DataMapModel( - identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator)); - } catch (MemoryException ex) { - - } - dataMapList.add(dataMap); - return dataMapList; - } - - /** - * @param segmentId - * @return - */ - @Override public List<DataMapDistributable> toDistributable(String segmentId) { - return null; - } - - /** - * Clear the DataMap. - * - * @param segmentId - */ - @Override public void clear(String segmentId) { - } - - /** - * Clearing the data map. 
- */ - @Override public void clear() { - } - - @Override public List<AbstractCoarseGrainDataMap> getDataMaps(DataMapDistributable distributable) - throws IOException { - return null; - } - - @Override public void fireEvent(Event event) { - - } - - @Override public DataMapMeta getMeta() { - return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), - new ArrayList<ExpressionType>()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java new file mode 100644 index 0000000..216000b --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.examples; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonUtil; + +import com.google.gson.Gson; + +/** + * Datamap implementation for min max blocklet. 
+ */ +public class MinMaxIndexDataMap extends AbstractCoarseGrainIndexDataMap { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName()); + + public static final String NAME = "clustered.minmax.btree.blocklet"; + + private String filePath; + + private MinMaxIndexBlockDetails[] readMinMaxDataMap; + + @Override + public void init(DataMapModel model) throws MemoryException, IOException { + this.filePath = model.getFilePath(); + CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0"); + for (int i = 0; i < listFiles.length; i++) { + readMinMaxDataMap = readJson(listFiles[i].getPath()); + } + } + + private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String segmentId) { + String path = filePath.substring(0, filePath.lastIndexOf("/") + 1); + CarbonFile carbonFile = FileFactory.getCarbonFile(path); + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(".minmaxindex"); + } + }); + } + + private MinMaxIndexBlockDetails[] readJson(String filePath) { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + MinMaxIndexBlockDetails[] readMinMax = null; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath)); + + try { + if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, "UTF-8"); + buffReader = new BufferedReader(inStream); + readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class); + } catch (IOException e) { + return null; + } finally { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + return readMinMax; + } + + /** + * Block Pruning logic for Min Max DataMap.
+ * + * @param filterExp + * @param segmentProperties + * @return + */ + @Override + public List<Blocklet> prune(FilterResolverIntf filterExp, + SegmentProperties segmentProperties, List<String> partitions) { + List<Blocklet> blocklets = new ArrayList<>(); + + if (filterExp == null) { + for (int i = 0; i < readMinMaxDataMap.length; i++) { + blocklets.add(new Blocklet(filePath, String.valueOf(readMinMaxDataMap[i].getBlockletId()))); + } + } else { + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); + int startIndex = 0; + while (startIndex < readMinMaxDataMap.length) { + BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(), + readMinMaxDataMap[startIndex].getMinValues()); + if (!bitSet.isEmpty()) { + blocklets.add(new Blocklet(filePath, + String.valueOf(readMinMaxDataMap[startIndex].getBlockletId()))); + } + startIndex++; + } + } + return blocklets; + } + + @Override + public boolean isScanRequired(FilterResolverIntf filterExp) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + readMinMaxDataMap = null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java new file mode 100644 index 0000000..5f714a1 --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.examples; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap; +import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.events.Event; + +/** + * Min Max DataMap Factory + */ +public class MinMaxIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory { + + private AbsoluteTableIdentifier identifier; + + @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) { + this.identifier = identifier; + } + + /** + * createWriter will return the MinMaxDataWriter. + * + * @param segmentId + * @return + */ + @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) { + return new MinMaxDataWriter(identifier, segmentId, dataWritePath); + } + + /** + * getDataMaps factory method: initializes the Min Max DataMap and returns it. + * + * @param segmentId + * @return + * @throws IOException + */ + @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String segmentId) + throws IOException { + List<AbstractCoarseGrainIndexDataMap> dataMapList = new ArrayList<>(); + // Form a dataMap of type MinMaxIndexDataMap. + MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap(); + try { + dataMap.init(new DataMapModel( + identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator)); + } catch (MemoryException ex) { + // ignored in this example; the datamap is left uninitialized + } + dataMapList.add(dataMap); + return dataMapList; + } + + /** + * @param segmentId + * @return + */ + @Override public List<DataMapDistributable> toDistributable(String segmentId) { + return null; + } + + /** + * Clear the DataMap. + * + * @param segmentId + */ + @Override public void clear(String segmentId) { + } + + /** + * Clearing the data map.
+ */ + @Override public void clear() { + } + + @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(DataMapDistributable distributable) + throws IOException { + return null; + } + + @Override public void fireEvent(Event event) { + + } + + @Override public DataMapMeta getMeta() { + return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), + new ArrayList<ExpressionType>()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala index 0cfe410..59872aa 100644 --- a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala +++ b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala @@ -52,8 +52,8 @@ object MinMaxDataMapExample { // register datamap writer DataMapStoreManager.getInstance().createAndRegisterDataMap( AbsoluteTableIdentifier.from(storeLocation, "default", "carbonminmax"), - classOf[MinMaxDataMapFactory].getName, - MinMaxDataMap.NAME) + classOf[MinMaxIndexDataMapFactory].getName, + MinMaxIndexDataMap.NAME) spark.sql("DROP TABLE IF EXISTS carbonminmax") http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/docs/datamap-developer-guide.md ---------------------------------------------------------------------- diff --git a/docs/datamap-developer-guide.md b/docs/datamap-developer-guide.md new file mode 100644 index 0000000..31afd34 --- /dev/null +++ b/docs/datamap-developer-guide.md @@ -0,0 +1,16 @@ +# DataMap Developer Guide + +### Introduction +DataMap is a data structure that can be used to accelerate certain queries on a table. Different DataMaps can be implemented by developers. +Currently, there are 2 types of DataMap supported: +1. IndexDataMap: DataMap that leverages an index to accelerate filter queries +2. MVDataMap: DataMap that leverages a Materialized View to accelerate OLAP-style queries, like SPJG queries (select, predicate, join, group by) + +### DataMap provider +When the user issues `CREATE DATAMAP dm ON TABLE main USING 'provider'`, the corresponding DataMapProvider implementation will be created and initialized. +Currently, the provider string can be: +1. preaggregate: one type of MVDataMap that does pre-aggregation on a single table +2. timeseries: one type of MVDataMap that does pre-aggregation based on a time dimension of the table +3. the class name of an IndexDataMapFactory implementation: developers can implement a new type of IndexDataMap by extending IndexDataMapFactory (a minimal skeleton is sketched below) + +When the user issues `DROP DATAMAP dm ON TABLE main`, the corresponding DataMapProvider implementation will be called to drop the datamap. \ No newline at end of file
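To make the guide's third provider option concrete, below is a hedged Java skeleton of a custom coarse-grain IndexDataMap factory. It is a sketch only: the package and class names (org.example.datamap.MyIndexDataMapFactory) are hypothetical, the overridden method set mirrors MinMaxIndexDataMapFactory from this commit, and a real implementation must return working writer and datamap instances instead of the placeholder values used here.

package org.example.datamap; // hypothetical package, not part of this commit

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
import org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.events.Event;

/**
 * Hypothetical skeleton of a custom coarse-grain IndexDataMap factory;
 * the method set mirrors MinMaxIndexDataMapFactory above.
 */
public class MyIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory {

  private AbsoluteTableIdentifier identifier;

  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
    // remember the table identity; a real factory would also read dataMapSchema properties
    this.identifier = identifier;
  }

  @Override public AbstractDataMapWriter createWriter(String segmentId, String dataWritePath) {
    // should return a writer that persists index data while the segment is written;
    // left null in this sketch because the writer is datamap-specific
    return null;
  }

  @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(String segmentId)
      throws IOException {
    // should load and return the datamap instances covering the given segment
    return new ArrayList<>();
  }

  @Override public List<DataMapDistributable> toDistributable(String segmentId) {
    // optional: split the datamap into distributable units for cluster-side pruning
    return null;
  }

  @Override public List<AbstractCoarseGrainIndexDataMap> getDataMaps(
      DataMapDistributable distributable) throws IOException {
    return null;
  }

  @Override public void fireEvent(Event event) {
  }

  @Override public void clear(String segmentId) {
  }

  @Override public void clear() {
  }

  @Override public DataMapMeta getMeta() {
    // declare which columns and filter expression types this index supports
    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
        new ArrayList<ExpressionType>());
  }
}

Under the semantics described in the guide, such a factory would then be selected by passing its class name as the provider string: CREATE DATAMAP dm ON TABLE main USING 'org.example.datamap.MyIndexDataMapFactory'.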