http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java deleted file mode 100644 index 29aa7e7..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.spark.merger; - -import java.io.File; -import java.util.AbstractQueue; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.mutate.CarbonUpdateUtil; -import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; -import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; -import org.apache.carbondata.core.util.ByteUtil; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.processing.datatypes.GenericDataType; -import org.apache.carbondata.processing.model.CarbonLoadModel; -import org.apache.carbondata.processing.store.CarbonDataFileAttributes; -import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar; -import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; -import org.apache.carbondata.processing.store.CarbonFactHandler; -import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; -import 
org.apache.carbondata.spark.merger.exeception.SliceMergerException; - -/** - * This is the Merger class responsible for the merging of the segments. - */ -public class RowResultMerger { - - private final String databaseName; - private final String tableName; - private final String tempStoreLocation; - private final String factStoreLocation; - private CarbonFactHandler dataHandler; - private List<RawResultIterator> rawResultIteratorList = - new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - private SegmentProperties segprop; - /** - * record holder heap - */ - private AbstractQueue<RawResultIterator> recordHolderHeap; - - private TupleConversionAdapter tupleConvertor; - - private static final LogService LOGGER = - LogServiceFactory.getLogService(RowResultMerger.class.getName()); - - public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName, - String tableName, SegmentProperties segProp, String tempStoreLocation, - CarbonLoadModel loadModel, CompactionType compactionType) { - - CarbonDataFileAttributes carbonDataFileAttributes; - - this.rawResultIteratorList = iteratorList; - // create the List of RawResultIterator. - - recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(), - new RowResultMerger.CarbonMdkeyComparator()); - - this.segprop = segProp; - this.tempStoreLocation = tempStoreLocation; - - this.factStoreLocation = loadModel.getStorePath(); - - if (!new File(tempStoreLocation).mkdirs()) { - LOGGER.error("Error while new File(tempStoreLocation).mkdirs() "); - } - - this.databaseName = databaseName; - this.tableName = tableName; - - int measureCount = segprop.getMeasures().size(); - CarbonTable carbonTable = CarbonMetadata.getInstance() - .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName); - CarbonFactDataHandlerModel carbonFactDataHandlerModel = - getCarbonFactDataHandlerModel(loadModel); - carbonFactDataHandlerModel.setPrimitiveDimLens(segprop.getDimColumnsCardinality()); - - if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) { - int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(), - CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(), - carbonTable.getCarbonTableIdentifier())); - - // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will - // be written in same segment. So the TaskNo should be incremented by 1 from max val. 
- int index = taskNo + 1; - carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp()); - } - else { - carbonDataFileAttributes = - new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()), - loadModel.getFactTimeStamp()); - } - - carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); - if (segProp.getNumberOfNoDictionaryDimension() > 0 - || segProp.getComplexDimensions().size() > 0) { - carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1); - } else { - carbonFactDataHandlerModel.setMdKeyIndex(measureCount); - } - carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB()); - dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); - - tupleConvertor = new TupleConversionAdapter(segProp); - } - - /** - * Merge function - * - */ - public boolean mergerSlice() { - boolean mergeStatus = false; - int index = 0; - boolean isDataPresent = false; - try { - - // add all iterators to the queue - for (RawResultIterator leaftTupleIterator : this.rawResultIteratorList) { - this.recordHolderHeap.add(leaftTupleIterator); - index++; - } - RawResultIterator iterator = null; - while (index > 1) { - // iterator the top record - iterator = this.recordHolderHeap.poll(); - Object[] convertedRow = iterator.next(); - if (null == convertedRow) { - index--; - continue; - } - if (!isDataPresent) { - dataHandler.initialise(); - isDataPresent = true; - } - // get the mdkey - addRow(convertedRow); - // if there is no record in the leaf and all then decrement the - // index - if (!iterator.hasNext()) { - index--; - continue; - } - // add record to heap - this.recordHolderHeap.add(iterator); - } - // if record holder is not empty then iterator the slice holder from - // heap - iterator = this.recordHolderHeap.poll(); - while (true) { - Object[] convertedRow = iterator.next(); - if (null == convertedRow) { - break; - } - // do it only once - if (!isDataPresent) { - dataHandler.initialise(); - isDataPresent = true; - } - addRow(convertedRow); - // check if leaf contains no record - if (!iterator.hasNext()) { - break; - } - } - if (isDataPresent) - { - this.dataHandler.finish(); - } - mergeStatus = true; - } catch (Exception e) { - LOGGER.error(e, e.getMessage()); - LOGGER.error("Exception in compaction merger " + e.getMessage()); - mergeStatus = false; - } finally { - try { - if (isDataPresent) { - this.dataHandler.closeHandler(); - } - } catch (CarbonDataWriterException e) { - LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage()); - mergeStatus = false; - } - } - - return mergeStatus; - } - - /** - * Below method will be used to add sorted row - * - * @throws SliceMergerException - */ - private void addRow(Object[] carbonTuple) throws SliceMergerException { - Object[] rowInWritableFormat; - - rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple); - try { - this.dataHandler.addDataToStore(rowInWritableFormat); - } catch (CarbonDataWriterException e) { - throw new SliceMergerException("Problem in merging the slice", e); - } - } - - /** - * This method will create a model object for carbon fact data handler - * - * @param loadModel - * @return - */ - private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel) { - CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel(); - carbonFactDataHandlerModel.setDatabaseName(databaseName); - carbonFactDataHandlerModel.setTableName(tableName); - 
carbonFactDataHandlerModel.setMeasureCount(segprop.getMeasures().size()); - carbonFactDataHandlerModel.setCompactionFlow(true); - carbonFactDataHandlerModel - .setMdKeyLength(segprop.getDimensionKeyGenerator().getKeySizeInBytes()); - carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation); - carbonFactDataHandlerModel.setDimLens(segprop.getDimColumnsCardinality()); - carbonFactDataHandlerModel.setSegmentProperties(segprop); - carbonFactDataHandlerModel.setNoDictionaryCount(segprop.getNumberOfNoDictionaryDimension()); - carbonFactDataHandlerModel.setDimensionCount( - segprop.getDimensions().size() - carbonFactDataHandlerModel.getNoDictionaryCount()); - CarbonTable carbonTable = CarbonMetadata.getInstance() - .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName); - List<ColumnSchema> wrapperColumnSchema = CarbonUtil - .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName), - carbonTable.getMeasureByTableName(tableName)); - carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema); - // get the cardinality for all all the columns including no dictionary columns - int[] formattedCardinality = - CarbonUtil.getFormattedCardinality(segprop.getDimColumnsCardinality(), wrapperColumnSchema); - carbonFactDataHandlerModel.setColCardinality(formattedCardinality); - //TO-DO Need to handle complex types here . - Map<Integer, GenericDataType> complexIndexMap = - new HashMap<Integer, GenericDataType>(segprop.getComplexDimensions().size()); - carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap); - carbonFactDataHandlerModel.setDataWritingRequest(true); - - char[] aggType = new char[segprop.getMeasures().size()]; - Arrays.fill(aggType, 'n'); - int i = 0; - for (CarbonMeasure msr : segprop.getMeasures()) { - aggType[i++] = DataTypeUtil.getAggType(msr.getDataType()); - } - carbonFactDataHandlerModel.setAggType(aggType); - carbonFactDataHandlerModel.setFactDimLens(segprop.getDimColumnsCardinality()); - - String carbonDataDirectoryPath = - checkAndCreateCarbonStoreLocation(this.factStoreLocation, databaseName, tableName, - loadModel.getPartitionId(), loadModel.getSegmentId()); - carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); - - List<CarbonDimension> dimensionByTableName = - loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName); - boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()]; - int index = 0; - for (CarbonDimension dimension : dimensionByTableName) { - isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex(); - } - carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes); - return carbonFactDataHandlerModel; - } - - /** - * This method will get the store location for the given path, segment id and partition id - * - * @return data directory path - */ - private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName, - String tableName, String partitionId, String segmentId) { - CarbonTable carbonTable = CarbonMetadata.getInstance() - .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName); - CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier); - String carbonDataDirectoryPath = - carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId); - CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); - return 
carbonDataDirectoryPath; - } - - /** - * Comparator class for comparing 2 raw row result. - */ - private class CarbonMdkeyComparator implements Comparator<RawResultIterator> { - - @Override public int compare(RawResultIterator o1, RawResultIterator o2) { - - Object[] row1 = new Object[0]; - Object[] row2 = new Object[0]; - try { - row1 = o1.fetchConverted(); - row2 = o2.fetchConverted(); - } catch (KeyGenException e) { - LOGGER.error(e.getMessage()); - } - if (null == row1 || null == row2) { - return 0; - } - ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0]; - ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0]; - int compareResult = 0; - int[] columnValueSizes = segprop.getEachDimColumnValueSize(); - int dictionaryKeyOffset = 0; - byte[] dimCols1 = key1.getDictionaryKey(); - byte[] dimCols2 = key2.getDictionaryKey(); - int noDicIndex = 0; - for (int eachColumnValueSize : columnValueSizes) { - // case of dictionary cols - if (eachColumnValueSize > 0) { - - compareResult = ByteUtil.UnsafeComparer.INSTANCE - .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2, - dictionaryKeyOffset, eachColumnValueSize); - dictionaryKeyOffset += eachColumnValueSize; - } else { // case of no dictionary - - byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex); - byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex); - compareResult = - ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2); - noDicIndex++; - - } - if (0 != compareResult) { - return compareResult; - } - } - return 0; - } - } - -}
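The deleted RowResultMerger above is a classic k-way merge: every segment under compaction contributes one sorted RawResultIterator, mergerSlice() keeps the iterators in a PriorityQueue ordered by CarbonMdkeyComparator, repeatedly polls the iterator whose current row is smallest, hands that row to the fact data handler, and re-offers the iterator while it still has rows. A minimal, self-contained sketch of that pattern under simplified types (KWayMergeSketch and Source are illustrative names, not part of this patch):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public final class KWayMergeSketch {

  // Wraps one sorted source and caches its current head row so the heap
  // can compare sources without consuming them -- the role played by
  // RawResultIterator.fetchConverted() in the deleted class.
  private static final class Source implements Comparable<Source> {
    private final Iterator<int[]> rows;
    private final Comparator<int[]> cmp;
    private int[] head;

    Source(Iterator<int[]> rows, Comparator<int[]> cmp) {
      this.rows = rows;
      this.cmp = cmp;
      advance();
    }

    // Pulls the next row into head; returns false when the source is drained.
    boolean advance() {
      head = rows.hasNext() ? rows.next() : null;
      return head != null;
    }

    @Override public int compareTo(Source other) {
      return cmp.compare(this.head, other.head);
    }
  }

  // Merges already-sorted sources into one sorted list.
  public static List<int[]> merge(List<Iterator<int[]>> sources, Comparator<int[]> cmp) {
    PriorityQueue<Source> heap = new PriorityQueue<>();
    for (Iterator<int[]> it : sources) {
      Source s = new Source(it, cmp);
      if (s.head != null) {
        heap.add(s);                    // mirrors recordHolderHeap.add(...)
      }
    }
    List<int[]> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Source smallest = heap.poll();    // iterator with the smallest current row
      merged.add(smallest.head);        // stands in for dataHandler.addDataToStore(...)
      if (smallest.advance()) {
        heap.add(smallest);             // re-offer while rows remain
      }
    }
    return merged;
  }
}

In the original, the Comparator role is filled by CarbonMdkeyComparator, which walks the dictionary key bytes column by column (using each column's fixed value size as the offset) and falls back to the no-dictionary byte arrays for the remaining columns.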
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java deleted file mode 100644 index d1a3a8d..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.merger; - -import java.io.Serializable; - -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; - -public class TableMeta implements Serializable { - - private static final long serialVersionUID = -1749874611119829431L; - - public CarbonTableIdentifier carbonTableIdentifier; - public String storePath; - public CarbonTable carbonTable; - - public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath, - CarbonTable carbonTable) { - this.carbonTableIdentifier = carbonTableIdentifier; - this.storePath = storePath; - this.carbonTable = carbonTable; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java deleted file mode 100644 index 08b563f..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.spark.merger; - -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; - -/** - * This class will be used to convert the Result into the format used in data writer. - */ -class TupleConversionAdapter { - - private final SegmentProperties segmentproperties; - - private int noDictionaryPresentIndex; - - private int measureCount; - - private boolean isNoDictionaryPresent; - - public TupleConversionAdapter(SegmentProperties segmentProperties) { - this.measureCount = segmentProperties.getMeasures().size(); - this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0; - if (isNoDictionaryPresent) { - noDictionaryPresentIndex++; - } - this.segmentproperties = segmentProperties; - } - - /** - * Converting the raw result to the format understandable by the data writer. - * @param carbonTuple - * @return - */ - public Object[] getObjectArray(Object[] carbonTuple) { - Object[] row = new Object[measureCount + noDictionaryPresentIndex + 1]; - int index = 0; - // put measures. - - for (int j = 1; j <= measureCount; j++) { - row[index++] = carbonTuple[j]; - } - - // put No dictionary byte [] - if (isNoDictionaryPresent) { - row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeys(); - } - - // put No Dictionary Dims - row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getDictionaryKey(); - return row; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java deleted file mode 100644 index fd6610c..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.merger.exeception; - -import java.util.Locale; - -public class SliceMergerException extends Exception { - - /** - * default serial version ID. - */ - private static final long serialVersionUID = 1L; - - /** - * The Error message. - */ - private String msg = ""; - - /** - * Constructor - * - * @param msg The error message for this exception. 
- */ - public SliceMergerException(String msg) { - super(msg); - this.msg = msg; - } - - /** - * Constructor - * - * @param msg The error message for this exception. - */ - public SliceMergerException(String msg, Throwable t) { - super(msg, t); - this.msg = msg; - } - - /** - * This method is used to get the localized message. - * - * @param locale - A Locale object represents a specific geographical, - * political, or cultural region. - * @return - Localized error message. - */ - public String getLocalizedMessage(Locale locale) { - return ""; - } - - /** - * getLocalizedMessage - */ - @Override public String getLocalizedMessage() { - return super.getLocalizedMessage(); - } - - /** - * getMessage - */ - public String getMessage() { - return this.msg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index f04669c..277005b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -31,9 +31,9 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil +import org.apache.carbondata.processing.merger.CarbonDataMergerUtil import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.spark.MergeResult -import org.apache.carbondata.spark.merger.CarbonDataMergerUtil /** * IUD carbon merger RDD http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 51f9022..350a2ec 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -46,11 +46,11 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo} +import org.apache.carbondata.processing.merger._ import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.util.CarbonDataProcessorUtil import org.apache.carbondata.spark.MergeResult import org.apache.carbondata.spark.load.CarbonLoaderUtil -import org.apache.carbondata.spark.merger._ import org.apache.carbondata.spark.splits.TableSplit class CarbonMergerRDD[K, V]( @@ -152,9 +152,14 @@ class CarbonMergerRDD[K, V]( 
CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList) carbonLoadModel.setStorePath(hdfsStoreLocation) - + // check for restructured block + // TODO: only in case of add and drop this variable should be true + val restructuredBlockExists: Boolean = CarbonCompactionUtil + .checkIfAnyRestructuredBlockExists(segmentMapping, + dataFileMetadataSegMapping, + carbonTable.getTableLastUpdatedTime) exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, - carbonTable, dataFileMetadataSegMapping) + carbonTable, dataFileMetadataSegMapping, restructuredBlockExists) // fire a query and get the results. var result2: java.util.List[RawResultIterator] = null @@ -190,17 +195,25 @@ class CarbonMergerRDD[K, V]( carbonLoadModel.setSegmentId(mergeNumber) carbonLoadModel.setPartitionId("0") - val merger = - new RowResultMerger(result2, - databaseName, - factTableName, + var processor: AbstractResultProcessor = null + if (restructuredBlockExists) { + processor = new CompactionResultSortProcessor(carbonLoadModel, carbonTable, segmentProperties, - tempStoreLoc, - carbonLoadModel, - carbonMergerMapping.campactionType + carbonMergerMapping.campactionType, + factTableName ) - mergeStatus = merger.mergerSlice() - + } else { + processor = + new RowResultMergerProcessor( + databaseName, + factTableName, + segmentProperties, + tempStoreLoc, + carbonLoadModel, + carbonMergerMapping.campactionType + ) + } + mergeStatus = processor.execute(result2) mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber } catch { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala index 3b38028..1a237f6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala @@ -24,9 +24,8 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.MergeResultImpl -import org.apache.carbondata.spark.load.CarbonLoaderUtil -import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionType} /** * Compactor class which handled the compaction cases. 
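The CarbonMergerRDD hunk above is the pivot of this commit: before building the executor it asks CarbonCompactionUtil.checkIfAnyRestructuredBlockExists(...) whether any selected block predates a schema change, then routes the raw results either to CompactionResultSortProcessor (rows must be fully re-sorted before writing) or to RowResultMergerProcessor (the successor of the deleted RowResultMerger, a plain heap merge) behind the common AbstractResultProcessor contract. A hedged sketch of that selection with illustrative stand-ins for the two processors (only ProcessorSelector's branch mirrors the diff; the class bodies here are placeholders):

import java.util.List;

interface ResultProcessor {                 // stands in for AbstractResultProcessor
  boolean execute(List<Object[]> rawRows);
}

final class SortingProcessor implements ResultProcessor {    // ~ CompactionResultSortProcessor
  public boolean execute(List<Object[]> rawRows) {
    // sort the union of all rows, then write -- needed because blocks
    // written under different schemas no longer share one sort order
    return true;
  }
}

final class MergingProcessor implements ResultProcessor {    // ~ RowResultMergerProcessor
  public boolean execute(List<Object[]> rawRows) {
    // k-way heap merge of already-sorted iterators, then write
    return true;
  }
}

final class ProcessorSelector {
  // One restructured block in the input forces the re-sort path.
  static ResultProcessor select(boolean restructuredBlockExists) {
    return restructuredBlockExists ? new SortingProcessor() : new MergingProcessor();
  }
}

Either way the caller sees a single mergeStatus = processor.execute(result2), which is why the old RowResultMerger construction and its mergerSlice() call disappear from the RDD.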
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala index 0ba99a8..d6cc2e6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala @@ -33,10 +33,11 @@ import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifie import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.spark._ +import org.apache.carbondata.spark.compaction.CompactionCallable import org.apache.carbondata.spark.load._ -import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionCallable, CompactionType} import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil} /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index dadd03e..367bf46 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -39,10 +39,10 @@ import org.apache.carbondata.core.service.CarbonCommonFactory import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.DataTypeUtil import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.merger.CompactionType import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.spark.CarbonSparkFactory import org.apache.carbondata.spark.load.FailureCauses -import org.apache.carbondata.spark.merger.CompactionType import org.apache.carbondata.spark.rdd.AlterTableAddColumnRDD import org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 60742ac..4cca0a3 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -50,11 +50,11 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.processing.csvload.BlockDetails import org.apache.carbondata.processing.etl.DataLoadingException +import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException import org.apache.carbondata.spark._ import org.apache.carbondata.spark.load._ -import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.splits.TableSplit import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index e322fc8..0e6153f 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -40,8 +40,8 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection} import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader} +import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.spark.{CarbonFilters, CarbonOption} -import org.apache.carbondata.spark.merger.TableMeta import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl private[sql] case class CarbonDatasourceHadoopRelation( http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala index 4c5e733..2ff21c8e 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala @@ -34,8 +34,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.spark.{CarbonOption, _} -import org.apache.carbondata.spark.merger.TableMeta /** * Carbon relation provider compliant to data source api. 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala index 62e5241..a439c30 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala @@ -43,10 +43,9 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl import org.apache.carbondata.processing.exception.MultipleMatchingException +import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType} import org.apache.carbondata.spark.DeleteDelataResultImpl import org.apache.carbondata.spark.load.FailureCauses -import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType} -import org.apache.carbondata.spark.merger.CarbonDataMergerUtil._ import org.apache.carbondata.spark.util.QueryPlanUtil @@ -272,7 +271,7 @@ object IUDCommon { carbonRelation: CarbonRelation, isUpdateOperation: Boolean): Unit = { - var ishorizontalCompaction = isHorizontalCompactionEnabled() + var ishorizontalCompaction = CarbonDataMergerUtil.isHorizontalCompactionEnabled() if (ishorizontalCompaction == false) { return @@ -288,7 +287,7 @@ object IUDCommon { val deleteTimeStamp = updateTimeStamp + 1 // get the valid segments - var segLists = getValidSegmentList(absTableIdentifier) + var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier) if (segLists == null || segLists.size() == 0) { return @@ -350,7 +349,7 @@ object IUDCommon { val db = carbonTable.getDatabaseName val table = carbonTable.getFactTableName // get the valid segments qualified for update compaction. 
- val validSegList = getSegListIUDCompactionQualified(segLists, + val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists, absTableIdentifier, segmentUpdateStatusManager, compactionTypeIUD) @@ -406,7 +405,7 @@ object IUDCommon { val db = carbonTable.getDatabaseName val table = carbonTable.getFactTableName - val deletedBlocksList = getSegListIUDCompactionQualified(segLists, + val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists, absTableIdentifier, segmentUpdateStatusManager, compactionTypeIUD) @@ -436,7 +435,7 @@ object IUDCommon { val blockName = segmentAndBlocks .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length) - val result = compactBlockDeleteDeltaFiles(segment, blockName, + val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName, absTableIdentifier, updateStatusDetails, timestamp) @@ -453,7 +452,7 @@ object IUDCommon { }) }) - val updateStatus = updateStatusFile(resultList.toList.asJava, + val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava, carbonTable, timestamp.toString, segmentUpdateStatusManager) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala index d5cc540..e8d3907 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -47,7 +47,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFa import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.spark.merger.TableMeta +import org.apache.carbondata.processing.merger.TableMeta case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 7cb5ed4..4f33043 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -50,11 +50,11 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.processing.csvload.BlockDetails import org.apache.carbondata.processing.etl.DataLoadingException +import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException import org.apache.carbondata.spark._ import org.apache.carbondata.spark.load._ -import 
org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.splits.TableSplit import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 4169ac3..b9e8682 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -32,8 +32,8 @@ import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.logical.AndExpression import org.apache.carbondata.hadoop.CarbonProjection import org.apache.carbondata.hadoop.util.SchemaReader +import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.spark.CarbonFilters -import org.apache.carbondata.spark.merger.TableMeta import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.spark.util.CarbonSparkUtil http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala index 88ca4af..38fdb11 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala @@ -50,8 +50,9 @@ private[sql] case class AlterTableAddColumns( val dbName = alterTableAddColumnsModel.databaseName .getOrElse(sparkSession.catalog.currentDatabase) LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName") - val carbonLock = AlterTableUtil - .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession) + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + val locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession) // get the latest carbon table and check for column existence val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]() @@ -102,13 +103,7 @@ private[sql] case class AlterTableAddColumns( sys.error("Alter table add column operation failed. 
Please check the logs") } finally { // release lock after command execution completion - if (carbonLock != null) { - if (carbonLock.unlock()) { - LOGGER.info("Alter table add columns lock released successfully") - } else { - LOGGER.error("Unable to release lock during alter table add columns operation") - } - } + AlterTableUtil.releaseLocks(locks, LOGGER) } Seq.empty } @@ -147,15 +142,15 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR s"Table $oldDatabaseName.$oldTableName does not exist") sys.error(s"Table $oldDatabaseName.$oldTableName does not exist") } + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, + LockUsage.COMPACTION_LOCK, + LockUsage.DELETE_SEGMENT_LOCK, + LockUsage.CLEAN_FILES_LOCK, + LockUsage.DROP_TABLE_LOCK) + val locks = AlterTableUtil + .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)( + sparkSession) val carbonTable = relation.tableMeta.carbonTable - val carbonLock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.METADATA_LOCK) - if (carbonLock.lockWithRetries()) { - LOGGER.info("Successfully able to get the table metadata file lock") - } else { - sys.error("Table is locked for updation. Please try after some time") - } try { // get the latest carbon table and check for column existence val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, @@ -200,24 +195,15 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR sys.error("Alter table rename table operation failed. Please check the logs") } finally { // release lock after command execution completion - if (carbonLock != null) { - if (carbonLock.unlock()) { - LOGGER.info("Lock released successfully") - } else { - val storeLocation = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.STORE_LOCATION, - System.getProperty("java.io.tmpdir")) - val lockFilePath = storeLocation + CarbonCommonConstants.FILE_SEPARATOR + - oldDatabaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName + - CarbonCommonConstants.FILE_SEPARATOR + - LockUsage.METADATA_LOCK - if(carbonLock.releaseLockManually(lockFilePath)) { - LOGGER.info("Lock released successfully") - } else { - LOGGER.error("Unable to release lock during rename table") - } - } - } + AlterTableUtil.releaseLocks(locks, LOGGER) + // case specific to rename table as after table rename old table path will not be found + AlterTableUtil + .releaseLocksManually(locks, + locksToBeAcquired, + oldDatabaseName, + newTableName, + carbonTable.getStorePath, + LOGGER) } Seq.empty } @@ -251,8 +237,9 @@ private[sql] case class AlterTableDropColumns( val dbName = alterTableDropColumnModel.databaseName .getOrElse(sparkSession.catalog.currentDatabase) LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName") - val carbonLock = AlterTableUtil - .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession) + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + val locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession) try { // get the latest carbon table and check for column existence val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) @@ -333,13 +320,7 @@ private[sql] case class AlterTableDropColumns( sys.error("Alter table drop column operation failed. 
Please check the logs") } finally { // release lock after command execution completion - if (carbonLock != null) { - if (carbonLock.unlock()) { - LOGGER.info("Alter table drop columns lock released successfully") - } else { - LOGGER.error("Unable to release lock during alter table drop columns operation") - } - } + AlterTableUtil.releaseLocks(locks, LOGGER) } Seq.empty } @@ -355,8 +336,9 @@ private[sql] case class AlterTableDataTypeChange( val dbName = alterTableDataTypeChangeModel.databaseName .getOrElse(sparkSession.catalog.currentDatabase) LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName") - val carbonLock = AlterTableUtil - .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession) + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + val locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession) try { // get the latest carbon table and check for column existence val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) @@ -416,13 +398,7 @@ private[sql] case class AlterTableDataTypeChange( sys.error("Alter table data type change operation failed. Please check the logs") } finally { // release lock after command execution completion - if (carbonLock != null) { - if (carbonLock.unlock()) { - LOGGER.info("Alter table change data type lock released successfully") - } else { - LOGGER.error("Unable to release lock during alter table change data type operation") - } - } + AlterTableUtil.releaseLocks(locks, LOGGER) } Seq.empty } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala index f7ea344..6460490 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -53,7 +53,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFa import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.spark.merger.TableMeta +import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.spark.util.CarbonSparkUtil case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 243eeb6..2e7eebf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -17,6 +17,8 @@ package org.apache.spark.util +import scala.collection.mutable.ListBuffer + import org.apache.spark.SparkConf import org.apache.spark.sql.{CarbonEnv, SparkSession} import 
org.apache.spark.sql.catalyst.TableIdentifier @@ -27,12 +29,15 @@ import org.apache.carbondata.common.logging.LogService import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} object AlterTableUtil { - - def validateTableAndAcquireLock(dbName: String, tableName: String, LOGGER: LogService) - (sparkSession: SparkSession): ICarbonLock = { + def validateTableAndAcquireLock(dbName: String, + tableName: String, + locksToBeAcquired: List[String], + LOGGER: LogService) + (sparkSession: SparkSession): List[ICarbonLock] = { val relation = CarbonEnv.get.carbonMetastore .lookupRelation(Option(dbName), tableName)(sparkSession) @@ -44,17 +49,82 @@ object AlterTableUtil { } // acquire the lock first val table = relation.tableMeta.carbonTable + var acquiredLocks = ListBuffer[ICarbonLock]() + locksToBeAcquired.foreach { lock => + acquiredLocks += getLockObject(table, lock, LOGGER) + } + acquiredLocks.toList + } + + /** + * Given a lock type this method will return a new lock object if not acquired by any other + * operation + * + * @param carbonTable + * @param lockType + * @param LOGGER + * @return + */ + private def getLockObject(carbonTable: CarbonTable, + lockType: String, + LOGGER: LogService): ICarbonLock = { val carbonLock = CarbonLockFactory - .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.METADATA_LOCK) + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + lockType) if (carbonLock.lockWithRetries()) { - LOGGER.info("Successfully able to get the table metadata file lock") + LOGGER.info(s"Successfully acquired the lock $lockType") } else { sys.error("Table is locked for updation. Please try after some time") } carbonLock } + /** + * This method will release the locks acquired for an operation + * + * @param locks + * @param LOGGER + */ + def releaseLocks(locks: List[ICarbonLock], LOGGER: LogService): Unit = { + locks.foreach { carbonLock => + if (carbonLock.unlock()) { + LOGGER.info("Alter table lock released successfully") + } else { + LOGGER.error("Unable to release lock during alter table operation") + } + } + } + + /** + * This method will release the locks by manually forming a lock path. 
Specific usage for + * rename table + * + * @param locks + * @param locksAcquired + * @param dbName + * @param tableName + * @param storeLocation + * @param LOGGER + */ + def releaseLocksManually(locks: List[ICarbonLock], + locksAcquired: List[String], + dbName: String, + tableName: String, + storeLocation: String, + LOGGER: LogService): Unit = { + val lockLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR + + dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName + locks.zip(locksAcquired).foreach { case (carbonLock, lockType) => + val lockFilePath = lockLocation + CarbonCommonConstants.FILE_SEPARATOR + + lockType + if (carbonLock.releaseLockManually(lockFilePath)) { + LOGGER.info(s"Alter table lock released successfully: ${ lockType }") + } else { + LOGGER.error("Unable to release lock during alter table operation") + } + } + } + def updateSchemaInfo(carbonTable: CarbonTable, schemaEvolutionEntry: SchemaEvolutionEntry, thriftTable: TableInfo)(sparkSession: SparkSession, catalog: HiveExternalCatalog): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala index 6ca8449..0d85062 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.execution.command.{AlterTableCompaction, AlterTableModel} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.spark.merger.CompactionType +import org.apache.carbondata.processing.merger.CompactionType /** * table compaction api http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala index 914136c..91dd8b3 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala @@ -382,7 +382,7 @@ class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll { } test("test to check if the lock file is successfully deleted") { - sql("create table lock_check(id int, name string) stored by 'carbondata'") + sql("create table lock_check(id int, name string) stored by 'carbondata'") sql("alter table lock_check rename to lock_rename") assert(!new File(s"${ CarbonCommonConstants.STORE_LOCATION } + /default/lock_rename/meta.lock") .exists()) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java 
b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java new file mode 100644 index 0000000..f76c66f --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.merger; + +import java.util.List; + +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; +import org.apache.carbondata.core.util.path.CarbonStorePath; +import org.apache.carbondata.processing.model.CarbonLoadModel; +import org.apache.carbondata.processing.store.CarbonDataFileAttributes; +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; + +/** + * This class contains the common methods required for result processing during compaction based on + * restructure and normal scenarios + */ +public abstract class AbstractResultProcessor { + + /** + * This method will perform the desired tasks of merging the selected slices + * + * @param resultIteratorList + * @return + */ + public abstract boolean execute(List<RawResultIterator> resultIteratorList); + + protected void setDataFileAttributesInModel(CarbonLoadModel loadModel, + CompactionType compactionType, CarbonTable carbonTable, + CarbonFactDataHandlerModel carbonFactDataHandlerModel) { + CarbonDataFileAttributes carbonDataFileAttributes; + if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) { + int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(), + CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(), + carbonTable.getCarbonTableIdentifier())); + // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will + // be written in same segment. So the TaskNo should be incremented by 1 from max val. 
+      int index = taskNo + 1;
+      carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp());
+    } else {
+      carbonDataFileAttributes =
+          new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+              loadModel.getFactTimeStamp());
+    }
+    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+  }
+
+}
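The only branching in setDataFileAttributesInModel is the choice of task number: for IUD_UPDDEL_DELTA_COMPACTION the merged file is written back into the same segment, so the latest task id already present in that segment is incremented by one; for all other compaction types the task number carried by the load model is reused. A simplified sketch of just that decision (hypothetical helper, with getLatestTaskIdForSegment replaced by a plain int argument):

    // Simplified mirror of the task-number selection above; not CarbonData API.
    static int chooseTaskNo(boolean isIudUpdDelDeltaCompaction, int latestTaskIdInSegment,
        String loadModelTaskNo) {
      if (isIudUpdDelDeltaCompaction) {
        // the new file lands in the same segment, so move past the current maximum
        return latestTaskIdInSegment + 1;
      }
      // other compaction types write into a new segment, so the load model's task no is reused
      return Integer.parseInt(loadModelTaskNo);
    }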
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
new file mode 100644
index 0000000..c00fe2e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Executor class for executing the query on the selected segments to be merged.
+ * This will fire a select * query and get the raw result.
+ */
+public class CarbonCompactionExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
+  private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
+  private final SegmentProperties destinationSegProperties;
+  private final Map<String, TaskBlockInfo> segmentMapping;
+  private QueryExecutor queryExecutor;
+  private CarbonTable carbonTable;
+  private QueryModel queryModel;
+
+  /**
+   * Flag to check whether any restructured block exists in the blocks selected for compaction.
+   * Based on this, the decision is taken whether the complete data has to be sorted again.
+   */
+  private boolean restructuredBlockExists;
+
+  /**
+   * Constructor
+   *
+   * @param segmentMapping
+   * @param segmentProperties
+   * @param carbonTable
+   * @param dataFileMetadataSegMapping
+   * @param restructuredBlockExists
+   */
+  public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
+      SegmentProperties segmentProperties, CarbonTable carbonTable,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
+      boolean restructuredBlockExists) {
+    this.segmentMapping = segmentMapping;
+    this.destinationSegProperties = segmentProperties;
+    this.carbonTable = carbonTable;
+    this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
+    this.restructuredBlockExists = restructuredBlockExists;
+  }
+
+  /**
+   * For processing of the table blocks.
+   *
+   * @return List of Carbon iterators
+   */
+  public List<RawResultIterator> processTableBlocks() throws QueryExecutionException, IOException {
+    List<RawResultIterator> resultList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<TableBlockInfo> list = null;
+    queryModel = prepareQueryModel(list);
+    // iterate over each segment id
+    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+      String segmentId = taskMap.getKey();
+      List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+      SegmentProperties sourceSegProperties = getSourceSegmentProperties(listMetadata);
+      // for each segment get the task block info
+      TaskBlockInfo taskBlockInfo = taskMap.getValue();
+      Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+      for (String task : taskBlockListMapping) {
+        list = taskBlockInfo.getTableBlockInfoList(task);
+        Collections.sort(list);
+        LOGGER.info("for task -" + task + "-block size is -" + list.size());
+        queryModel.setTableBlockInfos(list);
+        resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
+            destinationSegProperties));
+      }
+    }
+    return resultList;
+  }
+
+  /**
+   * This method will create the source segment properties based on restructured block existence.
+   *
+   * @param listMetadata
+   * @return
+   */
+  private SegmentProperties getSourceSegmentProperties(List<DataFileFooter> listMetadata) {
+    SegmentProperties sourceSegProperties = null;
+    if (restructuredBlockExists) {
+      // update the cardinality of the source segment according to the new schema
+      Map<String, Integer> columnToCardinalityMap =
+          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      CarbonCompactionUtil
+          .addColumnCardinalityToMap(columnToCardinalityMap, listMetadata.get(0).getColumnInTable(),
+              listMetadata.get(0).getSegmentInfo().getColumnCardinality());
+      List<ColumnSchema> updatedColumnSchemaList =
+          new ArrayList<>(listMetadata.get(0).getColumnInTable().size());
+      int[] updatedColumnCardinalities = CarbonCompactionUtil
+          .updateColumnSchemaAndGetCardinality(columnToCardinalityMap, carbonTable,
+              updatedColumnSchemaList);
+      sourceSegProperties =
+          new SegmentProperties(updatedColumnSchemaList, updatedColumnCardinalities);
+    } else {
+      sourceSegProperties = new SegmentProperties(listMetadata.get(0).getColumnInTable(),
+          listMetadata.get(0).getSegmentInfo().getColumnCardinality());
+    }
+    return sourceSegProperties;
+  }
+
+  /**
+   * Get the executor and execute the query model.
+   *
+   * @param blockList
+   * @return
+   */
+  private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+      throws QueryExecutionException, IOException {
+    queryModel.setTableBlockInfos(blockList);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    return queryExecutor.execute(queryModel);
+  }
+
+  /**
+   * This method will be used for cleanup.
+   */
+  public void finish() {
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      LOGGER.error(e, "Problem while finish: ");
+    }
+    clearDictionaryFromQueryModel();
+  }
+
+  /**
+   * This method will clear the dictionary access count after its usage is complete so
+   * that the column can be deleted from the LRU cache whenever memory reaches the threshold.
+   */
+  private void clearDictionaryFromQueryModel() {
+    if (null != queryModel) {
+      Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+      if (null != columnToDictionaryMapping) {
+        for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+          CarbonUtil.clearDictionaryCache(entry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
+   * Prepare the query model.
+   *
+   * @param blockList
+   * @return
+   */
+  private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+    QueryModel model = new QueryModel();
+    model.setTableBlockInfos(blockList);
+    model.setForcedDetailRawQuery(true);
+    model.setFilterExpressionResolverTree(null);
+
+    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    for (CarbonDimension dim : dimensions) {
+      // check if the dimension is deleted
+      QueryDimension queryDimension = new QueryDimension(dim.getColName());
+      queryDimension.setDimension(dim);
+      dims.add(queryDimension);
+    }
+    model.setQueryDimension(dims);
+
+    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    for (CarbonMeasure carbonMeasure : measures) {
+      // check if the measure is deleted
+      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+      queryMeasure.setMeasure(carbonMeasure);
+      msrs.add(queryMeasure);
+    }
+    model.setQueryMeasures(msrs);
+    model.setQueryId(System.nanoTime() + "");
+    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
+    model.setTable(carbonTable);
+    return model;
+  }
+
+}
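Read together, the executor's contract is: construct it with the segment-to-task and segment-to-footer mappings, call processTableBlocks() to obtain one RawResultIterator per task, hand those iterators to an AbstractResultProcessor implementation, and call finish() so the QueryExecutor and the dictionary access counts are released even on failure. A sketch of that call order (hypothetical driver method, not part of this patch; the mappings are assumed to come from CarbonCompactionUtil.createMappingForSegments and createDataFileFooterMappingForSegments):

    // Hypothetical driver, sketching the intended call order only.
    static boolean compactOnce(CarbonCompactionExecutor executor,
        AbstractResultProcessor resultProcessor) throws QueryExecutionException, IOException {
      try {
        // one raw (unsorted) iterator per task of the source segments
        List<RawResultIterator> iterators = executor.processTableBlocks();
        return resultProcessor.execute(iterators);
      } finally {
        executor.finish(); // releases the QueryExecutor and clears dictionary access counts
      }
    }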
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
new file mode 100644
index 0000000..2ad83a4
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * Utility class for the compaction flow.
+ */
+public class CarbonCompactionUtil {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonCompactionUtil.class.getName());
+
+  /**
+   * To create a mapping of segment id and TaskBlockInfo.
+   *
+   * @param tableBlockInfoList
+   * @return
+   */
+  public static Map<String, TaskBlockInfo> createMappingForSegments(
+      List<TableBlockInfo> tableBlockInfoList) {
+
+    // stores the taskBlockInfo of each segment
+    Map<String, TaskBlockInfo> segmentBlockInfoMapping =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (TableBlockInfo info : tableBlockInfoList) {
+      String segId = info.getSegmentId();
+      // check if segId is already present in the map
+      TaskBlockInfo taskBlockInfoMapping = segmentBlockInfoMapping.get(segId);
+      // extract the task id from the file path
+      String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(info.getFilePath());
+      // if taskBlockInfo is not there, then create and add it
+      if (null == taskBlockInfoMapping) {
+        taskBlockInfoMapping = new TaskBlockInfo();
+        groupCorrespondingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+        // put the taskBlockInfo with the respective segment id
+        segmentBlockInfoMapping.put(segId, taskBlockInfoMapping);
+      } else {
+        groupCorrespondingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+      }
+    }
+    return segmentBlockInfoMapping;
+  }
+
+  /**
+   * Groups the taskNumber and the list of TableBlockInfo.
+   *
+   * @param info
+   * @param taskBlockMapping
+   * @param taskNo
+   */
+  private static void groupCorrespondingInfoBasedOnTask(TableBlockInfo info,
+      TaskBlockInfo taskBlockMapping, String taskNo) {
+    // get the corresponding list from the task mapping.
+    List<TableBlockInfo> blockLists = taskBlockMapping.getTableBlockInfoList(taskNo);
+    if (null != blockLists) {
+      blockLists.add(info);
+    } else {
+      blockLists = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      blockLists.add(info);
+      taskBlockMapping.addTableBlockInfoList(taskNo, blockLists);
+    }
+  }
+
+  /**
+   * To create a mapping of segment id and DataFileFooter.
+   *
+   * @param tableBlockInfoList
+   * @return
+   */
+  public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
+      List<TableBlockInfo> tableBlockInfoList) throws IOException {
+
+    Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
+    for (TableBlockInfo blockInfo : tableBlockInfoList) {
+      List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
+      String segId = blockInfo.getSegmentId();
+      DataFileFooter dataFileMetadata = null;
+      // check if segId is already present in the map
+      List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
+      dataFileMetadata = CarbonUtil.readMetadatFile(blockInfo);
+      if (null == metadataList) {
+        // if it is not present, add a new list
+        eachSegmentBlocks.add(dataFileMetadata);
+        segmentBlockInfoMapping.put(segId, eachSegmentBlocks);
+      } else {
+        // if it is already present, update the existing list
+        metadataList.add(dataFileMetadata);
+      }
+    }
+    return segmentBlockInfoMapping;
+  }
+
+  /**
+   * Check whether the file that indicates a compaction request is present or not.
+   *
+   * @param metaFolderPath
+   * @return
+   */
+  public static boolean isCompactionRequiredForTable(String metaFolderPath) {
+    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.minorCompactionRequiredFile;
+
+    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.majorCompactionRequiredFile;
+    try {
+      if (FileFactory.isFileExist(minorCompactionStatusFile,
+          FileFactory.getFileType(minorCompactionStatusFile)) || FileFactory
+          .isFileExist(majorCompactionStatusFile,
+              FileFactory.getFileType(majorCompactionStatusFile))) {
+        return true;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in isFileExist for the compaction request file " + e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * Determine the type of the compaction request received.
+   *
+   * @param metaFolderPath
+   * @return
+   */
+  public static CompactionType determineCompactionType(String metaFolderPath) {
+    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.minorCompactionRequiredFile;
+
+    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.majorCompactionRequiredFile;
+    try {
+      if (FileFactory.isFileExist(minorCompactionStatusFile,
+          FileFactory.getFileType(minorCompactionStatusFile))) {
+        return CompactionType.MINOR_COMPACTION;
+      }
+      if (FileFactory.isFileExist(majorCompactionStatusFile,
+          FileFactory.getFileType(majorCompactionStatusFile))) {
+        return CompactionType.MAJOR_COMPACTION;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in determining the compaction request file " + e.getMessage());
+    }
+    return CompactionType.MINOR_COMPACTION;
+  }
+
+  /**
+   * Delete the compaction request file once the compaction is done.
+   *
+   * @param metaFolderPath
+   * @param compactionType
+   * @return
+   */
+  public static boolean deleteCompactionRequiredFile(String metaFolderPath,
+      CompactionType compactionType) {
+    String compactionRequiredFile;
+    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+      compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.minorCompactionRequiredFile;
+    } else {
+      compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.majorCompactionRequiredFile;
+    }
+    try {
+      if (FileFactory
+          .isFileExist(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))) {
+        if (FileFactory
+            .getCarbonFile(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))
+            .delete()) {
+          LOGGER.info("Deleted the compaction request file " + compactionRequiredFile);
+          return true;
+        } else {
+          LOGGER.error("Unable to delete the compaction request file " + compactionRequiredFile);
+        }
+      } else {
+        LOGGER.info("Compaction request file is not present. File: " + compactionRequiredFile);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in deleting the compaction request file " + e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * Creation of the compaction request file if some other compaction is in progress.
+   *
+   * @param metaFolderPath
+   * @param compactionType
+   * @return
+   */
+  public static boolean createCompactionRequiredFile(String metaFolderPath,
+      CompactionType compactionType) {
+    String statusFile;
+    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.minorCompactionRequiredFile;
+    } else {
+      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.majorCompactionRequiredFile;
+    }
+    try {
+      if (!FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
+        if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
+          LOGGER.info("Successfully created the compaction required file - " + statusFile);
+          return true;
+        } else {
+          LOGGER.error("Not able to create the compaction required file - " + statusFile);
+          return false;
+        }
+      } else {
+        LOGGER.info("Compaction request file " + statusFile + " already exists.");
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in creating the compaction request file " + e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * This will check if any compaction request has been received for any table.
+   *
+   * @param tableMetas
+   * @return
+   */
+  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+      List<CarbonTableIdentifier> skipList) {
+    for (TableMeta table : tableMetas) {
+      CarbonTable ctable = table.carbonTable;
+      String metadataPath = ctable.getMetaDataFilepath();
+      // check for the compaction required file and at the same time exclude the tables which are
+      // present in the skip list.
+      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
+          .contains(table.carbonTableIdentifier)) {
+        return table;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This method will prepare the map of maximum column cardinality.
+   *
+   * @param columnCardinalityMap
+   * @param currentBlockSchema
+   * @param currentBlockCardinality
+   */
+  public static void addColumnCardinalityToMap(Map<String, Integer> columnCardinalityMap,
+      List<ColumnSchema> currentBlockSchema, int[] currentBlockCardinality) {
+    for (int i = 0; i < currentBlockCardinality.length; i++) {
+      // add the value to the map only if it does not exist or the new cardinality is greater
+      // than the existing value
+      String columnUniqueId = currentBlockSchema.get(i).getColumnUniqueId();
+      Integer value = columnCardinalityMap.get(columnUniqueId);
+      if (null == value) {
+        columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
+      } else {
+        if (currentBlockCardinality[i] > value) {
+          columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * This method will return the updated cardinality according to the master schema.
+   *
+   * @param columnCardinalityMap
+   * @param carbonTable
+   * @param updatedColumnSchemaList
+   * @return
+   */
+  public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
+      CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
+    List<CarbonDimension> masterDimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
+    for (CarbonDimension dimension : masterDimensions) {
+      Integer value = columnCardinalityMap.get(dimension.getColumnId());
+      if (null == value) {
+        updatedCardinalityList.add(getDimensionDefaultCardinality(dimension));
+      } else {
+        updatedCardinalityList.add(value);
+      }
+      updatedColumnSchemaList.add(dimension.getColumnSchema());
+    }
+    // add the measures to the column schema list
+    List<CarbonMeasure> masterSchemaMeasures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    for (CarbonMeasure measure : masterSchemaMeasures) {
+      updatedColumnSchemaList.add(measure.getColumnSchema());
+    }
+    int[] updatedCardinality = ArrayUtils
+        .toPrimitive(updatedCardinalityList.toArray(new Integer[updatedCardinalityList.size()]));
+    return updatedCardinality;
+  }
+
+  /**
+   * This method will return the default cardinality based on the dimension type.
+   *
+   * @param dimension
+   * @return
+   */
+  private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
+    int cardinality = 0;
+    if (dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      cardinality = Integer.MAX_VALUE;
+    } else if (dimension.hasEncoding(Encoding.DICTIONARY)) {
+      if (null != dimension.getDefaultValue()) {
+        cardinality = CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1;
+      } else {
+        cardinality = CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY;
+      }
+    } else {
+      cardinality = -1;
+    }
+    return cardinality;
+  }
+
+  /**
+   * This method will check for any restructured block in the blocks selected for compaction.
+   *
+   * @param segmentMapping
+   * @param dataFileMetadataSegMapping
+   * @param tableLastUpdatedTime
+   * @return
+   */
+  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
+    boolean restructuredBlockExists = false;
+    for (Map.Entry<String, TaskBlockInfo> taskMap :
+        segmentMapping.entrySet()) {
+      String segmentId = taskMap.getKey();
+      List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+      for (DataFileFooter dataFileFooter : listMetadata) {
+        // if the schema-modified timestamp is greater than the schema timestamp stored in the
+        // footer, it indicates a restructured block
+        if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
+          restructuredBlockExists = true;
+          break;
+        }
+      }
+      if (restructuredBlockExists) {
+        break;
+      }
+    }
+    return restructuredBlockExists;
+  }
+}
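As a worked example of the cardinality handling above: addColumnCardinalityToMap keeps the maximum cardinality observed per column unique id across all selected blocks, and updateColumnSchemaAndGetCardinality then projects that map onto the master schema, falling back to getDimensionDefaultCardinality (Integer.MAX_VALUE for direct-dictionary columns, DICTIONARY_DEFAULT_CARDINALITY for dictionary columns, -1 otherwise) for columns the older blocks never saw. A self-contained sketch of just the keep-max merge, using plain JDK types in place of ColumnSchema:

    import java.util.HashMap;
    import java.util.Map;

    public class CardinalityMergeSketch {

      // Keep the maximum cardinality observed per column id, as addColumnCardinalityToMap does.
      static void addToMap(Map<String, Integer> map, String[] columnIds, int[] cardinalities) {
        for (int i = 0; i < cardinalities.length; i++) {
          Integer existing = map.get(columnIds[i]);
          if (existing == null || cardinalities[i] > existing) {
            map.put(columnIds[i], cardinalities[i]);
          }
        }
      }

      public static void main(String[] args) {
        Map<String, Integer> merged = new HashMap<>();
        addToMap(merged, new String[] {"c1", "c2"}, new int[] {10, 7}); // block 1
        addToMap(merged, new String[] {"c1", "c2"}, new int[] {4, 9});  // block 2
        System.out.println(merged); // {c1=10, c2=9}
      }
    }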