Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 9ad98f432 -> 7213ac057
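This commit replaces the long constructor argument lists of the fact data writers with a single CarbonDataWriterVo value object (new class in the patch below). As a reading aid, here is a minimal sketch of how a caller might populate the value object and obtain a writer after this change; the concrete values, and the choice of the version-2 writer, are illustrative only and are not part of the commit:

    // Hypothetical usage sketch; all field values are illustrative only.
    CarbonDataWriterVo vo = new CarbonDataWriterVo();
    vo.setDatabaseName("default");
    vo.setTableName("t3");
    vo.setStoreLocation("/tmp/carbon/store");
    vo.setMeasureCount(2);
    vo.setTableBlocksize(1024);  // table block size in MB
    // ... keyBlockSize, aggBlocks, colCardinality, segmentProperties,
    // carbonDataFileAttributes, etc. are set through the same setters.

    // Each writer implementation now takes only the value object:
    AbstractFactDataWriter<int[]> writer = new CarbonFactDataWriterImpl2(vo);

    // Inside AbstractFactDataWriter the block size is widened to long before
    // the multiplication, since 2048 * 1024 * 1024 overflows int:
    long fileSizeInBytes = (long) vo.getTableBlocksize() * 1024 * 1024;

The commit itself does not show the factory that chooses between writer implementations; only the constructors change.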
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 16bd771..279bb63 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -26,6 +26,7 @@ import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; @@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; import org.apache.carbondata.core.carbon.metadata.CarbonMetadata; import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex; import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex; @@ -52,22 +52,21 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSche import org.apache.carbondata.core.carbon.path.CarbonStorePath; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage; +import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression; import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; +import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonMergerUtil; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.writer.CarbonFooterWriter; import org.apache.carbondata.core.writer.CarbonIndexFileWriter; import org.apache.carbondata.format.BlockIndex; -import org.apache.carbondata.format.FileFooter; import org.apache.carbondata.format.IndexHeader; import org.apache.carbondata.processing.mdkeygen.file.FileData; -import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite; -import org.apache.carbondata.processing.store.CarbonDataFileAttributes; import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; import org.apache.commons.lang3.ArrayUtils; @@ -87,10 +86,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< */ private static final int HDFS_CHECKSUM_LENGTH = 512; /** - * measure count - */ - protected int measureCount; - /** * file channel */ protected FileChannel fileChannel; @@ -98,16 +93,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< * this will be used for holding blocklet metadata */ protected 
List<BlockletInfoColumnar> blockletInfoList; - /** - * keyBlockSize - */ - protected int[] keyBlockSize; protected boolean[] isNoDictionary; /** - * mdkeySize - */ - protected int mdkeySize; - /** * file name */ protected String fileName; @@ -115,15 +102,14 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< * Local cardinality for the segment */ protected int[] localCardinality; - protected String databaseName; /** * thrift column schema */ protected List<org.apache.carbondata.format.ColumnSchema> thriftColumnSchemaList; - /** - * tabel name - */ - private String tableName; + protected NumberCompressor numberCompressor; + protected CarbonDataWriterVo dataWriterVo; + protected List<List<Long>> dataChunksOffsets; + protected List<List<Short>> dataChunksLength; /** * data file size; */ @@ -133,32 +119,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< */ private int fileCount; /** - * File manager - */ - private IFileManagerComposite fileManager; - /** - * Store Location - */ - private String storeLocation; - /** * executorService */ private ExecutorService executorService; - /** * executorService */ private List<Future<Void>> executorServiceSubmitList; - /** - * data file attributes which will used for file construction - */ - private CarbonDataFileAttributes carbonDataFileAttributes; private CarbonTablePath carbonTablePath; /** - * data directory location in carbon store path - */ - private String carbonDataDirectoryPath; - /** * data block size for one carbon data file */ private long dataBlockSize; @@ -171,68 +140,55 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< */ private int spaceReservedForBlockMetaSize; private FileOutputStream fileOutputStream; - - private SegmentProperties segmentProperties; - private List<BlockIndexInfo> blockIndexInfoList; - public AbstractFactDataWriter(String storeLocation, int measureCount, int mdKeyLength, - String databaseName, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize, - CarbonDataFileAttributes carbonDataFileAttributes, List<ColumnSchema> columnSchema, - String carbonDataDirectoryPath, int[] colCardinality, SegmentProperties segmentProperties, - int blocksize) { - - // measure count - this.measureCount = measureCount; - // table name - this.tableName = tableName; - this.databaseName = databaseName; - - this.databaseName = databaseName; - - this.storeLocation = storeLocation; - this.segmentProperties = segmentProperties; + public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) { + this.dataWriterVo = dataWriterVo; this.blockletInfoList = new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN); blockIndexInfoList = new ArrayList<>(); // get max file size; CarbonProperties propInstance = CarbonProperties.getInstance(); // if blocksize=2048, then 2048*1024*1024 will go beyond the range of int - this.fileSizeInBytes = (long) blocksize * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR - * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR; + this.fileSizeInBytes = + (long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR + * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR; this.spaceReservedForBlockMetaSize = Integer.parseInt(propInstance .getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE, CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT)); this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) /
100; LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + dataBlockSize); - this.fileManager = fileManager; - this.carbonDataDirectoryPath = carbonDataDirectoryPath; - this.keyBlockSize = keyBlockSize; - this.mdkeySize = mdKeyLength; + this.executorService = Executors.newFixedThreadPool(1); executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); // in case of compaction we will pass the cardinality. - this.localCardinality = colCardinality; - this.carbonDataFileAttributes = carbonDataFileAttributes; - CarbonTable carbonTable = CarbonMetadata.getInstance() - .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName); - carbonTablePath = - CarbonStorePath.getCarbonTablePath(storeLocation, carbonTable.getCarbonTableIdentifier()); + this.localCardinality = dataWriterVo.getColCardinality(); + CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable( + dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo + .getTableName()); + carbonTablePath = CarbonStorePath.getCarbonTablePath(dataWriterVo.getStoreLocation(), + carbonTable.getCarbonTableIdentifier()); //TODO: We should delete the levelmetadata file after reading here. // so only data loading flow will need to read from cardinality file. if (null == this.localCardinality) { - this.localCardinality = - CarbonMergerUtil.getCardinalityFromLevelMetadata(storeLocation, tableName); + this.localCardinality = CarbonMergerUtil + .getCardinalityFromLevelMetadata(dataWriterVo.getStoreLocation(), + dataWriterVo.getTableName()); List<Integer> cardinalityList = new ArrayList<Integer>(); - thriftColumnSchemaList = - getColumnSchemaListAndCardinality(cardinalityList, localCardinality, columnSchema); + thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality, + dataWriterVo.getWrapperColumnSchemaList()); localCardinality = ArrayUtils.toPrimitive(cardinalityList.toArray(new Integer[cardinalityList.size()])); } else { // for compaction case List<Integer> cardinalityList = new ArrayList<Integer>(); - thriftColumnSchemaList = - getColumnSchemaListAndCardinality(cardinalityList, localCardinality, columnSchema); + thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality, + dataWriterVo.getWrapperColumnSchemaList()); } + this.numberCompressor = new NumberCompressor(Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.BLOCKLET_SIZE, + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL))); + this.dataChunksOffsets = new ArrayList<>(); + this.dataChunksLength = new ArrayList<>(); } /** @@ -258,8 +214,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< String readableFileSize = ByteUtil.convertByteToReadable(fileSize); String readableMaxSize = ByteUtil.convertByteToReadable(maxSize); LOGGER.info("The configured block size is " + readableBlockSize + - ", the actual carbon file size is " + readableFileSize + - ", choose the max value " + readableMaxSize + " as the block size on HDFS"); + ", the actual carbon file size is " + readableFileSize + + ", choose the max value " + readableMaxSize + " as the block size on HDFS"); return maxSize; } @@ -291,6 +247,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< this.currentFileSize = 0; blockletInfoList = new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN); + this.dataChunksOffsets = new ArrayList<>(); + 
this.dataChunksLength = new ArrayList<>(); CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel); // rename carbon data file from in progress status to actual renameCarbonDataFile(); @@ -312,12 +270,12 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< // increment the file sequence counter initFileCount(); String carbonDataFileName = carbonTablePath - .getCarbonDataFileName(fileCount, carbonDataFileAttributes.getTaskId(), - carbonDataFileAttributes.getFactTimeStamp()); + .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(), + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS; - FileData fileData = new FileData(actualFileNameVal, this.storeLocation); - fileManager.add(fileData); - this.fileName = storeLocation + File.separator + carbonDataFileName + FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation()); + dataWriterVo.getFileManager().add(fileData); + this.fileName = dataWriterVo.getStoreLocation() + File.separator + carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS; this.fileCount++; try { @@ -332,11 +290,10 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< private int initFileCount() { int fileInitialCount = 0; - File[] dataFiles = new File(storeLocation).listFiles(new FileFilter() { - + File[] dataFiles = new File(dataWriterVo.getStoreLocation()).listFiles(new FileFilter() { @Override public boolean accept(File pathVal) { - if (!pathVal.isDirectory() && pathVal.getName().startsWith(tableName) && pathVal.getName() - .contains(CarbonCommonConstants.FACT_FILE_EXT)) { + if (!pathVal.isDirectory() && pathVal.getName().startsWith(dataWriterVo.getTableName()) && pathVal.getName().contains(CarbonCommonConstants.FACT_FILE_EXT)) { return true; } return false; @@ -359,20 +316,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< /** * This method will write metadata at the end of the file in thrift format */ - protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel, - String filePath) throws CarbonDataWriterException { - try { - long currentPosition = channel.size(); - CarbonFooterWriter writer = new CarbonFooterWriter(filePath); - FileFooter convertFileMeta = CarbonMetadataUtil - .convertFileFooter(infoList, localCardinality.length, localCardinality, - thriftColumnSchemaList, segmentProperties); - fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition); - writer.writeFooter(convertFileMeta, currentPosition); - } catch (IOException e) { - throw new CarbonDataWriterException("Problem while writing the carbon file: ", e); - } - } + protected abstract void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, + FileChannel channel, String filePath) throws CarbonDataWriterException; /** * Below method will be used to fill the block index info details @@ -382,7 +327,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< * @param filePath file path * @param currentPosition current offset */ - private void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows, + protected void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows, String filePath, long currentPosition) { // as min-max will change for each blocklet and second blocklet
min-max can be lesser than @@ -448,56 +393,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< * * @return BlockletInfo - blocklet metadata */ - protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) { - // create the info object for leaf entry - BlockletInfoColumnar infoObj = new BlockletInfoColumnar(); - // add total entry count - infoObj.setNumberOfKeys(nodeHolder.getEntryCount()); - - // add the key array length - infoObj.setKeyLengths(nodeHolder.getKeyLengths()); - //add column min max data - infoObj.setColumnMaxData(nodeHolder.getColumnMaxData()); - infoObj.setColumnMinData(nodeHolder.getColumnMinData()); - infoObj.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex()); - long[] keyOffSets = new long[nodeHolder.getKeyLengths().length]; - - for (int i = 0; i < keyOffSets.length; i++) { - keyOffSets[i] = offset; - offset += nodeHolder.getKeyLengths()[i]; - } - // add key offset - infoObj.setKeyOffSets(keyOffSets); - - // add measure length - infoObj.setMeasureLength(nodeHolder.getMeasureLenght()); - - long[] msrOffset = new long[this.measureCount]; - - for (int i = 0; i < this.measureCount; i++) { - msrOffset[i] = offset; - // now increment the offset by adding measure length to get the next - // measure offset; - offset += nodeHolder.getMeasureLenght()[i]; - } - // add measure offset - infoObj.setMeasureOffset(msrOffset); - infoObj.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock()); - infoObj.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength()); - long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length]; - for (int i = 0; i < keyBlockIndexOffsets.length; i++) { - keyBlockIndexOffsets[i] = offset; - offset += nodeHolder.getKeyBlockIndexLength()[i]; - } - infoObj.setKeyBlockIndexOffSets(keyBlockIndexOffsets); - // set startkey - infoObj.setStartKey(nodeHolder.getStartKey()); - // set end key - infoObj.setEndKey(nodeHolder.getEndKey()); - infoObj.setCompressionModel(nodeHolder.getCompressionModel()); - // return leaf metadata - return infoObj; - } + protected abstract BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset); /** * Method will be used to close the open file channel @@ -528,9 +424,9 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< CarbonMetadataUtil.getIndexHeader(localCardinality, thriftColumnSchemaList); // get the block index info thrift List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList); - String fileName = storeLocation + File.separator + carbonTablePath - .getCarbonIndexFileName(carbonDataFileAttributes.getTaskId(), - carbonDataFileAttributes.getFactTimeStamp()); + String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath + .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(), + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); CarbonIndexFileWriter writer = new CarbonIndexFileWriter(); // open file writer.openThriftWriter(fileName); @@ -591,11 +487,11 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< private void copyCarbonDataFileToCarbonStorePath(String localFileName) throws CarbonDataWriterException { long copyStartTime = System.currentTimeMillis(); - LOGGER.info("Copying " + localFileName + " --> " + carbonDataDirectoryPath); + LOGGER.info("Copying " + localFileName + " --> " + dataWriterVo.getCarbonDataDirectoryPath()); try { CarbonFile localCarbonFile = 
FileFactory.getCarbonFile(localFileName, FileFactory.getFileType(localFileName)); - String carbonFilePath = carbonDataDirectoryPath + localFileName + String carbonFilePath = dataWriterVo.getCarbonDataDirectoryPath() + localFileName .substring(localFileName.lastIndexOf(File.separator)); copyLocalFileToCarbonStore(carbonFilePath, localFileName, CarbonCommonConstants.BYTEBUFFER_SIZE, @@ -654,18 +550,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< * * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem */ - protected void writeDataToFile(NodeHolder nodeHolder) throws CarbonDataWriterException { - // write data to file and get its offset - long offset = writeDataToFile(nodeHolder, fileChannel); - // get the blocklet info for currently added blocklet - BlockletInfoColumnar blockletInfo = getBlockletInfo(nodeHolder, offset); - // add blocklet info to list - blockletInfoList.add(blockletInfo); - // calculate the current size of the file - } - - protected abstract long writeDataToFile(NodeHolder nodeHolder, FileChannel channel) - throws CarbonDataWriterException; + public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException; @Override public int getLeafMetadataSize() { return blockletInfoList.size(); @@ -675,6 +560,99 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< return this.fileName; } + protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] keyStorageArray, + int entryCount) { + byte[][] keyBlockData = new byte[keyStorageArray.length][]; + int destPos = 0; + int keyBlockSizePosition = -1; + for (int i = 0; i < keyStorageArray.length; i++) { + destPos = 0; + //handling for high card dims + if (!dataWriterVo.getIsComplexType()[i] && !dataWriterVo.getIsDictionaryColumn()[i]) { + int totalLength = 0; + // calculate the total size in bytes across all the columns.
+ for (int k = 0; k < keyStorageArray[i].getKeyBlock().length; k++) { + byte[] colValue = keyStorageArray[i].getKeyBlock()[k]; + totalLength += colValue.length; + } + keyBlockData[i] = new byte[totalLength]; + + for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) { + int length = keyStorageArray[i].getKeyBlock()[j].length; + System + .arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, length); + destPos += length; + } + } else { + keyBlockSizePosition++; + if (dataWriterVo.getAggBlocks()[i]) { + keyBlockData[i] = new byte[keyStorageArray[i].getTotalSize()]; + for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) { + System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, + keyStorageArray[i].getKeyBlock()[j].length); + destPos += keyStorageArray[i].getKeyBlock()[j].length; + } + } else { + if (dataWriterVo.getIsComplexType()[i]) { + keyBlockData[i] = new byte[keyStorageArray[i].getKeyBlock().length * dataWriterVo + .getKeyBlockSize()[keyBlockSizePosition]]; + } else { + keyBlockData[i] = + new byte[entryCount * dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]]; + } + for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) { + System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, + dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]); + destPos += dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]; + } + } + } + keyBlockData[i] = SnappyByteCompression.INSTANCE.compress(keyBlockData[i]); + } + return keyBlockData; + } + + /** + * Below method will be used to update the min or max value + * by removing the length from it + * + * @return min max value without length + */ + protected byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) { + ByteBuffer buffer = ByteBuffer.wrap(valueWithLength); + byte[] actualValue = new byte[buffer.getShort()]; + buffer.get(actualValue); + return actualValue; + } + + /** + * Below method will be used to update the no dictionary start and end key + * + * @param key key to be updated + * @return no dictionary key + */ + protected byte[] updateNoDictionaryStartAndEndKey(byte[] key) { + if (key.length == 0) { + return key; + } + // wrap the key in a byte buffer and remove the length part of the data + ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2); + // create an output buffer without length + ByteBuffer output = ByteBuffer.allocate(key.length - 2); + short numberOfByteToStorLength = 2; + // as length part is removed, so each no dictionary value index + // needs to be reshuffled by 2 bytes + for (int i = 0; i < dataWriterVo.getNoDictionaryCount(); i++) { + output.putShort((short) (buffer.getShort() - numberOfByteToStorLength)); + } + // copy the data part + while (buffer.hasRemaining()) { + output.put(buffer.get()); + } + output.rewind(); + return output.array(); + } + /** * This method will copy the carbon data file from local store location to * carbon store location http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java new file mode 100644 index 0000000..6e0287d --- /dev/null +++
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.store.writer; + +import java.util.List; + +import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite; +import org.apache.carbondata.processing.store.CarbonDataFileAttributes; + +/** + * Value object for writing the data + */ +public class CarbonDataWriterVo { + + private String storeLocation; + + private int measureCount; + + private int mdKeyLength; + + private String tableName; + + private IFileManagerComposite fileManager; + + private int[] keyBlockSize; + + private boolean[] aggBlocks; + + private boolean[] isComplexType; + + private int NoDictionaryCount; + + private CarbonDataFileAttributes carbonDataFileAttributes; + + private String databaseName; + + private List<ColumnSchema> wrapperColumnSchemaList; + + private int numberOfNoDictionaryColumn; + + private boolean[] isDictionaryColumn; + + private String carbonDataDirectoryPath; + + private int[] colCardinality; + + private SegmentProperties segmentProperties; + + private int tableBlocksize; + + /** + * @return the storeLocation + */ + public String getStoreLocation() { + return storeLocation; + } + + /** + * @param storeLocation the storeLocation to set + */ + public void setStoreLocation(String storeLocation) { + this.storeLocation = storeLocation; + } + + /** + * @return the measureCount + */ + public int getMeasureCount() { + return measureCount; + } + + /** + * @param measureCount the measureCount to set + */ + public void setMeasureCount(int measureCount) { + this.measureCount = measureCount; + } + + /** + * @return the mdKeyLength + */ + public int getMdKeyLength() { + return mdKeyLength; + } + + /** + * @param mdKeyLength the mdKeyLength to set + */ + public void setMdKeyLength(int mdKeyLength) { + this.mdKeyLength = mdKeyLength; + } + + /** + * @return the tableName + */ + public String getTableName() { + return tableName; + } + + /** + * @param tableName the tableName to set + */ + public void setTableName(String tableName) { + this.tableName = tableName; + } + + /** + * @return the fileManager + */ + public IFileManagerComposite getFileManager() { + return fileManager; + } + + /** + * @param fileManager the fileManager to set + */ + public void setFileManager(IFileManagerComposite fileManager) { + this.fileManager = fileManager; + } + + /** + * @return the keyBlockSize + */ + public int[] getKeyBlockSize() { + return keyBlockSize; + } + + /** + * @param keyBlockSize the keyBlockSize to 
set + */ + public void setKeyBlockSize(int[] keyBlockSize) { + this.keyBlockSize = keyBlockSize; + } + + /** + * @return the aggBlocks + */ + public boolean[] getAggBlocks() { + return aggBlocks; + } + + /** + * @param aggBlocks the aggBlocks to set + */ + public void setAggBlocks(boolean[] aggBlocks) { + this.aggBlocks = aggBlocks; + } + + /** + * @return the isComplexType + */ + public boolean[] getIsComplexType() { + return isComplexType; + } + + /** + * @param isComplexType the isComplexType to set + */ + public void setIsComplexType(boolean[] isComplexType) { + this.isComplexType = isComplexType; + } + + /** + * @return the noDictionaryCount + */ + public int getNoDictionaryCount() { + return NoDictionaryCount; + } + + /** + * @param noDictionaryCount the noDictionaryCount to set + */ + public void setNoDictionaryCount(int noDictionaryCount) { + NoDictionaryCount = noDictionaryCount; + } + + /** + * @return the carbonDataFileAttributes + */ + public CarbonDataFileAttributes getCarbonDataFileAttributes() { + return carbonDataFileAttributes; + } + + /** + * @param carbonDataFileAttributes the carbonDataFileAttributes to set + */ + public void setCarbonDataFileAttributes(CarbonDataFileAttributes carbonDataFileAttributes) { + this.carbonDataFileAttributes = carbonDataFileAttributes; + } + + /** + * @return the databaseName + */ + public String getDatabaseName() { + return databaseName; + } + + /** + * @param databaseName the databaseName to set + */ + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + /** + * @return the wrapperColumnSchemaList + */ + public List<ColumnSchema> getWrapperColumnSchemaList() { + return wrapperColumnSchemaList; + } + + /** + * @param wrapperColumnSchemaList the wrapperColumnSchemaList to set + */ + public void setWrapperColumnSchemaList(List<ColumnSchema> wrapperColumnSchemaList) { + this.wrapperColumnSchemaList = wrapperColumnSchemaList; + } + + /** + * @return the numberOfNoDictionaryColumn + */ + public int getNumberOfNoDictionaryColumn() { + return numberOfNoDictionaryColumn; + } + + /** + * @param numberOfNoDictionaryColumn the numberOfNoDictionaryColumn to set + */ + public void setNumberOfNoDictionaryColumn(int numberOfNoDictionaryColumn) { + this.numberOfNoDictionaryColumn = numberOfNoDictionaryColumn; + } + + /** + * @return the isDictionaryColumn + */ + public boolean[] getIsDictionaryColumn() { + return isDictionaryColumn; + } + + /** + * @param isDictionaryColumn the isDictionaryColumn to set + */ + public void setIsDictionaryColumn(boolean[] isDictionaryColumn) { + this.isDictionaryColumn = isDictionaryColumn; + } + + /** + * @return the carbonDataDirectoryPath + */ + public String getCarbonDataDirectoryPath() { + return carbonDataDirectoryPath; + } + + /** + * @param carbonDataDirectoryPath the carbonDataDirectoryPath to set + */ + public void setCarbonDataDirectoryPath(String carbonDataDirectoryPath) { + this.carbonDataDirectoryPath = carbonDataDirectoryPath; + } + + /** + * @return the colCardinality + */ + public int[] getColCardinality() { + return colCardinality; + } + + /** + * @param colCardinality the colCardinality to set + */ + public void setColCardinality(int[] colCardinality) { + this.colCardinality = colCardinality; + } + + /** + * @return the segmentProperties + */ + public SegmentProperties getSegmentProperties() { + return segmentProperties; + } + + /** + * @param segmentProperties the segmentProperties to set + */ + public void setSegmentProperties(SegmentProperties segmentProperties) 
{ + this.segmentProperties = segmentProperties; + } + + /** + * @return the tableBlocksize + */ + public int getTableBlocksize() { + return tableBlocksize; + } + + /** + * @param tableBlocksize the tableBlocksize to set + */ + public void setTableBlocksize(int tableBlocksize) { + this.tableBlocksize = tableBlocksize; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java new file mode 100644 index 0000000..d399280 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.processing.store.writer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.BlockletInfoColumnar; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.writer.CarbonFooterWriter; +import org.apache.carbondata.format.DataChunk2; +import org.apache.carbondata.format.FileFooter; +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; + +/** + * Below class will be used to write the data in version 2 format + */ +public class CarbonFactDataWriterImpl2 extends CarbonFactDataWriterImplForIntIndexAndAggBlock { + + /** + * logger + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonFactDataWriterImpl2.class.getName()); + + /** + * Constructor to create an instance of this class + * + * @param dataWriterVo + */ + public CarbonFactDataWriterImpl2(CarbonDataWriterVo dataWriterVo) { + super(dataWriterVo); + } + + /** + * Below method will be used to write the data to carbon data file + * + * @param holder + * @throws CarbonDataWriterException any problem in writing operation + */ + @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException { + // accumulates the total size of the blocklet + int size = 0; + // get the blocklet info object + BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0); + + List<DataChunk2> datachunks = null; + try { + // get all the data chunks + datachunks = CarbonMetadataUtil + .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties()); + } catch (IOException e) { + throw new CarbonDataWriterException("Problem while getting the data chunks", e); + } + // data chunk byte array + byte[][] dataChunkByteArray = new byte[datachunks.size()][]; + for (int i = 0; i < dataChunkByteArray.length; i++) { + dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i)); + // add the data chunk size + size += dataChunkByteArray[i].length; + } + // add row id index length + for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) { + size += holder.getKeyBlockIndexLength()[i]; + } + // add rle index length + for (int i = 0; i < holder.getDataIndexMapLength().length; i++) { + size += holder.getDataIndexMapLength()[i]; + } + // add dimension column data page and measure column data page size + long blockletDataSize = + holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size; + // if size of the file already reached threshold size then create a new file and get the file + // channel object + updateBlockletFileChannel(blockletDataSize); + // write the version header in the file if current file size is zero + // this is done so carbondata file can be read separately + try { + if (fileChannel.size() == 0) { + short version = Short.parseShort(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION)); + byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes(); + ByteBuffer buffer = ByteBuffer.allocate(header.length); + buffer.put(header); +
buffer.rewind(); + fileChannel.write(buffer); + } + } catch (IOException e) { + throw new CarbonDataWriterException("Problem while getting the file channel size", e); + } + // write data to file and get its offset + writeDataToFile(holder, dataChunkByteArray, fileChannel); + // add blocklet info to list + blockletInfoList.add(blockletInfo); + LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte"); + } + + /** + * Below method will be used to write the data to file + * Data Format + * <DColumn1DataChunk><DColumnDataPage><DColumnRle> + * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle> + * <DColumn3DataChunk><DColumn3DataPage><column3RowIds> + * <MColumn1DataChunk><MColumn1DataPage> + * <MColumn2DataChunk><MColumn2DataPage> + * <MColumn2DataChunk><MColumn2DataPage> + * + * @param nodeHolder + * @param dataChunksBytes + * @param channel + * @throws CarbonDataWriterException + */ + private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel) + throws CarbonDataWriterException { + long offset = 0; + try { + offset = channel.size(); + } catch (IOException e) { + throw new CarbonDataWriterException("Problem while getting the file channel size"); + } + List<Long> currentDataChunksOffset = new ArrayList<>(); + List<Short> currentDataChunksLength = new ArrayList<>(); + dataChunksLength.add(currentDataChunksLength); + dataChunksOffsets.add(currentDataChunksOffset); + int bufferSize = 0; + int rowIdIndex = 0; + int rleIndex = 0; + for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) { + currentDataChunksOffset.add(offset); + currentDataChunksLength.add((short) dataChunksBytes[i].length); + bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder + .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + ( + dataWriterVo.getAggBlocks()[i] ? 
+ nodeHolder.getCompressedDataIndex()[rleIndex].length : + 0); + offset += dataChunksBytes[i].length; + offset += nodeHolder.getKeyLengths()[i]; + if (!nodeHolder.getIsSortedKeyBlock()[i]) { + offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex]; + rowIdIndex++; + } + if (dataWriterVo.getAggBlocks()[i]) { + offset += nodeHolder.getDataIndexMapLength()[rleIndex]; + rleIndex++; + } + } + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + rleIndex = 0; + rowIdIndex = 0; + for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) { + buffer.put(dataChunksBytes[i]); + buffer.put(nodeHolder.getKeyArray()[i]); + if (!nodeHolder.getIsSortedKeyBlock()[i]) { + buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length); + buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]); + if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) { + buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]); + } + rowIdIndex++; + } + if (dataWriterVo.getAggBlocks()[i]) { + buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]); + rleIndex++; + } + } + try { + buffer.flip(); + channel.write(buffer); + } catch (IOException e) { + throw new CarbonDataWriterException( + "Problem while writing the dimension data in carbon data file", e); + } + + int dataChunkIndex = nodeHolder.getKeyArray().length; + int totalLength = 0; + for (int i = 0; i < nodeHolder.getDataArray().length; i++) { + currentDataChunksOffset.add(offset); + currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length); + offset += dataChunksBytes[dataChunkIndex].length; + offset += nodeHolder.getDataArray()[i].length; + totalLength += dataChunksBytes[dataChunkIndex].length; + totalLength += nodeHolder.getDataArray()[i].length; + dataChunkIndex++; + } + buffer = ByteBuffer.allocate(totalLength); + dataChunkIndex = nodeHolder.getKeyArray().length; + for (int i = 0; i < nodeHolder.getDataArray().length; i++) { + buffer.put(dataChunksBytes[dataChunkIndex++]); + buffer.put(nodeHolder.getDataArray()[i]); + } + try { + buffer.flip(); + channel.write(buffer); + } catch (IOException e) { + throw new CarbonDataWriterException( + "Problem while writing the measure data in carbon data file", e); + } + } + + /** + * This method will be used to get the blocklet metadata + * + * @return BlockletInfo - blocklet metadata + */ + protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) { + // create the info object for leaf entry + BlockletInfoColumnar info = new BlockletInfoColumnar(); + //add aggBlocks array + info.setAggKeyBlock(nodeHolder.getAggBlocks()); + // add total entry count + info.setNumberOfKeys(nodeHolder.getEntryCount()); + + // add the key array length + info.setKeyLengths(nodeHolder.getKeyLengths()); + // adding null measure index bit set + info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex()); + //add column min max length + info.setColumnMaxData(nodeHolder.getColumnMaxData()); + info.setColumnMinData(nodeHolder.getColumnMinData()); + + // add measure length + info.setMeasureLength(nodeHolder.getMeasureLenght()); + + info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock()); + info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength()); + info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength()); + // set startkey + info.setStartKey(nodeHolder.getStartKey()); + // set end key + info.setEndKey(nodeHolder.getEndKey()); + info.setCompressionModel(nodeHolder.getCompressionModel()); + // return leaf metadata + + //colGroup Blocks + 
info.setColGrpBlocks(nodeHolder.getColGrpBlocks()); + + return info; + } + + /** + * This method will write metadata at the end of the file in thrift format + */ + protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel, + String filePath) throws CarbonDataWriterException { + try { + // get the current file position + long currentPosition = channel.size(); + CarbonFooterWriter writer = new CarbonFooterWriter(filePath); + // get thrift file footer instance + FileFooter convertFileMeta = CarbonMetadataUtil + .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList, + dataChunksOffsets, dataChunksLength); + // fill the carbon index details + fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition); + // write the footer + writer.writeFooter(convertFileMeta, currentPosition); + } catch (IOException e) { + throw new CarbonDataWriterException("Problem while writing the carbon file: ", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java index 259482e..8c2608b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java @@ -26,48 +26,23 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; -import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage; -import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression; import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel; -import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite; -import org.apache.carbondata.processing.store.CarbonDataFileAttributes; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.writer.CarbonFooterWriter; +import org.apache.carbondata.format.FileFooter; import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage; import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFactDataWriter<int[]> { - protected boolean[] aggBlocks; - private NumberCompressor numberCompressor; - private boolean[] isComplexType; - private int numberOfNoDictionaryColumn; - private boolean[] isDictionaryColumn; - private static final LogService LOGGER = LogServiceFactory.getLogService( -
CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName()); + private static final LogService LOGGER = LogServiceFactory + .getLogService(CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName()); - public CarbonFactDataWriterImplForIntIndexAndAggBlock(String storeLocation, int measureCount, - int mdKeyLength, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize, - boolean[] aggBlocks, boolean[] isComplexType, int NoDictionaryCount, - CarbonDataFileAttributes carbonDataFileAttributes, String databaseName, - List<ColumnSchema> wrapperColumnSchemaList, int numberOfNoDictionaryColumn, - boolean[] isDictionaryColumn, String carbonDataDirectoryPath, int[] colCardinality, - SegmentProperties segmentProperties, int tableBlocksize) { - super(storeLocation, measureCount, mdKeyLength, databaseName, tableName, fileManager, - keyBlockSize, carbonDataFileAttributes, wrapperColumnSchemaList, carbonDataDirectoryPath, - colCardinality, segmentProperties, tableBlocksize); - this.isComplexType = isComplexType; - this.databaseName = databaseName; - this.numberOfNoDictionaryColumn = numberOfNoDictionaryColumn; - this.isDictionaryColumn = isDictionaryColumn; - this.aggBlocks = aggBlocks; - this.numberCompressor = new NumberCompressor(Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.BLOCKLET_SIZE, - CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL))); + public CarbonFactDataWriterImplForIntIndexAndAggBlock(CarbonDataWriterVo dataWriterVo) { + super(dataWriterVo); } @Override @@ -110,7 +85,7 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact } totalKeySize += keyLengths[i]; - if (isComplexType[i] || isDictionaryColumn[i]) { + if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) { allMinValue[i] = keyStorageArray[i].getMin(); allMaxValue[i] = keyStorageArray[i].getMax(); } else { @@ -142,16 +117,16 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact } } int compressDataBlockSize = 0; - for (int i = 0; i < aggBlocks.length; i++) { - if (aggBlocks[i]) { + for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) { + if (dataWriterVo.getAggBlocks()[i]) { compressDataBlockSize++; } } byte[][] compressedDataIndex = new byte[compressDataBlockSize][]; int[] dataIndexMapLength = new int[compressDataBlockSize]; idx = 0; - for (int i = 0; i < aggBlocks.length; i++) { - if (aggBlocks[i]) { + for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) { + if (dataWriterVo.getAggBlocks()[i]) { try { compressedDataIndex[idx] = numberCompressor.compress(keyStorageArray[i].getDataIndexMap()); @@ -163,13 +138,7 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact } } - byte[] writableKeyArray = new byte[totalKeySize]; - int startPosition = 0; - for (int i = 0; i < keyLengths.length; i++) { - System.arraycopy(keyBlockData[i], 0, writableKeyArray, startPosition, keyBlockData[i].length); - startPosition += keyLengths[i]; - } - int[] msrLength = new int[this.measureCount]; + int[] msrLength = new int[dataWriterVo.getMeasureCount()]; // calculate the total size required for all the measure and get the // each measure size for (int i = 0; i < dataArray.length; i++) { @@ -177,30 +146,9 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact totalMsrArrySize += currentMsrLenght; msrLength[i] = currentMsrLenght; } - byte[] writableDataArray = new byte[totalMsrArrySize]; - - // start position will be used for adding the 
measure in - // writableDataArray after adding measure increment the start position - // by added measure length which will be used for next measure start - // position - startPosition = 0; - for (int i = 0; i < dataArray.length; i++) { - System.arraycopy(dataArray[i], 0, writableDataArray, startPosition, dataArray[i].length); - startPosition += msrLength[i]; - } - // current file size; - int indexBlockSize = 0; - for (int i = 0; i < keyBlockIdxLengths.length; i++) { - indexBlockSize += keyBlockIdxLengths[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE; - } - - for (int i = 0; i < dataIndexMapLength.length; i++) { - indexBlockSize += dataIndexMapLength[i]; - } - NodeHolder holder = new NodeHolder(); - holder.setDataArray(writableDataArray); - holder.setKeyArray(writableKeyArray); + holder.setDataArray(dataArray); + holder.setKeyArray(keyBlockData); // end key format will be <length of dictionary key><length of no // dictionary key><DictionaryKey><No Dictionary key> byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey); @@ -235,10 +183,12 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact holder.setDataIndexMapLength(dataIndexMapLength); holder.setCompressedDataIndex(compressedDataIndex); holder.setCompressionModel(compressionModel); + holder.setTotalDimensionArrayLength(totalKeySize); + holder.setTotalMeasureArrayLength(totalMsrArrySize); //setting column min max value holder.setColumnMaxData(allMaxValue); holder.setColumnMinData(allMinValue); - holder.setAggBlocks(aggBlocks); + holder.setAggBlocks(dataWriterVo.getAggBlocks()); holder.setColGrpBlocks(colGrpBlock); return holder; } @@ -252,113 +202,28 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact for (int i = 0; i < holder.getDataIndexMapLength().length; i++) { indexBlockSize += holder.getDataIndexMapLength()[i]; } + long blockletDataSize = - holder.getKeyArray().length + holder.getDataArray().length + indexBlockSize; + holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + + indexBlockSize; updateBlockletFileChannel(blockletDataSize); - writeDataToFile(holder); + // write data to file and get its offset + long offset = writeDataToFile(holder, fileChannel); + // get the blocklet info for currently added blocklet + BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset); + // add blocklet info to list + blockletInfoList.add(blockletInfo); LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte"); } /** - * Below method will be used to update the min or max value - * by removing the length from it - * - * @return min max value without length - */ - private byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) { - ByteBuffer buffer = ByteBuffer.wrap(valueWithLength); - byte[] actualValue = new byte[buffer.getShort()]; - buffer.get(actualValue); - return actualValue; - } - - /** - * Below method will be used to update the no dictionary start and end key - * - * @param key key to be updated - * @return return no dictionary key - */ - private byte[] updateNoDictionaryStartAndEndKey(byte[] key) { - if (key.length == 0) { - return key; - } - // add key to byte buffer remove the length part of the data - ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2); - // create a output buffer without length - ByteBuffer output = ByteBuffer.allocate(key.length - 2); - short numberOfByteToStorLength = 2; - // as length part is removed, so each no dictionary value index - // needs to 
be reshuffled by 2 bytes - for (int i = 0; i < numberOfNoDictionaryColumn; i++) { - output.putShort((short) (buffer.getShort() - numberOfByteToStorLength)); - } - // copy the data part - while (buffer.hasRemaining()) { - output.put(buffer.get()); - } - output.rewind(); - return output.array(); - } - - protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] keyStorageArray, - int entryCount) { - byte[][] keyBlockData = new byte[keyStorageArray.length][]; - int destPos = 0; - int keyBlockSizePosition = -1; - for (int i = 0; i < keyStorageArray.length; i++) { - destPos = 0; - //handling for high card dims - if (!isComplexType[i] && !this.isDictionaryColumn[i]) { - int totalLength = 0; - // calc size of the total bytes in all the colmns. - for (int k = 0; k < keyStorageArray[i].getKeyBlock().length; k++) { - byte[] colValue = keyStorageArray[i].getKeyBlock()[k]; - totalLength += colValue.length; - } - keyBlockData[i] = new byte[totalLength]; - - for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) { - int length = keyStorageArray[i].getKeyBlock()[j].length; - System - .arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, length); - destPos += length; - } - } else { - keyBlockSizePosition++; - if (aggBlocks[i]) { - keyBlockData[i] = new byte[keyStorageArray[i].getTotalSize()]; - for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) { - System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, - keyStorageArray[i].getKeyBlock()[j].length); - destPos += keyStorageArray[i].getKeyBlock()[j].length; - } - } else { - if (isComplexType[i]) { - keyBlockData[i] = new byte[keyStorageArray[i].getKeyBlock().length - * keyBlockSize[keyBlockSizePosition]]; - } else { - keyBlockData[i] = new byte[entryCount * keyBlockSize[keyBlockSizePosition]]; - } - for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) { - System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, - keyBlockSize[keyBlockSizePosition]); - destPos += keyBlockSize[keyBlockSizePosition]; - } - } - } - keyBlockData[i] = SnappyByteCompression.INSTANCE.compress(keyBlockData[i]); - } - return keyBlockData; - } - - /** * This method is responsible for writing blocklet to the data file * * @return file offset offset is the current position of the file * @throws CarbonDataWriterException if will throw CarbonDataWriterException when any thing * goes wrong while while writing the leaf file */ - protected long writeDataToFile(NodeHolder nodeHolder, FileChannel channel) + private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel) throws CarbonDataWriterException { // create byte buffer byte[][] compressedIndex = nodeHolder.getCompressedIndex(); @@ -375,15 +240,20 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact indexBlockSize += nodeHolder.getDataIndexMapLength()[i]; } ByteBuffer byteBuffer = ByteBuffer.allocate( - nodeHolder.getKeyArray().length + nodeHolder.getDataArray().length + indexBlockSize); + nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength() + + indexBlockSize); long offset = 0; try { // get the current offset offset = channel.size(); // add key array to byte buffer - byteBuffer.put(nodeHolder.getKeyArray()); + for (int i = 0; i < nodeHolder.getKeyArray().length; i++) { + byteBuffer.put(nodeHolder.getKeyArray()[i]); + } + for (int i = 0; i < nodeHolder.getDataArray().length; i++) { + byteBuffer.put(nodeHolder.getDataArray()[i]); + } // add 
measure data array to byte buffer - byteBuffer.put(nodeHolder.getDataArray()); ByteBuffer buffer1 = null; for (int i = 0; i < compressedIndex.length; i++) { @@ -448,9 +318,9 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact // add measure length info.setMeasureLength(nodeHolder.getMeasureLenght()); - long[] msrOffset = new long[this.measureCount]; + long[] msrOffset = new long[dataWriterVo.getMeasureCount()]; - for (int i = 0; i < this.measureCount; i++) { + for (int i = 0; i < msrOffset.length; i++) { // increment the current offset by 4 bytes because 4 bytes will be // used for measure byte length // offset += CarbonCommonConstants.INT_SIZE_IN_BYTE; @@ -489,4 +359,21 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact return info; } + /** + * This method will write metadata at the end of the file in thrift format + */ + protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel, + String filePath) throws CarbonDataWriterException { + try { + long currentPosition = channel.size(); + CarbonFooterWriter writer = new CarbonFooterWriter(filePath); + FileFooter convertFileMeta = CarbonMetadataUtil + .convertFileFooter(infoList, localCardinality.length, localCardinality, + thriftColumnSchemaList, dataWriterVo.getSegmentProperties()); + fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition); + writer.writeFooter(convertFileMeta, currentPosition); + } catch (IOException e) { + throw new CarbonDataWriterException("Problem while writing the carbon file: ", e); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java index aa758e6..a7d14f0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java @@ -27,12 +27,12 @@ public class NodeHolder { /** * keyArray */ - private byte[] keyArray; + private byte[][] keyArray; /** * dataArray */ - private byte[] dataArray; + private byte[][] dataArray; /** * measureLenght @@ -139,30 +139,40 @@ public class NodeHolder { private BitSet[] measureNullValueIndex; /** + * total length of dimension values + */ + private int totalDimensionArrayLength; + + /** + * total length of all measure values + */ + private int totalMeasureArrayLength; + + /** * @return the keyArray */ - public byte[] getKeyArray() { + public byte[][] getKeyArray() { return keyArray; } /** * @param keyArray the keyArray to set */ - public void setKeyArray(byte[] keyArray) { + public void setKeyArray(byte[][] keyArray) { this.keyArray = keyArray; } /** * @return the dataArray */ - public byte[] getDataArray() { + public byte[][] getDataArray() { return dataArray; } /** * @param dataArray the dataArray to set */ - public void setDataArray(byte[] dataArray) { + public void setDataArray(byte[][] dataArray) { this.dataArray = dataArray; } @@ -453,4 +463,20 @@ public class NodeHolder { public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) { this.measureNullValueIndex = measureNullValueIndex; } + + public int
getTotalDimensionArrayLength() { + return totalDimensionArrayLength; + } + + public void setTotalDimensionArrayLength(int totalDimensionArrayLength) { + this.totalDimensionArrayLength = totalDimensionArrayLength; + } + + public int getTotalMeasureArrayLength() { + return totalMeasureArrayLength; + } + + public void setTotalMeasureArrayLength(int totalMeasureArrayLength) { + this.totalMeasureArrayLength = totalMeasureArrayLength; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java index 62b442f..84192b8 100644 --- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java +++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java @@ -35,9 +35,12 @@ import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.test.util.StoreCreator; import junit.framework.TestCase; + +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -45,10 +48,22 @@ public class BlockIndexStoreTest extends TestCase { private BlockIndexStore indexStore; + private String property; @BeforeClass public void setUp() { + property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION); + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1"); StoreCreator.createCarbonStore(); indexStore = BlockIndexStore.getInstance(); } + + @AfterClass public void tearDown() { + if(null!=property) { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, property); + }else { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION+""); + } + } @Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException { String canonicalPath = @@ -56,7 +71,7 @@ public class BlockIndexStoreTest extends TestCase { File file = getPartFile(); TableBlockInfo info = new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, - file.length()); + file.length(),(short)1); CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); AbsoluteTableIdentifier absoluteTableIdentifier = @@ -78,20 +93,20 @@ public class BlockIndexStoreTest extends TestCase { File file = getPartFile(); TableBlockInfo info = new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info1 = new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info2 = new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info3 = new 
TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info4 = new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); @@ -133,31 +148,31 @@ public class BlockIndexStoreTest extends TestCase { File file = getPartFile(); TableBlockInfo info = new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info1 = new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info2 = new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info3 = new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info4 = new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info5 = new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" }, - file.length()); + file.length(),(short)1); TableBlockInfo info6 = new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); TableBlockInfo info7 = new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" }, - file.length()); + file.length(), (short)1); CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
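To make the writeDataToFile rework above concrete: the writer now sizes a single ByteBuffer from the cached dimension and measure totals and puts each column's byte array individually, instead of two monolithic arrays. A minimal sketch of that flattening pattern, assuming hypothetical names (writeBlocklet, dimColumns, msrColumns) rather than the CarbonData API:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Sketch only: appends per-column dimension and measure bytes to the channel
// as one contiguous blocklet and returns its start offset. dimColumns,
// msrColumns and the two totals are assumed inputs, mirroring NodeHolder's
// getKeyArray()/getDataArray() and the new total*ArrayLength fields.
static long writeBlocklet(FileChannel channel, byte[][] dimColumns, byte[][] msrColumns,
    int totalDimLength, int totalMsrLength) throws IOException {
  ByteBuffer buffer = ByteBuffer.allocate(totalDimLength + totalMsrLength);
  long startOffset = channel.size();    // current end of file = blocklet start
  for (byte[] dim : dimColumns) {
    buffer.put(dim);                    // dimension (key) columns first
  }
  for (byte[] msr : msrColumns) {
    buffer.put(msr);                    // then measure columns
  }
  buffer.flip();                        // switch the buffer from filling to draining
  channel.write(buffer, startOffset);
  return startOffset;
}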
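The NodeHolder change works hand in hand with that rework: keyArray and dataArray become byte[][], and the two new total*ArrayLength fields cache the summed sizes so the writer can allocate its buffer without re-walking every column. A small illustration of populating the holder; the setters are the ones added in this patch, while totalLength is a hypothetical helper:

// totalLength is illustrative only; it computes the sums that the writer's
// ByteBuffer allocation expects to find in the holder.
static int totalLength(byte[][] columns) {
  int total = 0;
  for (byte[] column : columns) {
    total += column.length;
  }
  return total;
}

static void fillHolder(NodeHolder holder, byte[][] keyColumns, byte[][] measureColumns) {
  holder.setKeyArray(keyColumns);
  holder.setDataArray(measureColumns);
  holder.setTotalDimensionArrayLength(totalLength(keyColumns));
  holder.setTotalMeasureArrayLength(totalLength(measureColumns));
}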
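In BlockIndexStoreTest, the setUp/tearDown pair pins CARBON_DATA_FILE_VERSION to "1" for the duration of the class and then restores whatever value the caller had configured. The same save-and-restore pattern, restated with conventional spacing (keys and default taken from the diff; behavior unchanged):

private String savedVersion;

@BeforeClass public void setUp() {
  CarbonProperties props = CarbonProperties.getInstance();
  savedVersion = props.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
  // Pin the store to file-format V1 for this test run.
  props.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
  StoreCreator.createCarbonStore();
  indexStore = BlockIndexStore.getInstance();
}

@AfterClass public void tearDown() {
  CarbonProperties props = CarbonProperties.getInstance();
  if (savedVersion != null) {
    props.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, savedVersion);
  } else {
    props.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
        CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + "");
  }
}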
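Each TableBlockInfo in these tests now takes the file-format version as a trailing short argument. Since the same six-argument call is repeated many times, a hypothetical local helper would keep the test bodies terse (blockInfo is illustrative, not part of the patch):

// Hypothetical test helper; (short) 1 matches the V1 version the test pins above.
private static TableBlockInfo blockInfo(File file, String segmentId) {
  return new TableBlockInfo(file.getAbsolutePath(), 0, segmentId,
      new String[] { "loclhost" }, file.length(), (short) 1);
}

// usage: TableBlockInfo info2 = blockInfo(file, "1");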