DRILL-6164: Heap memory leak during parquet scan and OOM

Keep scan readers in a LinkedList and unlink each reader from the list as the
scan takes it up, so readers that have finished are no longer pinned by the
list and can be garbage collected before the scan completes. DrillParquetReader
also releases its member references on close(), and
DrillParquetRecordMaterializer now creates and owns its VectorContainerWriter.

closes #1122

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/894c0f58
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/894c0f58
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/894c0f58

Branch: refs/heads/master
Commit: 894c0f58ecff11ce53ede696f2a82308ca8333d4
Parents: 24a7acd
Author: Vlad Rozov <vro...@apache.org>
Authored: Thu Feb 15 19:25:21 2018 -0800
Committer: Vitalii Diravka <vitalii.dira...@gmail.com>
Committed: Mon Feb 19 08:57:05 2018 +0000

----------------------------------------------------------------------
 .../exec/store/mapr/db/MapRDBScanBatchCreator.java |  4 ++--
 .../exec/store/hbase/HBaseScanBatchCreator.java    |  4 ++--
 .../hive/HiveDrillNativeScanBatchCreator.java      |  3 ++-
 .../initilializers/DefaultReadersInitializer.java  |  4 ++--
 .../exec/store/kafka/KafkaScanBatchCreator.java    |  4 ++--
 .../exec/store/kudu/KuduScanBatchCreator.java      |  4 ++--
 .../exec/store/mongo/MongoScanBatchCreator.java    |  4 ++--
 .../exec/store/openTSDB/OpenTSDBBatchCreator.java  |  4 ++--
 .../apache/drill/exec/physical/impl/ScanBatch.java |  3 +++
 .../exec/store/dfs/easy/EasyFormatPlugin.java      |  3 ++-
 .../exec/store/mock/MockScanBatchCreator.java      |  4 ++--
 .../store/parquet/ParquetScanBatchCreator.java     |  3 ++-
 .../exec/store/parquet2/DrillParquetReader.java    | 14 +++++++++-----
 .../parquet2/DrillParquetRecordMaterializer.java   | 17 +++++++++++------
 .../exec/physical/unit/MiniPlanUnitTestBase.java   |  5 +++--
 .../exec/physical/unit/PhysicalOpUnitTestBase.java |  3 ++-
 16 files changed, 50 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
index d9c8ce7..c15d9ba 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mapr.db;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -31,13 +32,12 @@ import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
 import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
   @Override
   public ScanBatch getBatch(ExecutorFragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
       try {
         if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index ff9a4e4..87c64ac 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.hbase;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,14 +30,13 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
   @Override
   public ScanBatch getBatch(ExecutorFragmentContext context, HBaseSubScan subScan, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<SchemaPath> columns = null;
     for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
       try {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 243d781..3861aa0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
@@ -95,7 +96,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
     final OperatorContext oContext = context.newOperatorContext(config);
 
     int currentPartitionIndex = 0;
-    final List<RecordReader> readers = Lists.newArrayList();
+    final List<RecordReader> readers = new LinkedList<>();
 
     final HiveConf conf = config.getHiveConf();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
index a161151..52f394f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.store.hive.readers.initilializers.AbstractReadersIn
 import org.apache.hadoop.mapred.InputSplit;
 
 import java.lang.reflect.Constructor;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -44,7 +44,7 @@ public class DefaultReadersInitializer extends AbstractReadersInitializer {
     List<HivePartition> partitions = config.getPartitions();
     boolean hasPartitions = partitions != null && !partitions.isEmpty();
 
-    List<RecordReader> readers = new ArrayList<>(inputSplits.size());
+    List<RecordReader> readers = new LinkedList<>();
     Constructor<? extends HiveAbstractReader> readerConstructor = createReaderConstructor();
     for (int i = 0 ; i < inputSplits.size(); i++) {
       readers.add(createReader(readerConstructor, hasPartitions ? partitions.get(i) : null, inputSplits.get(i)));

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 55fc28f..9bedbd5 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
   static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
@@ -43,7 +43,7 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
     Preconditions.checkArgument(children.isEmpty());
     List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
 
-    List<RecordReader> readers = Lists.newArrayListWithCapacity(subScan.getPartitionSubScanSpecList().size());
+    List<RecordReader> readers = new LinkedList<>();
     for (KafkaSubScan.KafkaSubScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
       readers.add(new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()));
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
index 5e9d86c..7d09d10 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.kudu;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,7 +30,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduScanBatchCreator.class);
@@ -38,7 +38,7 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
   public ScanBatch getBatch(ExecutorFragmentContext context, KuduSubScan subScan, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<SchemaPath> columns = null;
 
     for (KuduSubScan.KuduSubScanSpec scanSpec : subScan.getTabletScanSpecList()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 42717fc..c970cfd 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mongo;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -31,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
   static final Logger logger = LoggerFactory
@@ -41,7 +41,7 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
   public ScanBatch getBatch(ExecutorFragmentContext context, MongoSubScan subScan,
       List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<SchemaPath> columns = null;
     for (MongoSubScan.MongoSubScanSpec scanSpec : subScan
         .getChunkScanSpecList()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
index fce2d7a..6bd059d 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.openTSDB;
 
-import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
@@ -28,6 +27,7 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
+import java.util.LinkedList;
 import java.util.List;
 
 public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> {
@@ -35,7 +35,7 @@ public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> {
   @Override
   public CloseableRecordBatch getBatch(ExecutorFragmentContext context, OpenTSDBSubScan subScan,
                                        List<RecordBatch> children) throws ExecutionSetupException {
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<SchemaPath> columns;
 
     for (OpenTSDBSubScan.OpenTSDBSubScanSpec scanSpec : subScan.getTabletScanSpecList()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index f6a4863..a937c7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -238,6 +238,9 @@ public class ScanBatch implements CloseableRecordBatch {
       return false;
     }
     currentReader = readers.next();
+    if (readers.hasNext()) {
+      readers.remove();
+    }
     implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
     currentReader.setup(oContext, mutator);
     currentReaderClassName = currentReader.getClass().getSimpleName();
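
This ScanBatch hunk is the heart of the fix: the readers iterator now unlinks each reader from the backing list as it becomes current (the hasNext() guard keeps the final reader in place), so a reader that has finished is no longer reachable through the list and can be garbage collected while later readers run. The LinkedList switch in the other files makes that per-element unlink O(1), where removal through an ArrayList iterator would shift the remaining elements each time. Below is a minimal, self-contained Java sketch of the idiom; the names are hypothetical stand-ins, not Drill classes.

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class ConsumeOnceDemo {
  public static void main(String[] args) {
    // Hypothetical stand-ins for record readers queued for a scan.
    List<String> readers = new LinkedList<>();
    readers.add("reader-1");
    readers.add("reader-2");
    readers.add("reader-3");

    Iterator<String> it = readers.iterator();
    while (it.hasNext()) {
      String current = it.next();
      // Iterator.remove() drops the element just returned by next().
      // On a LinkedList this is O(1) and unlinks the node, so the list
      // no longer pins the consumed reader in heap while the remaining
      // readers are processed.
      it.remove();
      System.out.println("processing " + current + "; still queued: " + readers.size());
    }
  }
}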

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d407ec3..678569f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -143,7 +144,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
       throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
     }
 
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = Lists.newArrayList();
     Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
     for(FileWork work : scan.getWorkUnits()){

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 01a102d..ab79b2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mock;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,7 +30,6 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.mock.MockTableDef.MockScanEntry;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
   @Override
@@ -37,7 +37,7 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     final List<MockScanEntry> entries = config.getReadEntries();
-    final List<RecordReader> readers = Lists.newArrayList();
+    final List<RecordReader> readers = new LinkedList<>();
     for(final MockTableDef.MockScanEntry e : entries) {
       if ( e.isExtended( ) ) {
         readers.add(new ExtendedMockRecordReader(e));

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 62948b3..e928ebb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
     // keep footers in a map to avoid re-reading them
     Map<String, ParquetMetadata> footers = Maps.newHashMap();
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = Lists.newArrayList();
     Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
     for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index e74c621..b095a9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -77,7 +77,6 @@ public class DrillParquetReader extends AbstractRecordReader {
   private MessageType schema;
   private DrillFileSystem fileSystem;
   private RowGroupReadEntry entry;
-  private VectorContainerWriter writer;
   private ColumnChunkIncReadStore pageReadStore;
   private RecordReader<Void> recordReader;
   private DrillParquetRecordMaterializer recordMaterializer;
@@ -253,12 +252,10 @@ public class DrillParquetReader extends AbstractRecordReader {
       }
 
       if (!noColumnsFound) {
-        writer = new VectorContainerWriter(output);
         // Discard the columns not found in the schema when create DrillParquetRecordMaterializer, since they have been added to output already.
         @SuppressWarnings("unchecked")
         final Collection<SchemaPath> columns = columnsNotFound == null || columnsNotFound.size() == 0 ? getColumns(): CollectionUtils.subtract(getColumns(), columnsNotFound);
-        recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, columns,
-            fragmentContext.getOptions(), containsCorruptedDates);
+        recordMaterializer = new DrillParquetRecordMaterializer(output, projection, columns, fragmentContext.getOptions(), containsCorruptedDates);
         recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
       }
     } catch (Exception e) {
@@ -309,7 +306,7 @@ public class DrillParquetReader extends AbstractRecordReader {
       count++;
       totalRead++;
     }
-    writer.setValueCount(count);
+    recordMaterializer.setValueCount(count);
     // if we have requested columns that were not found in the file fill their vectors with null
    // (by simply setting the value counts inside of them, as they start null filled)
     if (nullFilledVectors != null) {
@@ -322,6 +319,13 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   @Override
   public void close() {
+    footer = null;
+    fileSystem = null;
+    entry = null;
+    recordReader = null;
+    recordMaterializer = null;
+    nullFilledVectors = null;
+    columnsNotFound = null;
     try {
       if (pageReadStore != null) {
         pageReadStore.close();
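
The close() hunk above drops the reader's member references. Even if something upstream keeps the DrillParquetReader instance reachable for a while after close(), the footer metadata, record materializer, and vectors it pointed to become collectible immediately. A small, self-contained sketch of this release-on-close idiom, again with hypothetical names:

import java.util.List;

// Hypothetical sketch: a closeable holder releases its references on
// close() so its large members can be garbage collected even while the
// holder object itself remains reachable a little longer.
class FooterHolder implements AutoCloseable {
  private List<byte[]> footerBlocks; // stand-in for large parsed metadata

  FooterHolder(List<byte[]> footerBlocks) {
    this.footerBlocks = footerBlocks;
  }

  @Override
  public void close() {
    // Nulling the field is the point: the blocks become unreachable
    // through this object the moment close() returns.
    footerBlocks = null;
  }
}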

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index f6ffdbd..784056c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.RecordMaterializer;
@@ -30,18 +31,22 @@ import org.apache.parquet.schema.MessageType;
 
 public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
 
-  public DrillParquetGroupConverter root;
-  private ComplexWriter complexWriter;
+  private final DrillParquetGroupConverter root;
+  private final ComplexWriter writer;
 
-  public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema,
+  public DrillParquetRecordMaterializer(OutputMutator mutator, MessageType schema,
                                         Collection<SchemaPath> columns, OptionManager options,
                                         ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
-    this.complexWriter = complexWriter;
-    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates);
+    writer = new VectorContainerWriter(mutator);
+    root = new DrillParquetGroupConverter(mutator, writer.rootAsMap(), schema, columns, options, containsCorruptedDates);
   }
 
   public void setPosition(int position) {
-    complexWriter.setPosition(position);
+    writer.setPosition(position);
+  }
+
+  public void setValueCount(int count) {
+    writer.setValueCount(count);
   }
 
   @Override
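
Together with the DrillParquetReader changes above, this hunk moves ownership of the VectorContainerWriter into the materializer: the reader no longer constructs or stores a writer, it only calls setPosition(...) per record and setValueCount(...) at the end of a batch. A simplified, hypothetical sketch of the resulting shape (not Drill's real signatures):

// Hypothetical, simplified mirror of the refactoring: the materializer
// owns its writer instead of having the reader create and hold one.
class VectorWriter {                 // stand-in for VectorContainerWriter
  void setPosition(int p) { /* position underlying vectors at record p */ }
  void setValueCount(int n) { /* finalize n records in the batch */ }
}

class Materializer {                 // stand-in for DrillParquetRecordMaterializer
  private final VectorWriter writer = new VectorWriter(); // owned, not injected

  void setPosition(int position) { writer.setPosition(position); }
  void setValueCount(int count) { writer.setValueCount(count); }
}

class Reader {                       // stand-in for DrillParquetReader
  private final Materializer materializer = new Materializer();

  int next(int limit) {
    int count = 0;
    while (count < limit) {
      materializer.setPosition(count); // the reader never touches the writer
      count++;
    }
    materializer.setValueCount(count); // batch finalized via the materializer
    return count;
  }
}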

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index 7eafb86..c1f2ed5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -398,7 +399,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
         readers = getJsonReadersFromInputFiles(fs, inputPaths, fragContext, columnsToRead);
       }
 
-      List<RecordReader> readerList = new ArrayList<>();
+      List<RecordReader> readerList = new LinkedList<>();
       while(readers.hasNext()) {
         readerList.add(readers.next());
       }
@@ -441,7 +442,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
     }
 
     private RecordBatch getScanBatch() throws Exception {
-      List<RecordReader> readers = Lists.newArrayList();
+      List<RecordReader> readers = new LinkedList<>();
 
       for (String path : inputPaths) {
         ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), new Path(path));

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index d1ad990..8f644fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -80,6 +80,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -481,7 +482,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
 
   public List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
     Iterator<RecordReader> readers = getRecordReadersForJsonBatches(jsonBatches, fragContext);
-    List<RecordReader> readerList = new ArrayList<>();
+    List<RecordReader> readerList = new LinkedList<>();
     while(readers.hasNext()) {
       readerList.add(readers.next());
     }
