DRILL-6164: Heap memory leak during parquet scan and OOM

closes #1122
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/894c0f58
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/894c0f58
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/894c0f58

Branch: refs/heads/master
Commit: 894c0f58ecff11ce53ede696f2a82308ca8333d4
Parents: 24a7acd
Author: Vlad Rozov <vro...@apache.org>
Authored: Thu Feb 15 19:25:21 2018 -0800
Committer: Vitalii Diravka <vitalii.dira...@gmail.com>
Committed: Mon Feb 19 08:57:05 2018 +0000

----------------------------------------------------------------------
 .../exec/store/mapr/db/MapRDBScanBatchCreator.java |  4 ++--
 .../exec/store/hbase/HBaseScanBatchCreator.java    |  4 ++--
 .../hive/HiveDrillNativeScanBatchCreator.java      |  3 ++-
 .../initilializers/DefaultReadersInitializer.java  |  4 ++--
 .../exec/store/kafka/KafkaScanBatchCreator.java    |  4 ++--
 .../exec/store/kudu/KuduScanBatchCreator.java      |  4 ++--
 .../exec/store/mongo/MongoScanBatchCreator.java    |  4 ++--
 .../exec/store/openTSDB/OpenTSDBBatchCreator.java  |  4 ++--
 .../apache/drill/exec/physical/impl/ScanBatch.java |  3 +++
 .../exec/store/dfs/easy/EasyFormatPlugin.java      |  3 ++-
 .../exec/store/mock/MockScanBatchCreator.java      |  4 ++--
 .../store/parquet/ParquetScanBatchCreator.java     |  3 ++-
 .../exec/store/parquet2/DrillParquetReader.java    | 14 +++++++++-----
 .../parquet2/DrillParquetRecordMaterializer.java   | 17 +++++++++++------
 .../exec/physical/unit/MiniPlanUnitTestBase.java   |  5 +++--
 .../exec/physical/unit/PhysicalOpUnitTestBase.java |  3 ++-
 16 files changed, 50 insertions(+), 33 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
index d9c8ce7..c15d9ba 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mapr.db;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -31,13 +32,12 @@ import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
 import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
 
   @Override
   public ScanBatch getBatch(ExecutorFragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
       try {
         if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index ff9a4e4..87c64ac 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.hbase;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,14 +30,13 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
 
   @Override
   public ScanBatch getBatch(ExecutorFragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<SchemaPath> columns = null;
     for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
       try {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 243d781..3861aa0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
@@ -95,7 +96,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
     final OperatorContext oContext = context.newOperatorContext(config);
 
     int currentPartitionIndex = 0;
-    final List<RecordReader> readers = Lists.newArrayList();
+    final List<RecordReader> readers = new LinkedList<>();
 
     final HiveConf conf = config.getHiveConf();

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
index a161151..52f394f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.store.hive.readers.initilializers.AbstractReadersIn
 import org.apache.hadoop.mapred.InputSplit;
 
 import java.lang.reflect.Constructor;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -44,7 +44,7 @@ public class DefaultReadersInitializer extends AbstractReadersInitializer {
     List<HivePartition> partitions = config.getPartitions();
     boolean hasPartitions = partitions != null && !partitions.isEmpty();
 
-    List<RecordReader> readers = new ArrayList<>(inputSplits.size());
+    List<RecordReader> readers = new LinkedList<>();
     Constructor<? extends HiveAbstractReader> readerConstructor = createReaderConstructor();
     for (int i = 0 ; i < inputSplits.size(); i++) {
       readers.add(createReader(readerConstructor, hasPartitions ? partitions.get(i) : null, inputSplits.get(i)));

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 55fc28f..9bedbd5 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
   static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
@@ -43,7 +43,7 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
     Preconditions.checkArgument(children.isEmpty());
     List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
 
-    List<RecordReader> readers = Lists.newArrayListWithCapacity(subScan.getPartitionSubScanSpecList().size());
+    List<RecordReader> readers = new LinkedList<>();
     for (KafkaSubScan.KafkaSubScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
       readers.add(new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()));
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
index 5e9d86c..7d09d10 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.kudu;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,7 +30,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduScanBatchCreator.class);
@@ -38,7 +38,7 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
   public ScanBatch getBatch(ExecutorFragmentContext context, KuduSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<SchemaPath> columns = null;
 
     for (KuduSubScan.KuduSubScanSpec scanSpec : subScan.getTabletScanSpecList()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 42717fc..c970cfd 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mongo;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -31,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
   static final Logger logger = LoggerFactory
@@ -41,7 +41,7 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
   public ScanBatch getBatch(ExecutorFragmentContext context, MongoSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<SchemaPath> columns = null;
     for (MongoSubScan.MongoSubScanSpec scanSpec : subScan
         .getChunkScanSpecList()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
index fce2d7a..6bd059d 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.openTSDB;
 
-import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
@@ -28,6 +27,7 @@ import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
+import java.util.LinkedList;
 import java.util.List;
 
 public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> {
@@ -35,7 +35,7 @@ public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> {
   @Override
   public CloseableRecordBatch getBatch(ExecutorFragmentContext context, OpenTSDBSubScan subScan,
       List<RecordBatch> children) throws ExecutionSetupException {
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<SchemaPath> columns;
 
     for (OpenTSDBSubScan.OpenTSDBSubScanSpec scanSpec : subScan.getTabletScanSpecList()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index f6a4863..a937c7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -238,6 +238,9 @@ public class ScanBatch implements CloseableRecordBatch {
         return false;
       }
       currentReader = readers.next();
+      if (readers.hasNext()) {
+        readers.remove();
+      }
       implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
       currentReader.setup(oContext, mutator);
       currentReaderClassName = currentReader.getClass().getSimpleName();

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d407ec3..678569f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -143,7 +144,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
       throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
     }
 
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = Lists.newArrayList();
     Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
     for(FileWork work : scan.getWorkUnits()){

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 01a102d..ab79b2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mock;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,7 +30,6 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.mock.MockTableDef.MockScanEntry;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
   @Override
@@ -37,7 +37,7 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     final List<MockScanEntry> entries = config.getReadEntries();
-    final List<RecordReader> readers = Lists.newArrayList();
+    final List<RecordReader> readers = new LinkedList<>();
     for(final MockTableDef.MockScanEntry e : entries) {
       if ( e.isExtended( ) ) {
         readers.add(new ExtendedMockRecordReader(e));

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 62948b3..e928ebb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
     // keep footers in a map to avoid re-reading them
     Map<String, ParquetMetadata> footers = Maps.newHashMap();
-    List<RecordReader> readers = Lists.newArrayList();
+    List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = Lists.newArrayList();
     Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
     for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index e74c621..b095a9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -77,7 +77,6 @@ public class DrillParquetReader extends AbstractRecordReader {
   private MessageType schema;
   private DrillFileSystem fileSystem;
   private RowGroupReadEntry entry;
-  private VectorContainerWriter writer;
   private ColumnChunkIncReadStore pageReadStore;
   private RecordReader<Void> recordReader;
   private DrillParquetRecordMaterializer recordMaterializer;
@@ -253,12 +252,10 @@ public class DrillParquetReader extends AbstractRecordReader {
       }
 
       if (!noColumnsFound) {
-        writer = new VectorContainerWriter(output);
         // Discard the columns not found in the schema when create DrillParquetRecordMaterializer, since they have been added to output already.
         @SuppressWarnings("unchecked")
         final Collection<SchemaPath> columns = columnsNotFound == null || columnsNotFound.size() == 0 ? getColumns(): CollectionUtils.subtract(getColumns(), columnsNotFound);
-        recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, columns,
-            fragmentContext.getOptions(), containsCorruptedDates);
+        recordMaterializer = new DrillParquetRecordMaterializer(output, projection, columns, fragmentContext.getOptions(), containsCorruptedDates);
         recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
       }
     } catch (Exception e) {
@@ -309,7 +306,7 @@ public class DrillParquetReader extends AbstractRecordReader {
         count++;
         totalRead++;
       }
-      writer.setValueCount(count);
+      recordMaterializer.setValueCount(count);
       // if we have requested columns that were not found in the file fill their vectors with null
       // (by simply setting the value counts inside of them, as they start null filled)
       if (nullFilledVectors != null) {
@@ -322,6 +319,13 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   @Override
   public void close() {
+    footer = null;
+    fileSystem = null;
+    entry = null;
+    recordReader = null;
+    recordMaterializer = null;
+    nullFilledVectors = null;
+    columnsNotFound = null;
     try {
       if (pageReadStore != null) {
         pageReadStore.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index f6ffdbd..784056c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.RecordMaterializer;
@@ -30,18 +31,22 @@ import org.apache.parquet.schema.MessageType;
 
 public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
 
-  public DrillParquetGroupConverter root;
-  private ComplexWriter complexWriter;
+  private final DrillParquetGroupConverter root;
+  private final ComplexWriter writer;
 
-  public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema,
+  public DrillParquetRecordMaterializer(OutputMutator mutator, MessageType schema,
                                         Collection<SchemaPath> columns, OptionManager options,
                                         ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
-    this.complexWriter = complexWriter;
-    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates);
+    writer = new VectorContainerWriter(mutator);
+    root = new DrillParquetGroupConverter(mutator, writer.rootAsMap(), schema, columns, options, containsCorruptedDates);
   }
 
   public void setPosition(int position) {
-    complexWriter.setPosition(position);
+    writer.setPosition(position);
+  }
+
+  public void setValueCount(int count) {
+    writer.setValueCount(count);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index 7eafb86..c1f2ed5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -398,7 +399,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
         readers = getJsonReadersFromInputFiles(fs, inputPaths, fragContext, columnsToRead);
       }
 
-      List<RecordReader> readerList = new ArrayList<>();
+      List<RecordReader> readerList = new LinkedList<>();
       while(readers.hasNext()) {
         readerList.add(readers.next());
       }
@@ -441,7 +442,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
     }
 
     private RecordBatch getScanBatch() throws Exception {
-      List<RecordReader> readers = Lists.newArrayList();
+      List<RecordReader> readers = new LinkedList<>();
 
      for (String path : inputPaths) {
        ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), new Path(path));

http://git-wip-us.apache.org/repos/asf/drill/blob/894c0f58/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index d1ad990..8f644fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -80,6 +80,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -481,7 +482,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
 
   public List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
     Iterator<RecordReader> readers = getRecordReadersForJsonBatches(jsonBatches, fragContext);
-    List<RecordReader> readerList = new ArrayList<>();
+    List<RecordReader> readerList = new LinkedList<>();
     while(readers.hasNext()) {
       readerList.add(readers.next());
     }
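
----------------------------------------------------------------------

The common thread in these hunks: every per-scan readers list switches from
ArrayList/Lists.newArrayList() to LinkedList, and ScanBatch now removes each
reader from the backing list as soon as it takes it, so a finished reader is
no longer pinned by the list and can be garbage-collected while later readers
run. A LinkedList iterator removes in O(1); an ArrayList would shift the
remaining elements on every removal. Below is a minimal standalone sketch of
that pattern for illustration only; Reader and ScanSketch are hypothetical
names, not Drill classes.

import java.util.Iterator;
import java.util.LinkedList;

// Hypothetical stand-in for Drill's RecordReader.
interface Reader {
  boolean readBatch();
}

class ScanSketch {
  private final Iterator<Reader> readers; // iterator over a LinkedList
  private Reader currentReader;

  ScanSketch(LinkedList<Reader> readerList) {
    this.readers = readerList.iterator();
  }

  // Mirrors the ScanBatch hunk: take the next reader, then drop the list's
  // reference to it. Once currentReader is replaced on a later call, the
  // finished reader (and any heap it still holds) becomes eligible for
  // garbage collection instead of staying reachable through the list for
  // the lifetime of the scan.
  boolean advanceReader() {
    if (!readers.hasNext()) {
      return false;
    }
    currentReader = readers.next();
    if (readers.hasNext()) { // as in the commit, the last reader stays listed
      readers.remove();      // O(1) removal via the LinkedList iterator
    }
    return true;
  }
}

The DrillParquetReader and DrillParquetRecordMaterializer hunks apply the same
lifecycle discipline inside the reader itself: the VectorContainerWriter is
now created by the materializer that uses it rather than held as a reader
field, and close() nulls out the reader's remaining references (footer,
recordReader, recordMaterializer, and so on) so a closed reader retains
nothing on the heap.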