[10/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java new file mode 100644 index 000..fde4e55 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.processor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.scan.collector.ResultCollectorFactory; +import org.apache.carbondata.core.scan.collector.ScannedResultCollector; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.scanner.BlockletScanner; +import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner; +import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.util.TaskMetricsMap; + +/** + * This abstract class provides a skeletal implementation of the + * Block iterator. + */ +public class DataBlockIterator extends CarbonIterator> { + + /** + * iterator which will be used to iterate over blocklets + */ + private BlockletIterator blockletIterator; + + /** + * result collector which will be used to aggregate the scanned result + */ + private ScannedResultCollector scannerResultAggregator; + + /** + * processor which will be used to process the block processing can be + * filter processing or non filter processing + */ + private BlockletScanner blockletScanner; + + /** + * batch size of result + */ + private int batchSize; + + private ExecutorService executorService; + + private Future future; + + private Future futureIo; + + private BlockletScannedResult scannedResult; + + private BlockExecutionInfo blockExecutionInfo; + + private FileReader fileReader; + + private AtomicBoolean nextBlock; + + private AtomicBoolean nextRead; + + public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader, + int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) { +this.blockExecutionInfo = blockExecutionInfo; +this.fileReader = fileReader; +blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(), +blockExecutionInfo.getNumberOfBlockToScan()); +if (blockExecutionInfo.getFilterExecuterTree() != null) { + blockletScanner = new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel); +} else { + blockletScanner = new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel); +} +this.scannerResultAggregator = +ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo); +this.batchSize = batchSize; +this.executorService = executorService; +this.nextBlock = new AtomicBoolean(false); +this.nextRead = new AtomicBoolean(false); + } + + @Override + public List next() { +List collectedResult = null; +if (updateScanner()) { + collectedResult = this.scannerResultAggregator.collectResultInRow(scannedResult, batchSize); + while (collectedResult.size() < batchSize && updateScanner()) { +List data = this.scannerResultAggregator +.collectResultInRow(scannedResult, batchSize - collectedResul
[10/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java new file mode 100644 index 000..fde4e55 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.processor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.scan.collector.ResultCollectorFactory; +import org.apache.carbondata.core.scan.collector.ScannedResultCollector; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.scanner.BlockletScanner; +import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner; +import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.util.TaskMetricsMap; + +/** + * This abstract class provides a skeletal implementation of the + * Block iterator. + */ +public class DataBlockIterator extends CarbonIterator> { + + /** + * iterator which will be used to iterate over blocklets + */ + private BlockletIterator blockletIterator; + + /** + * result collector which will be used to aggregate the scanned result + */ + private ScannedResultCollector scannerResultAggregator; + + /** + * processor which will be used to process the block processing can be + * filter processing or non filter processing + */ + private BlockletScanner blockletScanner; + + /** + * batch size of result + */ + private int batchSize; + + private ExecutorService executorService; + + private Future future; + + private Future futureIo; + + private BlockletScannedResult scannedResult; + + private BlockExecutionInfo blockExecutionInfo; + + private FileReader fileReader; + + private AtomicBoolean nextBlock; + + private AtomicBoolean nextRead; + + public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader, + int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) { +this.blockExecutionInfo = blockExecutionInfo; +this.fileReader = fileReader; +blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(), +blockExecutionInfo.getNumberOfBlockToScan()); +if (blockExecutionInfo.getFilterExecuterTree() != null) { + blockletScanner = new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel); +} else { + blockletScanner = new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel); +} +this.scannerResultAggregator = +ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo); +this.batchSize = batchSize; +this.executorService = executorService; +this.nextBlock = new AtomicBoolean(false); +this.nextRead = new AtomicBoolean(false); + } + + @Override + public List next() { +List collectedResult = null; +if (updateScanner()) { + collectedResult = this.scannerResultAggregator.collectResultInRow(scannedResult, batchSize); + while (collectedResult.size() < batchSize && updateScanner()) { +List data = this.scannerResultAggregator +.collectResultInRow(scannedResult, batchSize - collectedResul
[10/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java index 447ab46..547ecaa 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; import org.apache.carbondata.core.datastore.page.ColumnPage; @@ -35,12 +35,11 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.FilterUtil; import org.apache.carbondata.core.scan.filter.intf.RowIntf; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo; -import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; +import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; import org.apache.carbondata.core.util.BitSetGroup; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; @@ -73,7 +72,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType()); } ifDefaultValueMatchesFilter(); -if (isDimensionPresentInCurrentBlock[0] == true) { +if (isDimensionPresentInCurrentBlock[0]) { isNaturalSorted = dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex() && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn(); } @@ -120,11 +119,11 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut boolean isScanRequired = false; if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) { if (isMeasurePresentInCurrentBlock[0]) { -minValue = blockMinValue[measureBlocksIndex[0] + lastDimensionColOrdinal]; +minValue = blockMinValue[measureChunkIndex[0] + lastDimensionColOrdinal]; isScanRequired = isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType()); } else { -minValue = blockMinValue[dimensionBlocksIndex[0]]; +minValue = blockMinValue[dimensionChunkIndex[0]]; isScanRequired = isScanRequired(minValue, filterRangeValues); } } else { @@ -170,67 +169,69 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut } @Override - public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder, boolean useBitsetPipeLine) - throws FilterUnsupportedException, IOException { + public BitSetGroup applyFilter(RawBlockletColumnChunks rawBlockletColumnChunks, + boolean useBitsetPipeLine) throws IOException { // select all rows if dimension does not exists in the current block if (!isDimensionPresentInCurrentBlock[0] && !isMeasurePresentInCurrentBlock[0]) { - int numberOfRows = blockChunkHolder.getDataBlock().nodeSize(); + int numberOfRows = rawBlockletColumnChunks.getDataBlock().numRows(); return FilterUtil - .createBitSetGroupWithDefaultValue(blockChunkHolder.getDataBlock().numberOfPages(), + .createBitSetGroupWithDefaultValue(rawBlockletColumnChunks.getDataBlock().numberOfPages(), numberOfRows, true); } if (isDimensionPresentInCurrentBlock[0]) { - int blockIndex = - segmentProperties.getDimensionOrdinalToBlockMapping().get(dimensionBlocksIndex[0]); - if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) { -blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock() -.getDimensionC
[10/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java index 553f85e..773fbd7 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java @@ -17,20 +17,15 @@ package org.apache.carbondata.core.scan.result.iterator; import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; public class PartitionSpliterRawResultIterator extends CarbonIterator { - private CarbonIterator iterator; - private BatchResult batch; + private CarbonIterator iterator; + private RowBatch batch; private int counter; - private static final LogService LOGGER = - LogServiceFactory.getLogService(PartitionSpliterRawResultIterator.class.getName()); - - public PartitionSpliterRawResultIterator(CarbonIterator iterator) { + public PartitionSpliterRawResultIterator(CarbonIterator iterator) { this.iterator = iterator; } @@ -65,7 +60,7 @@ public class PartitionSpliterRawResultIterator extends CarbonIterator * @param batch * @return */ - private boolean checkBatchEnd(BatchResult batch) { + private boolean checkBatchEnd(RowBatch batch) { return !(counter < batch.getSize()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java index 70d0958..1dd1595 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java @@ -21,7 +21,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; /** @@ -37,7 +37,7 @@ public class RawResultIterator extends CarbonIterator { /** * Iterator of the Batch raw result. */ - private CarbonIterator detailRawQueryResultIterator; + private CarbonIterator detailRawQueryResultIterator; /** * Counter to maintain the row counter. @@ -55,9 +55,9 @@ public class RawResultIterator extends CarbonIterator { /** * batch of the result. */ - private BatchResult batch; + private RowBatch batch; - public RawResultIterator(CarbonIterator detailRawQueryResultIterator, + public RawResultIterator(CarbonIterator detailRawQueryResultIterator, SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; @@ -155,7 +155,7 @@ public class RawResultIterator extends CarbonIterator { * @param batch * @return */ - private boolean checkIfBatchIsProcessedCompletely(BatchResult batch) { + private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) { if (counter < batch.getSize()) { return false; } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java index cc9710e..c7cb00d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java @@ -35,10 +35,12 @@ public class VectorDetailQueryResultIterator ext