[10/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability

2018-03-04 Thread jackylk
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
--
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
new file mode 100644
index 000..fde4e55
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.scan.collector.ResultCollectorFactory;
+import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.TaskMetricsMap;
+
+/**
+ * Iterator over data blocks: it iterates over the blocklets of a block,
+ * scans them and returns the collected rows batch by batch.
+ */
+public class DataBlockIterator extends CarbonIterator<List<Object[]>> {
+
+  /**
+   * iterator which will be used to iterate over blocklets
+   */
+  private BlockletIterator blockletIterator;
+
+  /**
+   * result collector which will be used to aggregate the scanned result
+   */
+  private ScannedResultCollector scannerResultAggregator;
+
+  /**
+   * scanner used to process the blocklets; it is either a filter scanner
+   * or a full (non-filter) scanner
+   */
+  private BlockletScanner blockletScanner;
+
+  /**
+   * batch size of result
+   */
+  private int batchSize;
+
+  private ExecutorService executorService;
+
+  private Future future;
+
+  private Future futureIo;
+
+  private BlockletScannedResult scannedResult;
+
+  private BlockExecutionInfo blockExecutionInfo;
+
+  private FileReader fileReader;
+
+  private AtomicBoolean nextBlock;
+
+  private AtomicBoolean nextRead;
+
+  public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
+      int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) {
+    this.blockExecutionInfo = blockExecutionInfo;
+    this.fileReader = fileReader;
+    blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
+        blockExecutionInfo.getNumberOfBlockToScan());
+    if (blockExecutionInfo.getFilterExecuterTree() != null) {
+      blockletScanner = new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel);
+    } else {
+      blockletScanner = new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel);
+    }
+    this.scannerResultAggregator =
+        ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo);
+    this.batchSize = batchSize;
+    this.executorService = executorService;
+    this.nextBlock = new AtomicBoolean(false);
+    this.nextRead = new AtomicBoolean(false);
+  }
+
+  @Override
+  public List<Object[]> next() {
+    List<Object[]> collectedResult = null;
+    if (updateScanner()) {
+      collectedResult = this.scannerResultAggregator.collectResultInRow(scannedResult, batchSize);
+      while (collectedResult.size() < batchSize && updateScanner()) {
+        List<Object[]> data = this.scannerResultAggregator
+
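For orientation, here is a minimal consumer sketch (not part of this patch) showing how a batch-oriented iterator such as the DataBlockIterator above is typically drained. It assumes only the java.util.Iterator contract that CarbonIterator builds on; the class and method names below are hypothetical.

import java.util.Iterator;
import java.util.List;

public final class BatchIteratorConsumerSketch {

  // Drains the iterator batch by batch; each next() call returns at most
  // batchSize rows collected from the scanned blocklets.
  public static long countRows(Iterator<List<Object[]>> dataBlockIterator) {
    long rowCount = 0;
    while (dataBlockIterator.hasNext()) {
      List<Object[]> batch = dataBlockIterator.next();
      rowCount += batch.size();
    }
    return rowCount;
  }
}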

[10/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability

2018-03-04 Thread jackylk
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
--
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 447ab46..547ecaa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -35,12 +35,11 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -73,7 +72,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
       comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
     }
     ifDefaultValueMatchesFilter();
-    if (isDimensionPresentInCurrentBlock[0] == true) {
+    if (isDimensionPresentInCurrentBlock[0]) {
       isNaturalSorted = dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
           && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
     }
@@ -120,11 +119,11 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     boolean isScanRequired = false;
     if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) {
       if (isMeasurePresentInCurrentBlock[0]) {
-        minValue = blockMinValue[measureBlocksIndex[0] + lastDimensionColOrdinal];
+        minValue = blockMinValue[measureChunkIndex[0] + lastDimensionColOrdinal];
         isScanRequired =
             isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
-        minValue = blockMinValue[dimensionBlocksIndex[0]];
+        minValue = blockMinValue[dimensionChunkIndex[0]];
         isScanRequired = isScanRequired(minValue, filterRangeValues);
       }
     } else {
@@ -170,67 +169,69 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
   }
 
   @Override
-  public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder, boolean useBitsetPipeLine)
-      throws FilterUnsupportedException, IOException {
+  public BitSetGroup applyFilter(RawBlockletColumnChunks rawBlockletColumnChunks,
+      boolean useBitsetPipeLine) throws IOException {
     // select all rows if dimension does not exists in the current block
     if (!isDimensionPresentInCurrentBlock[0] && !isMeasurePresentInCurrentBlock[0]) {
-      int numberOfRows = blockChunkHolder.getDataBlock().nodeSize();
+      int numberOfRows = rawBlockletColumnChunks.getDataBlock().numRows();
       return FilterUtil
-          .createBitSetGroupWithDefaultValue(blockChunkHolder.getDataBlock().numberOfPages(),
+          .createBitSetGroupWithDefaultValue(rawBlockletColumnChunks.getDataBlock().numberOfPages(),
              numberOfRows, true);
     }
     if (isDimensionPresentInCurrentBlock[0]) {
-      int blockIndex =
-          segmentProperties.getDimensionOrdinalToBlockMapping().get(dimensionBlocksIndex[0]);
-      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-
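To make the isScanRequired() change above easier to follow, here is a standalone sketch of the min-value pruning idea behind a "less than" filter: a blocklet needs scanning only if its stored minimum value is smaller than at least one filter value. The class and the unsigned byte comparison helper are hypothetical stand-ins for illustration, not CarbonData's ByteUtil API.

final class LessThanPruningSketch {

  // Skip the blocklet when its minimum value is >= every filter value,
  // because then no row in it can satisfy "column < filterValue".
  static boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues) {
    for (byte[] filterValue : filterValues) {
      if (compareUnsigned(blockMinValue, filterValue) < 0) {
        return true;
      }
    }
    return false;
  }

  // Unsigned lexicographic byte comparison, used only for this illustration.
  private static int compareUnsigned(byte[] a, byte[] b) {
    int length = Math.min(a.length, b.length);
    for (int i = 0; i < length; i++) {
      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }
}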

[10/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability

2018-03-02 Thread jackylk
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
--
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
index 553f85e..773fbd7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
@@ -17,20 +17,15 @@
 package org.apache.carbondata.core.scan.result.iterator;
 
 import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 
 public class PartitionSpliterRawResultIterator extends CarbonIterator {
 
-  private CarbonIterator<BatchResult> iterator;
-  private BatchResult batch;
+  private CarbonIterator<RowBatch> iterator;
+  private RowBatch batch;
   private int counter;
 
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(PartitionSpliterRawResultIterator.class.getName());
-
-  public PartitionSpliterRawResultIterator(CarbonIterator<BatchResult> iterator) {
+  public PartitionSpliterRawResultIterator(CarbonIterator<RowBatch> iterator) {
     this.iterator = iterator;
   }
 
@@ -65,7 +60,7 @@ public class PartitionSpliterRawResultIterator extends CarbonIterator
    * @param batch
    * @return
    */
-  private boolean checkBatchEnd(BatchResult batch) {
+  private boolean checkBatchEnd(RowBatch batch) {
     return !(counter < batch.getSize());
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
--
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 70d0958..1dd1595 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -21,7 +21,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 
 /**
@@ -37,7 +37,7 @@ public class RawResultIterator extends CarbonIterator {
   /**
    * Iterator of the Batch raw result.
    */
-  private CarbonIterator<BatchResult> detailRawQueryResultIterator;
+  private CarbonIterator<RowBatch> detailRawQueryResultIterator;
 
   /**
    * Counter to maintain the row counter.
@@ -55,9 +55,9 @@ public class RawResultIterator extends CarbonIterator {
   /**
    * batch of the result.
    */
-  private BatchResult batch;
+  private RowBatch batch;
 
-  public RawResultIterator(CarbonIterator<BatchResult> detailRawQueryResultIterator,
+  public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
       SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
     this.detailRawQueryResultIterator = detailRawQueryResultIterator;
     this.sourceSegProperties = sourceSegProperties;
@@ -155,7 +155,7 @@ public class RawResultIterator extends CarbonIterator {
    * @param batch
    * @return
    */
-  private boolean checkIfBatchIsProcessedCompletely(BatchResult batch) {
+  private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
     if (counter < batch.getSize()) {
       return false;
     } else {
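Both raw-result iterators in this patch follow the same batch-to-row unwrapping pattern: keep a counter into the current batch and fetch the next batch once the counter reaches the batch size (the checkBatchEnd / checkIfBatchIsProcessedCompletely helpers above). A simplified, self-contained sketch of that pattern, with List<Object[]> standing in for RowBatch and a hypothetical class name:

import java.util.Iterator;
import java.util.List;

final class RowUnwrappingIteratorSketch implements Iterator<Object[]> {

  private final Iterator<List<Object[]>> batchIterator;
  private List<Object[]> batch;
  private int counter;  // position inside the current batch

  RowUnwrappingIteratorSketch(Iterator<List<Object[]>> batchIterator) {
    this.batchIterator = batchIterator;
  }

  @Override
  public boolean hasNext() {
    // Refill when the current batch is fully consumed, skipping empty batches.
    while ((batch == null || counter >= batch.size()) && batchIterator.hasNext()) {
      batch = batchIterator.next();
      counter = 0;
    }
    return batch != null && counter < batch.size();
  }

  @Override
  public Object[] next() {
    return batch.get(counter++);
  }
}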

http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
--
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
index cc9710e..c7cb00d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
@@ -35,10 +35,12