DRILL-4735: ConvertCountToDirectScan rule enhancements

1. The ConvertCountToDirectScan rule is now applicable to two or more COUNT
aggregates.
To achieve this, DynamicPojoRecordReader was added: it accepts any number of
columns, in contrast to PojoRecordReader, which depends on class fields.
The AbstractPojoRecordReader class was added to factor out the logic common to
these two readers (see the sketch below).
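
A minimal sketch of how the rule builds such a reader, mirroring its usage in
ConvertCountToDirectScan.onMatch() below (the field names and count values
are illustrative only; java.util imports are assumed):

    // schema maps each count column to long.class; a single row carries
    // the pre-computed count values, one per COUNT aggregate
    LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
    schema.put("count0$EXPR$0", long.class);
    schema.put("count1$EXPR$1", long.class);
    List<List<Long>> records = Collections.singletonList(Arrays.asList(100L, 90L));
    DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(schema, records);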

2. ConvertCountToDirectScan now distinguishes between missing, directory and
implicit columns.
For missing columns the count is set to 0; for implicit columns it is set to
the total record count, since implicit columns are based on files and there is
no data without a file.
If a directory column is encountered, the rule is not applied (see the
condensed decision logic below).
The CountsCollector class was introduced to encapsulate the counts collection
logic.
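
The per-column decision, condensed from collectCounts() in the
ConvertCountToDirectScan diff below (variable names follow that method):

    if (implicitColumnsNames.contains(columnName)) {
      cnt = totalRecordCount;                   // implicit column: based on the file itself
    } else if (ColumnExplorer.isPartitionColumn(settings.getOptions(), simplePath)) {
      return ImmutableMap.of();                 // directory column: rule is not applied
    } else {
      cnt = oldGrpScan.getColumnValueCount(simplePath);   // 0 for a missing column
      if (cnt == GroupScan.NO_COLUMN_STATS) {
        return ImmutableMap.of();               // no stats: rule is not applied
      }
    }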

3. The MetadataDirectGroupScan class was introduced to indicate to the user
that metadata was used during calculation and for which files it was applied
(see the example below).
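
For example, the new group scan surfaces the scanned files in the plan digest
(construction as in onMatch() below; the sample file name comes from the
class javadoc):

    // the digest of this scan will include the backing files, e.g.
    //   files = [/tmp/0_0_0.parquet], numFiles = 1
    GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);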

DRILL-4735: Changes after code review.

close #900


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8b564235
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8b564235
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8b564235

Branch: refs/heads/master
Commit: 8b5642353505d1001d7ec3590a07ad1144ecf7f3
Parents: 5c57b50
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Authored: Thu Jul 20 19:26:44 2017 +0300
Committer: Jinfeng Ni <j...@apache.org>
Committed: Mon Aug 14 22:19:24 2017 -0700

----------------------------------------------------------------------
 .../impl/project/ProjectRecordBatch.java        |   4 +-
 .../physical/ConvertCountToDirectScan.java      | 242 ++++++++++-----
 .../drill/exec/planner/sql/DirectPlan.java      |  16 +-
 .../planner/sql/handlers/ShowFileHandler.java   |   2 +-
 .../apache/drill/exec/store/ColumnExplorer.java | 261 ++++++++++++++++
 .../exec/store/ImplicitColumnExplorer.java      | 219 --------------
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   7 +-
 .../exec/store/direct/DirectGroupScan.java      |   7 +-
 .../store/direct/MetadataDirectGroupScan.java   |  86 ++++++
 .../ischema/InfoSchemaRecordGenerator.java      |  12 +-
 .../exec/store/parquet/ParquetGroupScan.java    |   4 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   6 +-
 .../store/pojo/AbstractPojoRecordReader.java    | 157 ++++++++++
 .../exec/store/pojo/AbstractPojoWriter.java     |  64 ++++
 .../drill/exec/store/pojo/AbstractWriter.java   |  62 ----
 .../store/pojo/DynamicPojoRecordReader.java     |  71 +++++
 .../drill/exec/store/pojo/PojoRecordReader.java | 187 +++---------
 .../drill/exec/store/pojo/PojoWriter.java       |  38 ++-
 .../drill/exec/store/pojo/PojoWriters.java      | 296 +++++++++++++++++++
 .../apache/drill/exec/store/pojo/Writers.java   | 274 -----------------
 .../exec/store/sys/SystemTableBatchCreator.java |   6 +-
 .../drill/TestFunctionsWithTypeExpoQueries.java |   6 +-
 .../logical/TestConvertCountToDirectScan.java   |  82 ++++-
 23 files changed, 1287 insertions(+), 822 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 676849a..6baf070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -58,7 +58,7 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.store.ImplicitColumnExplorer;
+import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -500,7 +500,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   }
 
   private boolean isImplicitFileColumn(ValueVector vvIn) {
-    return ImplicitColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null;
+    return ColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null;
   }
 
   private List<NamedExpression> getExpressionList() {

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
index 879d0f7..961816e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,15 +18,21 @@
 
 package org.apache.drill.exec.planner.physical;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rel.type.RelRecordType;
@@ -35,37 +41,41 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.store.direct.DirectGroupScan;
-import org.apache.drill.exec.store.pojo.PojoRecordReader;
+import org.apache.drill.exec.store.ColumnExplorer;
 
-import com.google.common.collect.Lists;
+import org.apache.drill.exec.store.direct.MetadataDirectGroupScan;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
 
 /**
- * This rule will convert
- *   " select count(*)  as mycount from table "
- * or " select count( not-nullable-expr) as mycount from table "
- *   into
- *
+ * <p>
+ * This rule will convert <b>" select count(*)  as mycount from table "</b>
+ * or <b>" select count(not-nullable-expr) as mycount from table "</b> into
+ * <pre>
  *    Project(mycount)
  *         \
  *    DirectGroupScan ( PojoRecordReader ( rowCount ))
- *
- * or
- *    " select count(column) as mycount from table "
- *    into
+ *</pre>
+ * or <b>" select count(column) as mycount from table "</b> into
+ * <pre>
  *      Project(mycount)
  *           \
  *            DirectGroupScan (PojoRecordReader (columnValueCount))
+ *</pre>
+ * The rule can also be applied when the query contains multiple count expressions:
+ * <b>" select count(column1), count(column2), count(*) from table "</b>
+ * </p>
  *
+ * <p>
 * Currently, only parquet group scan has the exact row count and column value count,
  * obtained from parquet row group info. This will save the cost to
  * scan the whole parquet files.
+ * </p>
  */
-
 public class ConvertCountToDirectScan extends Prule {
 
  public static final RelOptRule AGG_ON_PROJ_ON_SCAN = new ConvertCountToDirectScan(
@@ -77,6 +87,8 @@ public class ConvertCountToDirectScan extends Prule {
       RelOptHelper.some(DrillAggregateRel.class,
                            RelOptHelper.any(DrillScanRel.class)), "Agg_on_scan");
 
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScan.class);
+
   /** Creates a SplunkPushDownRule. */
   protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) {
     super(rule, "ConvertCountToDirectScan:" + id);
@@ -85,40 +97,85 @@ public class ConvertCountToDirectScan extends Prule {
   @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0);
-    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
-    final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
+    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1);
+    final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
 
     final GroupScan oldGrpScan = scan.getGroupScan();
     final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
 
-    // Only apply the rule when :
+    // Only apply the rule when:
     //    1) scan knows the exact row count in getSize() call,
     //    2) No GroupBY key,
-    //    3) only one agg function (Check if it's count(*) below).
-    //    4) No distinct agg call.
+    //    3) No distinct agg call.
     if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
         && agg.getGroupCount() == 0
-        && agg.getAggCallList().size() == 1
         && !agg.containsDistinctCall())) {
       return;
     }
 
-    AggregateCall aggCall = agg.getAggCallList().get(0);
+    Map<String, Long> result = collectCounts(settings, agg, scan, project);
+    logger.trace("Calculated the following aggregate counts: {}", result);
+    // if the counts could not be determined, the rule won't be applied
+    if (result.isEmpty()) {
+      return;
+    }
+
+    final RelDataType scanRowType = constructDataType(agg, result.keySet());
+
+    final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
+        buildSchema(scanRowType.getFieldNames()),
+        Collections.singletonList((List<Long>) new ArrayList<>(result.values())));
 
-    if (aggCall.getAggregation().getName().equals("COUNT") ) {
+    final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
+    final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);
+
+    final ScanPrel newScan = ScanPrel.create(scan,
+        scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan,
+        scanRowType);
+
+    final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
+        .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType());
+
+    call.transformTo(newProject);
+  }
+
+  /**
+   * Collects counts for each aggregation call.
+   * Will return an empty result map if it was not able to determine the count for at least one aggregation call.
+   *
+   * For each aggregate call determines if the count can be calculated. Collects counts only for the COUNT function.
+   * For star, not-null expressions and implicit columns sets the count to the total record number.
+   * For other cases obtains counts from the group scan operator. Also, the count can not be calculated for partition columns.
+   *
+   * @param settings planner settings
+   * @param agg aggregate relational expression
+   * @param scan scan relational expression
+   * @param project project relational expression
+   * @return result map where key is count column name, value is count value
+   */
+  private Map<String, Long> collectCounts(PlannerSettings settings, DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) {
+    final Set<String> implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
+    final GroupScan oldGrpScan = scan.getGroupScan();
+    final long totalRecordCount = oldGrpScan.getScanStats(settings).getRecordCount();
+    final LinkedHashMap<String, Long> result = new LinkedHashMap<>();
+
+    for (int i = 0; i < agg.getAggCallList().size(); i++) {
+      AggregateCall aggCall = agg.getAggCallList().get(i);
+      long cnt;
+
+      // the rule can be applied only for the count function, otherwise return empty counts
+      if (!"count".equalsIgnoreCase(aggCall.getAggregation().getName())) {
+        return ImmutableMap.of();
+      }
+
+      if (containsStarOrNotNullInput(aggCall, agg)) {
+        cnt = totalRecordCount;
 
-      long cnt = 0;
-      //  count(*)  == >  empty arg  ==>  rowCount
-      //  count(Not-null-input) ==> rowCount
-      if (aggCall.getArgList().isEmpty() ||
-          (aggCall.getArgList().size() == 1 &&
-           ! agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) {
-        cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
       } else if (aggCall.getArgList().size() == 1) {
-      // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
+        // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
         int index = aggCall.getArgList().get(0);
 
-        if (proj != null) {
+        if (project != null) {
          // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field.
           // For instance,
           // Agg - count($0)
@@ -127,67 +184,108 @@ public class ConvertCountToDirectScan extends Prule {
           //    \
           //   Scan (col1, col2).
           // return count of "col2" in Scan's metadata, if found.
-
-          if (proj.getProjects().get(index) instanceof RexInputRef) {
-            index = ((RexInputRef) proj.getProjects().get(index)).getIndex();
-          } else {
-            return;  // do not apply for all other cases.
+          if (!(project.getProjects().get(index) instanceof RexInputRef)) {
+            return ImmutableMap.of(); // do not apply for all other cases.
           }
+
+          index = ((RexInputRef) project.getProjects().get(index)).getIndex();
         }
 
        String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
 
-        cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName));
-        if (cnt == GroupScan.NO_COLUMN_STATS) {
-          // if column stats are not available don't apply this rule
-          return;
+        // for an implicit column the count will be the same as the total record count
+        if (implicitColumnsNames.contains(columnName)) {
+          cnt = totalRecordCount;
+        } else {
+          SchemaPath simplePath = SchemaPath.getSimplePath(columnName);
+
+          if (ColumnExplorer.isPartitionColumn(settings.getOptions(), simplePath)) {
+            return ImmutableMap.of();
+          }
+
+          cnt = oldGrpScan.getColumnValueCount(simplePath);
+          if (cnt == GroupScan.NO_COLUMN_STATS) {
+            // if column stats are not available don't apply this rule, return empty counts
+            return ImmutableMap.of();
+          }
         }
       } else {
-        return; // do nothing.
+        return ImmutableMap.of();
       }
 
-      RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory());
-
-      final ScanPrel newScan = ScanPrel.create(scan,
-          scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt),
-          scanRowType);
-
-      List<RexNode> exprs = Lists.newArrayList();
-      exprs.add(RexInputRef.of(0, scanRowType));
-
-      final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
-          .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType());
-
-      call.transformTo(newProj);
+      String name = "count" + i + "$" + (aggCall.getName() == null ? aggCall.toString() : aggCall.getName());
+      result.put(name, cnt);
     }
 
+    return ImmutableMap.copyOf(result);
   }
 
   /**
-   * Class to represent the count aggregate result.
+   * Checks if aggregate call contains star or non-null expression:
+   * <pre>
+   * count(*)  == >  empty arg  ==>  rowCount
+   * count(Not-null-input) ==> rowCount
+   * </pre>
+   *
+   * @param aggregateCall aggregate call
+   * @param aggregate aggregate relational expression
+   * @return true if aggregate call contains star or non-null expression
     */
-  public static class CountQueryResult {
-    public long count;
-
-    public CountQueryResult(long cnt) {
-      this.count = cnt;
-    }
+  private boolean containsStarOrNotNullInput(AggregateCall aggregateCall, DrillAggregateRel aggregate) {
+    return aggregateCall.getArgList().isEmpty() ||
+        (aggregateCall.getArgList().size() == 1 &&
+            !aggregate.getInput().getRowType().getFieldList().get(aggregateCall.getArgList().get(0)).getType().isNullable());
   }
 
-  private RelDataType getCountDirectScanRowType(RelDataTypeFactory typeFactory) {
-    List<RelDataTypeField> fields = Lists.newArrayList();
-    fields.add(new RelDataTypeFieldImpl("count", 0, typeFactory.createSqlType(SqlTypeName.BIGINT)));
-
+  /**
+   * For each aggregate call creates a field based on its name with bigint type.
+   * Constructs a record type for the created fields.
+   *
+   * @param aggregateRel aggregate relational expression
+   * @param fieldNames field names
+   * @return record type
+   */
+  private RelDataType constructDataType(DrillAggregateRel aggregateRel, Collection<String> fieldNames) {
+    List<RelDataTypeField> fields = new ArrayList<>();
+    Iterator<String> fieldNamesIterator = fieldNames.iterator();
+    int fieldIndex = 0;
+    while (fieldNamesIterator.hasNext()) {
+      RelDataTypeField field = new RelDataTypeFieldImpl(
+          fieldNamesIterator.next(),
+          fieldIndex++,
+          aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
+      fields.add(field);
+    }
     return new RelRecordType(fields);
   }
 
-  private GroupScan getCountDirectScan(long cnt) {
-    CountQueryResult res = new CountQueryResult(cnt);
-
-    PojoRecordReader<CountQueryResult> reader = new PojoRecordReader<CountQueryResult>(CountQueryResult.class,
-        Collections.singleton(res).iterator());
+  /**
+   * Builds schema based on the given field names.
+   * The type for each field is set to long.class.
+   *
+   * @param fieldNames field names
+   * @return schema
+   */
+  private LinkedHashMap<String, Class<?>> buildSchema(List<String> fieldNames) {
+    LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
+    for (String fieldName: fieldNames) {
+      schema.put(fieldName, long.class);
+    }
+    return schema;
+  }
 
-    return new DirectGroupScan(reader);
+  /**
+   * Creates a row expression for each field.
+   *
+   * @param rowType row type
+   * @return list of row expressions
+   */
+  private List<RexNode> prepareFieldExpressions(RelDataType rowType) {
+    List<RexNode> expressions = new ArrayList<>();
+    for (int i = 0; i < rowType.getFieldCount(); i++) {
+      expressions.add(RexInputRef.of(i, rowType));
+    }
+    return expressions;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
index d40e0d7..3e1d6c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,9 +17,6 @@
  */
 package org.apache.drill.exec.planner.sql;
 
-import java.util.Collections;
-import java.util.Iterator;
-
 import org.apache.drill.common.logical.PlanProperties;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder;
@@ -33,6 +30,9 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.direct.DirectGroupScan;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
+import java.util.Collections;
+import java.util.List;
+
 public class DirectPlan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectPlan.class);
 
@@ -43,12 +43,12 @@ public class DirectPlan {
 
   @SuppressWarnings("unchecked")
   public static <T> PhysicalPlan createDirectPlan(QueryContext context, T obj){
-    Iterator<T> iter = (Iterator<T>) Collections.singleton(obj).iterator();
-    return createDirectPlan(context.getCurrentEndpoint(), iter, (Class<T>) obj.getClass());
+    return createDirectPlan(context.getCurrentEndpoint(), Collections.singletonList(obj), (Class<T>) obj.getClass());
 
   }
-  public static <T> PhysicalPlan createDirectPlan(DrillbitEndpoint endpoint, Iterator<T> iterator, Class<T> clazz){
-    PojoRecordReader<T> reader = new PojoRecordReader<T>(clazz, iterator);
+
+  public static <T> PhysicalPlan createDirectPlan(DrillbitEndpoint endpoint, List<T> records, Class<T> clazz){
+    PojoRecordReader<T> reader = new PojoRecordReader<>(clazz, records);
     DirectGroupScan scan = new DirectGroupScan(reader);
     Screen screen = new Screen(scan, endpoint);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
index 5e6af7c..307b01d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
@@ -102,6 +102,6 @@ public class ShowFileHandler extends DefaultSqlHandler {
                                                                  fileStatus.getAccessTime(), fileStatus.getModificationTime());
       rows.add(result);
     }
-    return DirectPlan.createDirectPlan(context.getCurrentEndpoint(), rows.iterator(), ShowFilesCommandResult.class);
+    return DirectPlan.createDirectPlan(context.getCurrentEndpoint(), rows, ShowFilesCommandResult.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
new file mode 100644
index 0000000..ccd622b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ColumnExplorer {
+
+  private final String partitionDesignator;
+  private final List<SchemaPath> columns;
+  private final boolean isStarQuery;
+  private final List<Integer> selectedPartitionColumns;
+  private final List<SchemaPath> tableColumns;
+  private final Map<String, ImplicitFileColumns> allImplicitColumns;
+  private final Map<String, ImplicitFileColumns> selectedImplicitColumns;
+
+
+  /**
+   * Helper class that encapsulates logic for sorting out columns
+   * between actual table columns, partition columns and implicit file columns.
+   * Also populates a map with implicit column names as keys and their values.
+   */
+  public ColumnExplorer(FragmentContext context, List<SchemaPath> columns) {
+    this(context.getOptions(), columns);
+  }
+
+  /**
+   * Helper class that encapsulates logic for sorting out columns
+   * between actual table columns, partition columns and implicit file columns.
+   * Also populates a map with implicit column names as keys and their values.
+   */
+  public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
+    this.partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+    this.columns = columns;
+    this.isStarQuery = columns != null && AbstractRecordReader.isStarQuery(columns);
+    this.selectedPartitionColumns = Lists.newArrayList();
+    this.tableColumns = Lists.newArrayList();
+    this.allImplicitColumns = initImplicitFileColumns(optionManager);
+    this.selectedImplicitColumns = CaseInsensitiveMap.newHashMap();
+
+    init();
+  }
+
+  /**
+   * Creates a case-insensitive map with implicit file columns as keys and the appropriate ImplicitFileColumns enum as values
+   */
+  public static Map<String, ImplicitFileColumns> initImplicitFileColumns(OptionManager optionManager) {
+    Map<String, ImplicitFileColumns> map = CaseInsensitiveMap.newHashMap();
+    for (ImplicitFileColumns e : ImplicitFileColumns.values()) {
+      OptionValue optionValue;
+      if ((optionValue = optionManager.getOption(e.name)) != null) {
+        map.put(optionValue.string_val, e);
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Checks whether the given column is a partition column.
+   *
+   * @param optionManager options
+   * @param column column
+   * @return true if the given column is a partition column, false otherwise
+   */
+  public static boolean isPartitionColumn(OptionManager optionManager, SchemaPath column){
+    String partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+    String path = column.getAsUnescapedPath();
+    return isPartitionColumn(partitionDesignator, path);
+  }
+
+  /**
+   * Checks whether the given column is a partition column.
+   *
+   * @param partitionDesignator partition designator
+   * @param path column path
+   * @return true if the given column is a partition column, false otherwise
+   */
+  public static boolean isPartitionColumn(String partitionDesignator, String path){
+    Pattern pattern = Pattern.compile(String.format("%s[0-9]+", 
partitionDesignator));
+    Matcher matcher = pattern.matcher(path);
+    return matcher.matches();
+  }
+
+  /**
+   * Compares selection root and actual file path to determine partition column values.
+   * Adds implicit file columns according to columns list.
+   *
+   * @return map with column names as keys and their values
+   */
+  public Map<String, String> populateImplicitColumns(FileWork work, String selectionRoot) {
+    return populateImplicitColumns(work.getPath(), selectionRoot);
+  }
+
+  /**
+   * Compares selection root and actual file path to determine partition column values.
+   * Adds implicit file columns according to columns list.
+   *
+   * @return map with column names as keys and their values
+   */
+  public Map<String, String> populateImplicitColumns(String filePath, String selectionRoot) {
+    Map<String, String> implicitValues = Maps.newLinkedHashMap();
+    if (selectionRoot != null) {
+      String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString().split("/");
+      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+      String[] p = path.toString().split("/");
+      if (p.length > r.length) {
+        String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
+        for (int a = 0; a < q.length; a++) {
+          if (isStarQuery || selectedPartitionColumns.contains(a)) {
+            implicitValues.put(partitionDesignator + a, q[a]);
+          }
+        }
+      }
+      //add implicit file columns
+      for (Map.Entry<String, ImplicitFileColumns> entry : selectedImplicitColumns.entrySet()) {
+        implicitValues.put(entry.getKey(), entry.getValue().getValue(path));
+      }
+    }
+    return implicitValues;
+  }
+
+  public boolean isStarQuery() {
+    return isStarQuery;
+  }
+
+  public List<SchemaPath> getTableColumns() {
+    return tableColumns;
+  }
+
+  /**
+   * Checks if current column selection contains partition columns.
+   *
+   * @return true if partition columns are present, false otherwise
+   */
+  public boolean containsPartitionColumns() {
+    return !selectedPartitionColumns.isEmpty();
+  }
+
+  /**
+   * Checks if current column selection contains implicit columns.
+   *
+   * @return true if implicit columns are present, false otherwise
+   */
+  public boolean containsImplicitColumns() {
+    return !selectedImplicitColumns.isEmpty();
+  }
+
+  /**
+   * If it is not star query, sorts out columns into three categories:
+   * 1. table columns
+   * 2. partition columns
+   * 3. implicit file columns
+   */
+  private void init() {
+    if (isStarQuery) {
+      selectedImplicitColumns.putAll(allImplicitColumns);
+    } else {
+      for (SchemaPath column : columns) {
+        String path = column.getAsUnescapedPath();
+        if (isPartitionColumn(partitionDesignator, path)) {
+          selectedPartitionColumns.add(Integer.parseInt(path.substring(partitionDesignator.length())));
+        } else if (allImplicitColumns.get(path) != null) {
+          selectedImplicitColumns.put(path, allImplicitColumns.get(path));
+        } else {
+          tableColumns.add(column);
+        }
+      }
+    }
+  }
+
+  /**
+   * Columns that provide information about where the file data comes from.
+   * These columns are implicit, so they should be referenced explicitly in a query.
+   */
+  public enum ImplicitFileColumns {
+
+    /**
+     * Fully qualified name, contains full path to file and file name
+     */
+    FQN (ExecConstants.IMPLICIT_FQN_COLUMN_LABEL) {
+      @Override
+      public String getValue(Path path) {
+        return path.toUri().getPath();
+      }
+    },
+
+    /**
+     * Full path to file without file name
+     */
+    FILEPATH (ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL) {
+      @Override
+      public String getValue(Path path) {
+        return path.getParent().toUri().getPath();
+      }
+    },
+
+    /**
+     * File name with extension without path
+     */
+    FILENAME (ExecConstants.IMPLICIT_FILENAME_COLUMN_LABEL) {
+      @Override
+      public String getValue(Path path) {
+        return path.getName();
+      }
+    },
+
+    /**
+     * File suffix (without dot at the beginning)
+     */
+    SUFFIX (ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL) {
+      @Override
+      public String getValue(Path path) {
+        return Files.getFileExtension(path.getName());
+      }
+    };
+
+    String name;
+
+    ImplicitFileColumns(String name) {
+      this.name = name;
+    }
+
+    /**
+     * Using file path calculates value for each implicit file column
+     */
+    public abstract String getValue(Path path);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java
deleted file mode 100644
index 42ff827..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.OptionValue;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.hadoop.fs.Path;
-
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class ImplicitColumnExplorer {
-
-  private final String partitionDesignator;
-  private final List<SchemaPath> columns;
-  private final boolean isStarQuery;
-  private final List<Integer> selectedPartitionColumns;
-  private final List<SchemaPath> tableColumns;
-  private final Map<String, ImplicitFileColumns> allImplicitColumns;
-  private final Map<String, ImplicitFileColumns> selectedImplicitColumns;
-
-
-  /**
-   * Helper class that encapsulates logic for sorting out columns
-   * between actual table columns, partition columns and implicit file columns.
-   * Also populates map with implicit columns names as keys and their values
-   */
-  public ImplicitColumnExplorer(FragmentContext context, List<SchemaPath> 
columns) {
-    this(context.getOptions(), columns);
-  }
-
-  /**
-   * Helper class that encapsulates logic for sorting out columns
-   * between actual table columns, partition columns and implicit file columns.
-   * Also populates map with implicit columns names as keys and their values
-   */
-  public ImplicitColumnExplorer(OptionManager optionManager, List<SchemaPath> 
columns) {
-    this.partitionDesignator = 
optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
-    this.columns = columns;
-    this.isStarQuery = columns != null && 
AbstractRecordReader.isStarQuery(columns);
-    this.selectedPartitionColumns = Lists.newArrayList();
-    this.tableColumns = Lists.newArrayList();
-    this.allImplicitColumns = initImplicitFileColumns(optionManager);
-    this.selectedImplicitColumns = CaseInsensitiveMap.newHashMap();
-
-    init();
-  }
-
-  /**
-   * Creates case insensitive map with implicit file columns as keys and 
appropriate ImplicitFileColumns enum as values
-   */
-  public static Map<String, ImplicitFileColumns> 
initImplicitFileColumns(OptionManager optionManager) {
-    Map<String, ImplicitFileColumns> map = CaseInsensitiveMap.newHashMap();
-    for (ImplicitFileColumns e : ImplicitFileColumns.values()) {
-      OptionValue optionValue;
-      if ((optionValue = optionManager.getOption(e.name)) != null) {
-        map.put(optionValue.string_val, e);
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Compares selection root and actual file path to determine partition 
columns values.
-   * Adds implicit file columns according to columns list.
-   *
-   * @return map with columns names as keys and their values
-   */
-  public Map<String, String> populateImplicitColumns(FileWork work, String 
selectionRoot) {
-    return populateImplicitColumns(work.getPath(), selectionRoot);
-  }
-
-  /**
-   * Compares selection root and actual file path to determine partition 
columns values.
-   * Adds implicit file columns according to columns list.
-   *
-   * @return map with columns names as keys and their values
-   */
-  public Map<String, String> populateImplicitColumns(String filePath, String 
selectionRoot) {
-    Map<String, String> implicitValues = Maps.newLinkedHashMap();
-    if (selectionRoot != null) {
-      String[] r = Path.getPathWithoutSchemeAndAuthority(new 
Path(selectionRoot)).toString().split("/");
-      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
-      String[] p = path.toString().split("/");
-      if (p.length > r.length) {
-        String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
-        for (int a = 0; a < q.length; a++) {
-          if (isStarQuery || selectedPartitionColumns.contains(a)) {
-            implicitValues.put(partitionDesignator + a, q[a]);
-          }
-        }
-      }
-      //add implicit file columns
-      for (Map.Entry<String, ImplicitFileColumns> entry : 
selectedImplicitColumns.entrySet()) {
-        implicitValues.put(entry.getKey(), entry.getValue().getValue(path));
-      }
-    }
-    return implicitValues;
-  }
-
-  public boolean isStarQuery() {
-    return isStarQuery;
-  }
-
-  public List<SchemaPath> getTableColumns() {
-    return tableColumns;
-  }
-
-  /**
-   * If it is not star query, sorts out columns into three categories:
-   * 1. table columns
-   * 2. partition columns
-   * 3. implicit file columns
-   */
-  private void init() {
-    if (isStarQuery) {
-      selectedImplicitColumns.putAll(allImplicitColumns);
-    } else {
-      Pattern pattern = Pattern.compile(String.format("%s[0-9]+", 
partitionDesignator));
-      for (SchemaPath column : columns) {
-        String path = column.getAsUnescapedPath();
-        Matcher m = pattern.matcher(path);
-        if (m.matches()) {
-          
selectedPartitionColumns.add(Integer.parseInt(path.substring(partitionDesignator.length())));
-        } else if (allImplicitColumns.get(path) != null) {
-          selectedImplicitColumns.put(path, allImplicitColumns.get(path));
-        } else {
-          tableColumns.add(column);
-        }
-      }
-    }
-  }
-
-  /**
-   * Columns that give information from where file data comes from.
-   * Columns are implicit, so should be called explicitly in query
-   */
-  public enum ImplicitFileColumns {
-
-    /**
-     * Fully qualified name, contains full path to file and file name
-     */
-    FQN (ExecConstants.IMPLICIT_FQN_COLUMN_LABEL) {
-      @Override
-      public String getValue(Path path) {
-        return path.toString();
-      }
-    },
-
-    /**
-     * Full path to file without file name
-     */
-    FILEPATH (ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL) {
-      @Override
-      public String getValue(Path path) {
-        return path.getParent().toString();
-      }
-    },
-
-    /**
-     * File name with extension without path
-     */
-    FILENAME (ExecConstants.IMPLICIT_FILENAME_COLUMN_LABEL) {
-      @Override
-      public String getValue(Path path) {
-        return path.getName();
-      }
-    },
-
-    /**
-     * File suffix (without dot at the beginning)
-     */
-    SUFFIX (ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL) {
-      @Override
-      public String getValue(Path path) {
-        return Files.getFileExtension(path.getName());
-      }
-    };
-
-    String name;
-
-    ImplicitFileColumns(String name) {
-      this.name = name;
-    }
-
-    /**
-     * Using file path calculates value for each implicit file column
-     */
-    public abstract String getValue(Path path);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 776d806..1f7bce9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -41,7 +41,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.ImplicitColumnExplorer;
+import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -52,7 +52,6 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -128,7 +127,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
   @SuppressWarnings("resource")
   CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
-    final ImplicitColumnExplorer columnExplorer = new ImplicitColumnExplorer(context, scan.getColumns());
+    final ColumnExplorer columnExplorer = new ColumnExplorer(context, scan.getColumns());
 
     if (!columnExplorer.isStarQuery()) {
       scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index a4b2fad..67b2e5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -33,10 +33,9 @@ import java.util.List;
 
 @JsonTypeName("direct-scan")
 public class DirectGroupScan extends AbstractGroupScan {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectGroupScan.class);
 
-  private final RecordReader reader;
-  private final ScanStats stats;
+  protected final RecordReader reader;
+  protected final ScanStats stats;
 
   public DirectGroupScan(RecordReader reader) {
     this(reader, ScanStats.TRIVIAL_TABLE);

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
new file mode 100644
index 0000000..505d68e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.direct;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Represents direct scan based on metadata information.
+ * For example, for parquet files it can be obtained from parquet footer (total row count)
+ * or from parquet metadata files (column counts).
+ * Contains reader, statistics and list of scanned files if present.
+ */
+@JsonTypeName("metadata-direct-scan")
+public class MetadataDirectGroupScan extends DirectGroupScan {
+
+  private final Collection<String> files;
+
+  public MetadataDirectGroupScan(RecordReader reader, Collection<String> files) {
+    super(reader);
+    this.files = files;
+  }
+
+  public MetadataDirectGroupScan(RecordReader reader, Collection<String> files, ScanStats stats) {
+    super(reader, stats);
+    this.files = files;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    assert children == null || children.isEmpty();
+    return new MetadataDirectGroupScan(reader, files, stats);
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return this;
+  }
+
+  /**
+   * <p>
+   * Returns string representation of group scan data.
+   * Includes list of files if present.
+   * </p>
+   *
+   * <p>
+   * Example: [files = [/tmp/0_0_0.parquet], numFiles = 1]
+   * </p>
+   *
+   * @return string representation of group scan data
+   */
+  @Override
+  public String getDigest() {
+    if (files != null) {
+      StringBuilder builder = new StringBuilder();
+      builder.append("files = ").append(files).append(", ");
+      builder.append("numFiles = ").append(files.size()).append(", ");
+      return builder.append(super.getDigest()).toString();
+    }
+    return super.getDigest();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index aee3dc1..e96ec68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -250,7 +250,7 @@ public abstract class InfoSchemaRecordGenerator<S> {
 
     @Override
     public PojoRecordReader<Records.Catalog> getRecordReader() {
-      return new PojoRecordReader<>(Records.Catalog.class, records.iterator());
+      return new PojoRecordReader<>(Records.Catalog.class, records);
     }
 
     @Override
@@ -269,7 +269,7 @@ public abstract class InfoSchemaRecordGenerator<S> {
 
     @Override
     public PojoRecordReader<Records.Schema> getRecordReader() {
-      return new PojoRecordReader<>(Records.Schema.class, records.iterator());
+      return new PojoRecordReader<>(Records.Schema.class, records);
     }
 
     @Override
@@ -290,7 +290,7 @@ public abstract class InfoSchemaRecordGenerator<S> {
 
     @Override
     public PojoRecordReader<Records.Table> getRecordReader() {
-      return new PojoRecordReader<>(Records.Table.class, records.iterator());
+      return new PojoRecordReader<>(Records.Table.class, records);
     }
 
     @Override
@@ -341,7 +341,7 @@ public abstract class InfoSchemaRecordGenerator<S> {
 
     @Override
     public PojoRecordReader<Records.View> getRecordReader() {
-      return new PojoRecordReader<>(Records.View.class, records.iterator());
+      return new PojoRecordReader<>(Records.View.class, records);
     }
 
     @Override
@@ -362,7 +362,7 @@ public abstract class InfoSchemaRecordGenerator<S> {
 
     @Override
     public PojoRecordReader<Records.Column> getRecordReader() {
-      return new PojoRecordReader<>(Records.Column.class, records.iterator());
+      return new PojoRecordReader<>(Records.Column.class, records);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 30f607d..c333a3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -54,7 +54,7 @@ import 
org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.ImplicitColumnExplorer;
+import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -1063,7 +1063,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     ParquetFilterPredicate filterPredicate = null;
 
     for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
-      final ImplicitColumnExplorer columnExplorer = new ImplicitColumnExplorer(optionManager, this.columns);
+      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, this.columns);
       Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(file.getPath(), selectionRoot);
 
       for (RowGroupMetadata rowGroup : file.getRowGroups()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 5e22458..490a5a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,7 +32,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.ImplicitColumnExplorer;
+import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
@@ -63,7 +63,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
     Preconditions.checkArgument(children.isEmpty());
     OperatorContext oContext = context.newOperatorContext(rowGroupScan);
 
-    final ImplicitColumnExplorer columnExplorer = new ImplicitColumnExplorer(context, rowGroupScan.getColumns());
+    final ColumnExplorer columnExplorer = new ColumnExplorer(context, rowGroupScan.getColumns());
 
     if (!columnExplorer.isStarQuery()) {
       rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
new file mode 100644
index 0000000..0c1144a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pojo;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parent class for all pojo readers. Pojo readers can be based on a Java class (the field list is predefined) or dynamic.
+ * Contains general logic for initializing writers and reading values from the fields of each row.
+ */
+public abstract class AbstractPojoRecordReader<T> extends AbstractRecordReader 
implements Iterable<T> {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractPojoRecordReader.class);
+  private static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(AbstractPojoRecordReader.class);
+
+  protected final List<T> records;
+  protected List<PojoWriter> writers;
+
+  private Iterator<T> currentIterator;
+  private OperatorContext operatorContext;
+
+  protected AbstractPojoRecordReader(List<T> records) {
+    this.records = records;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+    operatorContext = context;
+    writers = setupWriters(output);
+    currentIterator = records.iterator();
+  }
+
+  @Override
+  public int next() {
+    boolean allocated = false;
+    injector.injectPause(operatorContext.getExecutionControls(), "read-next", 
logger);
+
+    int recordCount = 0;
+    while (currentIterator.hasNext()) {
+      if (!allocated) {
+        allocate();
+        allocated = true;
+      }
+
+      T row = currentIterator.next();
+      for (int i = 0; i < writers.size(); i++) {
+        PojoWriter writer = writers.get(i);
+        writer.writeField(getFieldValue(row, i), recordCount);
+      }
+      recordCount++;
+    }
+
+    if (recordCount != 0) {
+      setValueCount(recordCount);
+    }
+    return recordCount;
+  }
+
+  @Override
+  public void allocate(Map<String, ValueVector> vectorMap) throws 
OutOfMemoryException {
+    for (final ValueVector v : vectorMap.values()) {
+      AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return records.iterator();
+  }
+
+  /**
+   * Creates a writer based on the given class type and then initializes it.
+   *
+   * @param type class type
+   * @param fieldName field name
+   * @param output output mutator
+   * @return pojo writer
+   */
+  protected PojoWriter initWriter(Class<?> type, String fieldName, 
OutputMutator output) throws ExecutionSetupException {
+    PojoWriter writer = PojoWriters.getWriter(type, fieldName, 
output.getManagedBuffer());
+    try {
+      writer.init(output);
+      return writer;
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException("Failure while setting up schema for 
AbstractPojoRecordReader.", e);
+    }
+  }
+
+  /**
+   * Allocates buffers for each writer.
+   */
+  private void allocate() {
+    for (PojoWriter writer : writers) {
+      writer.allocate();
+    }
+  }
+
+  /**
+   * Sets number of written records for each writer.
+   *
+   * @param recordCount number of records written
+   */
+  private void setValueCount(int recordCount) {
+    for (PojoWriter writer : writers) {
+      writer.setValueCount(recordCount);
+    }
+  }
+
+  /**
+   * Sets up writers for each field in the row.
+   *
+   * @param output output mutator
+   * @return list of pojo writers
+   */
+  protected abstract List<PojoWriter> setupWriters(OutputMutator output) 
throws ExecutionSetupException;
+
+  /**
+   * Retrieves the field value to be written for the given row and field position.
+   *
+   * @param row current row
+   * @param fieldPosition current field position
+   * @return field value to be written for given row
+   */
+  protected abstract Object getFieldValue(T row, int fieldPosition);
+
+}

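To illustrate the two abstract hooks above, here is a minimal, hypothetical
subclass (ArrayRecordReader is not part of this patch) that reads rows
represented as Object arrays:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.drill.common.exceptions.ExecutionSetupException;
    import org.apache.drill.exec.physical.impl.OutputMutator;

    public class ArrayRecordReader extends AbstractPojoRecordReader<Object[]> {

      private final String[] names;
      private final Class<?>[] types;

      public ArrayRecordReader(String[] names, Class<?>[] types, List<Object[]> records) {
        super(records);
        this.names = names;
        this.types = types;
      }

      @Override
      protected List<PojoWriter> setupWriters(OutputMutator output) throws ExecutionSetupException {
        // One writer per column, created via the initWriter helper in the parent class.
        List<PojoWriter> writers = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
          writers.add(initWriter(types[i], names[i], output));
        }
        return writers;
      }

      @Override
      protected Object getFieldValue(Object[] row, int fieldPosition) {
        // The column value is simply the array element at the field position.
        return row[fieldPosition];
      }
    }
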
http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoWriter.java
new file mode 100644
index 0000000..a2a4644
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pojo;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Parent class for all pojo writers created for each field.
+ * Contains common logic for initializing the value vector; stores the field name and its type.
+ */
+public abstract class AbstractPojoWriter<V extends ValueVector> implements 
PojoWriter {
+
+  protected V vector;
+  private final String fieldName;
+  private final MajorType type;
+
+  public AbstractPojoWriter(String fieldName, MajorType type) {
+    this.fieldName = fieldName;
+    this.type = type;
+  }
+
+  @Override
+  public void init(OutputMutator output) throws SchemaChangeException {
+    MaterializedField mf = MaterializedField.create(fieldName, type);
+    @SuppressWarnings("unchecked")
+    Class<V> valueVectorClass = (Class<V>) 
TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
+    this.vector = output.addField(mf, valueVectorClass);
+  }
+
+  @Override
+  public void allocate() {
+    vector.allocateNew();
+  }
+
+  @Override
+  public void setValueCount(int valueCount) {
+    vector.getMutator().setValueCount(valueCount);
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+}
\ No newline at end of file

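As a usage illustration, a concrete writer only needs to supply the field type
and implement writeField. The sketch below is hypothetical (the real
implementations live in PojoWriters) and writes required INT values:

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.common.types.Types;
    import org.apache.drill.exec.vector.IntVector;

    public class IntPojoWriter extends AbstractPojoWriter<IntVector> {

      public IntPojoWriter(String fieldName) {
        // AbstractPojoWriter stores the field name and type and creates the vector in init().
        super(fieldName, Types.required(MinorType.INT));
      }

      @Override
      public void writeField(Object value, int outboundIndex) {
        vector.getMutator().setSafe(outboundIndex, (int) value);
      }
    }
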
http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
deleted file mode 100644
index c41a07a..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pojo;
-
-import java.lang.reflect.Field;
-
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.vector.ValueVector;
-
-abstract class AbstractWriter<V extends ValueVector> implements PojoWriter{
-
-  protected final Field field;
-  protected V vector;
-  protected final MajorType type;
-
-  public AbstractWriter(Field field, MajorType type){
-    this.field = field;
-    this.type = type;
-  }
-
-  @Override
-  public void init(OutputMutator output) throws SchemaChangeException {
-    MaterializedField mf = MaterializedField.create(field.getName(), type);
-    @SuppressWarnings("unchecked")
-    Class<V> valueVectorClass = (Class<V>) 
TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
-    this.vector = output.addField(mf, valueVectorClass);
-  }
-
-  @Override
-  public void allocate() {
-    vector.allocateNew();
-  }
-
-  public void setValueCount(int valueCount){
-    vector.getMutator().setValueCount(valueCount);
-  }
-
-  @Override
-  public void cleanup() {
-  }
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
new file mode 100644
index 0000000..82383f0
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pojo;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Dynamically reads values from the given list of records.
+ * Creates writers based on the given schema.
+ *
+ * @param <T> type of the given values; if a row contains mixed types, use the Object class
+ */
+public class DynamicPojoRecordReader<T> extends 
AbstractPojoRecordReader<List<T>> {
+
+  private final LinkedHashMap<String, Class<?>> schema;
+
+  public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, 
List<List<T>> records) {
+    super(records);
+    Preconditions.checkState(schema != null && !schema.isEmpty(), "Undefined 
schema is not allowed.");
+    this.schema = schema;
+  }
+
+  /**
+   * Initializes writers based on the given schema, which maps each field name to its type.
+   *
+   * @param output output mutator
+   * @return list of pojo writers
+   */
+  @Override
+  protected List<PojoWriter> setupWriters(OutputMutator output) throws 
ExecutionSetupException {
+    List<PojoWriter> writers = new ArrayList<>();
+    for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
+      writers.add(initWriter(field.getValue(), field.getKey(), output));
+    }
+    return writers;
+  }
+
+  @Override
+  protected Object getFieldValue(List<T> row, int fieldPosition) {
+    return row.get(fieldPosition);
+  }
+
+  @Override
+  public String toString() {
+    return "DynamicPojoRecordReader{" +
+        "records = " + records +
+        "}";
+  }
+}

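This is the reader ConvertCountToDirectScan now uses to return several COUNT
results at once. A hedged sketch of how such a reader could be built; the
column names and count values below are illustrative, not taken from the rule:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;

    // Two COUNT aggregates become two Long-typed columns.
    LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
    schema.put("count0", Long.class);
    schema.put("count1", Long.class);

    // A single row carrying both precomputed counts.
    List<List<Long>> records = Collections.singletonList(Arrays.asList(100L, 50L));

    DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(schema, records);
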
http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index baf07a4..c3b6883 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,173 +17,66 @@
  */
 package org.apache.drill.exec.store.pojo;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.pojo.Writers.BitWriter;
-import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
-import org.apache.drill.exec.store.pojo.Writers.EnumWriter;
-import org.apache.drill.exec.store.pojo.Writers.IntWriter;
-import org.apache.drill.exec.store.pojo.Writers.LongWriter;
-import org.apache.drill.exec.store.pojo.Writers.NBigIntWriter;
-import org.apache.drill.exec.store.pojo.Writers.NBooleanWriter;
-import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
-import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
-import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
-import org.apache.drill.exec.store.pojo.Writers.StringWriter;
-import org.apache.drill.exec.testing.ControlsInjector;
-import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
 
-public class PojoRecordReader<T> extends AbstractRecordReader implements 
Iterable<T> {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
-  private static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(PojoRecordReader.class);
+/**
+ * Reads values from the given list of pojo instances.
+ * Field writers are determined based on the pojo field class types.
+ *
+ * @param <T> pojo class type
+ */
+public class PojoRecordReader<T> extends AbstractPojoRecordReader<T> {
 
   private final Class<T> pojoClass;
-  private final List<T> pojoObjects;
-  private PojoWriter[] writers;
-  private boolean doCurrent;
-  private T currentPojo;
-  private OperatorContext operatorContext;
+  private final List<Field> fields;
 
-  private Iterator<T> currentIterator;
-
-  /**
-   * TODO: Cleanup the callers to pass the List of POJO objects directly 
rather than iterator.
-   * @param pojoClass
-   * @param iterator
-   */
-  public PojoRecordReader(Class<T> pojoClass, Iterator<T> iterator) {
+  public PojoRecordReader(Class<T> pojoClass, List<T> records) {
+    super(records);
     this.pojoClass = pojoClass;
-    this.pojoObjects = ImmutableList.copyOf(iterator);
+    this.fields = new ArrayList<>();
   }
 
+  /**
+   * Creates writers based on pojo field class types. Ignores static fields.
+   *
+   * @param output output mutator
+   * @return list of pojo writers
+   */
   @Override
-  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
-    operatorContext = context;
-    try {
-      Field[] fields = pojoClass.getDeclaredFields();
-      List<PojoWriter> writers = Lists.newArrayList();
-
-      for (int i = 0; i < fields.length; i++) {
-        Field f = fields[i];
-
-        if (Modifier.isStatic(f.getModifiers())) {
-          continue;
-        }
-
-        Class<?> type = f.getType();
-        PojoWriter w = null;
-        if(type == int.class) {
-          w = new IntWriter(f);
-        } else if(type == Integer.class) {
-          w = new NIntWriter(f);
-        } else if(type == Long.class) {
-          w = new NBigIntWriter(f);
-        } else if(type == Boolean.class) {
-          w = new NBooleanWriter(f);
-        } else if(type == double.class) {
-          w = new DoubleWriter(f);
-        } else if(type == Double.class) {
-          w = new NDoubleWriter(f);
-        } else if(type.isEnum()) {
-          w = new EnumWriter(f, output.getManagedBuffer());
-        } else if(type == boolean.class) {
-          w = new BitWriter(f);
-        } else if(type == long.class) {
-          w = new LongWriter(f);
-        } else if(type == String.class) {
-          w = new StringWriter(f, output.getManagedBuffer());
-        } else if (type == Timestamp.class) {
-          w = new NTimeStampWriter(f);
-        } else {
-          throw new ExecutionSetupException(String.format("PojoRecord reader 
doesn't yet support conversions from type [%s].", type));
-        }
-        writers.add(w);
-        w.init(output);
+  protected List<PojoWriter> setupWriters(OutputMutator output) throws 
ExecutionSetupException {
+    List<PojoWriter> writers = new ArrayList<>();
+    Field[] declaredFields = pojoClass.getDeclaredFields();
+    for (Field field : declaredFields) {
+      if (Modifier.isStatic(field.getModifiers())) {
+        continue;
       }
-
-      this.writers = writers.toArray(new PojoWriter[writers.size()]);
-
-    } catch(SchemaChangeException e) {
-      throw new ExecutionSetupException("Failure while setting up schema for 
PojoRecordReader.", e);
-    }
-
-    currentIterator = pojoObjects.iterator();
-  }
-
-  @Override
-  public void allocate(Map<String, ValueVector> vectorMap) throws 
OutOfMemoryException {
-    for (final ValueVector v : vectorMap.values()) {
-      AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
-    }
-  }
-
-  private void allocate() {
-    for (PojoWriter writer : writers) {
-      writer.allocate();
-    }
-  }
-
-  private void setValueCount(int i) {
-    for (PojoWriter writer : writers) {
-      writer.setValueCount(i);
+      writers.add(initWriter(field.getType(), field.getName(), output));
+      fields.add(field);
     }
+    return writers;
   }
 
   @Override
-  public int next() {
-    boolean allocated = false;
-    injector.injectPause(operatorContext.getExecutionControls(), "read-next", 
logger);
+  protected Object getFieldValue(T row, int fieldPosition) {
     try {
-      int i =0;
-      while (doCurrent || currentIterator.hasNext()) {
-        if (doCurrent) {
-          doCurrent = false;
-        } else {
-          currentPojo = currentIterator.next();
-        }
-
-        if (!allocated) {
-          allocate();
-          allocated = true;
-        }
-
-        for (PojoWriter writer : writers) {
-          writer.writeField(currentPojo, i);
-        }
-        i++;
-      }
-
-      if (i != 0 ) {
-        setValueCount(i);
-      }
-      return i;
+      return fields.get(fieldPosition).get(row);
     } catch (IllegalArgumentException | IllegalAccessException e) {
-      throw new RuntimeException("Failure while trying to use 
PojoRecordReader.", e);
+      throw new DrillRuntimeException("Failure while trying to use 
PojoRecordReader.", e);
     }
   }
 
   @Override
-  public Iterator<T> iterator() {
-    return pojoObjects.iterator();
-  }
-
-  @Override
-  public void close() {
+  public String toString() {
+    return "PojoRecordReader{" +
+        "pojoClass = " + pojoClass +
+        ", recordCount = " + records.size() +
+        "}";
   }
 }

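A small usage sketch of the reworked constructor, which now takes the record
list directly instead of an iterator (FileSummary is a hypothetical pojo; the
reader reflects over its non-static fields):

    import java.util.Collections;

    // Hypothetical pojo: each non-static field becomes a column.
    public class FileSummary {
      public String fileName;
      public long recordCount;
    }

    // Elsewhere, e.g. in a test:
    FileSummary summary = new FileSummary();
    summary.fileName = "a.parquet";
    summary.recordCount = 42L;
    PojoRecordReader<FileSummary> reader =
        new PojoRecordReader<>(FileSummary.class, Collections.singletonList(summary));
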
http://git-wip-us.apache.org/repos/asf/drill/blob/8b564235/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java
index 31748f4..335bfb1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,10 +20,40 @@ package org.apache.drill.exec.store.pojo;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 
-interface PojoWriter{
-  void writeField(Object pojo, int outboundIndex) throws 
IllegalArgumentException, IllegalAccessException ;
+/**
+ * Interface for pojo field writers, one per supported pojo field type.
+ */
+public interface PojoWriter {
+
+  /**
+   * Writes the given value at the given position.
+   *
+   * @param value value to be written
+   * @param outboundIndex position at which the value is written
+   */
+  void writeField(Object value, int outboundIndex);
+
+  /**
+   * Initializes value vector.
+   *
+   * @param output output mutator
+   */
   void init(OutputMutator output) throws SchemaChangeException;
+
+  /**
+   * Allocates new buffer for value vector.
+   */
   void allocate();
-  void setValueCount(int i);
+
+  /**
+   * Sets number of written records.
+   *
+   * @param recordCount record count
+   */
+  void setValueCount(int recordCount);
+
+  /**
+   * Performs cleanup if needed.
+   */
   void cleanup();
 }
\ No newline at end of file

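Taken together with AbstractPojoRecordReader above, the interface implies the
following call sequence per writer. This is a sketch of the contract, not code
from the patch:

    PojoWriter writer = ...;            // e.g. obtained from PojoWriters.getWriter(...)
    writer.init(output);                // setup(): create the value vector
    writer.allocate();                  // next(): allocate buffers before writing rows
    writer.writeField(value, rowIndex); // next(): once per row and field
    writer.setValueCount(rowCount);     // next(): after all rows are written
    writer.cleanup();                   // optional resource release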