http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
new file mode 100644
index 000..fde4e55
--- /dev/null
+++
b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.scan.collector.ResultCollectorFactory;
+import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.TaskMetricsMap;
+
+/**
+ * Concrete block iterator: it walks the blocklets of a block, scans each
+ * one and collects the scanned results in batches.
+ */
+public class DataBlockIterator extends CarbonIterator> {
+
+ /**
+ * iterator which will be used to iterate over blocklets
+ */
+ private BlockletIterator blockletIterator;
+
+ /**
+ * result collector which will be used to aggregate the scanned result
+ */
+ private ScannedResultCollector scannerResultAggregator;
+
+ /**
+ * scanner used to process each blocklet; it is either a filter scanner
+ * or a full (non-filter) scanner
+ */
+ private BlockletScanner blockletScanner;
+
+ /**
+ * batch size of result
+ */
+ private int batchSize;
+
+ private ExecutorService executorService;
+
+ private Future future;
+
+ private Future futureIo;
+
+ private BlockletScannedResult scannedResult;
+
+ private BlockExecutionInfo blockExecutionInfo;
+
+ private FileReader fileReader;
+
+ private AtomicBoolean nextBlock;
+
+ private AtomicBoolean nextRead;
+
+ /**
+ * Creates the iterator and wires up everything needed to scan one block.
+ *
+ * @param blockExecutionInfo supplies the first data block, the number of blocks
+ *                           to scan and the (possibly null) filter executer tree
+ * @param fileReader reader kept by this iterator
+ * @param batchSize batch size of result
+ * @param queryStatisticsModel statistics model handed to the blocklet scanner
+ * @param executorService executor kept by this iterator
+ */
+ public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
+     int batchSize, QueryStatisticsModel queryStatisticsModel,
+     ExecutorService executorService) {
+   // plain state first: none of these assignments depends on the lookups below
+   this.batchSize = batchSize;
+   this.fileReader = fileReader;
+   this.executorService = executorService;
+   this.blockExecutionInfo = blockExecutionInfo;
+   this.nextRead = new AtomicBoolean(false);
+   this.nextBlock = new AtomicBoolean(false);
+
+   this.blockletIterator = new BlockletIterator(
+       blockExecutionInfo.getFirstDataBlock(), blockExecutionInfo.getNumberOfBlockToScan());
+
+   // a non-null filter executer tree selects the filter scanner, otherwise the full scanner
+   boolean filterPresent = blockExecutionInfo.getFilterExecuterTree() != null;
+   this.blockletScanner = filterPresent
+       ? new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel)
+       : new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel);
+
+   this.scannerResultAggregator =
+       ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo);
+ }
+
+ @Override
+ public List
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
new file mode 100644
index 000..fde4e55
--- /dev/null
+++
b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.scan.collector.ResultCollectorFactory;
+import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.TaskMetricsMap;
+
+/**
+ * Concrete block iterator: it walks the blocklets of a block, scans each
+ * one and collects the scanned results in batches.
+ */
+public class DataBlockIterator extends CarbonIterator> {
+
+ /**
+ * iterator which will be used to iterate over blocklets
+ */
+ private BlockletIterator blockletIterator;
+
+ /**
+ * result collector which will be used to aggregate the scanned result
+ */
+ private ScannedResultCollector scannerResultAggregator;
+
+ /**
+ * scanner used to process each blocklet; it is either a filter scanner
+ * or a full (non-filter) scanner
+ */
+ private BlockletScanner blockletScanner;
+
+ /**
+ * batch size of result
+ */
+ private int batchSize;
+
+ private ExecutorService executorService;
+
+ private Future future;
+
+ private Future futureIo;
+
+ private BlockletScannedResult scannedResult;
+
+ private BlockExecutionInfo blockExecutionInfo;
+
+ private FileReader fileReader;
+
+ private AtomicBoolean nextBlock;
+
+ private AtomicBoolean nextRead;
+
+ /**
+ * Creates the iterator and wires up everything needed to scan one block.
+ *
+ * @param blockExecutionInfo supplies the first data block, the number of blocks
+ *                           to scan and the (possibly null) filter executer tree
+ * @param fileReader reader kept by this iterator
+ * @param batchSize batch size of result
+ * @param queryStatisticsModel statistics model handed to the blocklet scanner
+ * @param executorService executor kept by this iterator
+ */
+ public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
+     int batchSize, QueryStatisticsModel queryStatisticsModel,
+     ExecutorService executorService) {
+   // plain state first: none of these assignments depends on the lookups below
+   this.batchSize = batchSize;
+   this.fileReader = fileReader;
+   this.executorService = executorService;
+   this.blockExecutionInfo = blockExecutionInfo;
+   this.nextRead = new AtomicBoolean(false);
+   this.nextBlock = new AtomicBoolean(false);
+
+   this.blockletIterator = new BlockletIterator(
+       blockExecutionInfo.getFirstDataBlock(), blockExecutionInfo.getNumberOfBlockToScan());
+
+   // a non-null filter executer tree selects the filter scanner, otherwise the full scanner
+   boolean filterPresent = blockExecutionInfo.getFilterExecuterTree() != null;
+   this.blockletScanner = filterPresent
+       ? new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel)
+       : new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel);
+
+   this.scannerResultAggregator =
+       ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo);
+ }
+
+ @Override
+ public List next() {
+List collectedResult = null;
+if (updateScanner()) {
+ collectedResult =
this.scannerResultAggregator.collectResultInRow(scannedResult, batchSize);
+ while (collectedResult.size() < batchSize && updateScanner()) {
+List data = this.scannerResultAggregator
+