[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/1214


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-26 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r184401600
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java 
---
@@ -158,25 +159,26 @@ public void close() {
 } catch (RuntimeException e) {
   ex = ex == null ? e : ex;
 }
-try {
-  if (fs != null) {
+
+for (DrillFileSystem fs : fileSystems) {
+  try {
 fs.close();
-fs = null;
-  }
-} catch (IOException e) {
+  } catch (IOException e) {
   throw UserException.resourceError(e)
-.addContext("Failed to close the Drill file system for " + 
getName())
-.build(logger);
+  .addContext("Failed to close the Drill file system for " + 
getName())
+  .build(logger);
+  }
 }
+
 if (ex != null) {
   throw ex;
 }
   }
 
   @Override
   public DrillFileSystem newFileSystem(Configuration conf) throws 
IOException {
-Preconditions.checkState(fs == null, "Tried to create a second 
FileSystem. Can only be called once per OperatorContext");
-fs = new DrillFileSystem(conf, getStats());
+DrillFileSystem fs = new DrillFileSystem(conf, getStats());
--- End diff --

When the `AbstractParquetScanBatchCreator.getBatch` method is called, it 
receives one operator context, which previously allowed only one file system 
to be created. It also receives an `AbstractParquetRowGroupScan`, which 
contains several row groups. Row groups may belong to different files. For 
Drill parquet files, we create only one fs and use it to create readers for 
each row group. That's why it was fine when the operator context allowed only 
one fs. But we needed to adjust this for Hive files. For Hive we need to 
create an fs for each file (since the config for each file system is different 
and is created using the projection pusher), so I had to change the operator 
context to allow more than one file system. I have also introduced 
`AbstractDrillFileSystemManager`, which controls the number of file systems 
created. `ParquetDrillFileSystemManager` creates only one (as was done before). 
`HiveDrillNativeParquetDrillFileSystemManager` creates an fs for each file, so 
when two row groups belong to the same file, they will get the same fs.
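
The two strategies described above can be sketched roughly as follows. This is only an illustration, not the code from the pull request: `FsHandle`, `FsManagerSketch`, `SingleFsManager` and `PerFileFsManager` are hypothetical stand-ins for `DrillFileSystem` and the manager classes named in the comment, and the real classes take a Hadoop `Configuration` rather than a string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for DrillFileSystem; it only records which config it was built from.
final class FsHandle {
  final String configId;

  FsHandle(String configId) {
    this.configId = configId;
  }
}

// Hypothetical stand-in for the manager abstraction described in the comment.
interface FsManagerSketch {
  FsHandle get(String filePath);

  int created();
}

// Drill parquet case: one file system shared by every row group.
final class SingleFsManager implements FsManagerSketch {
  private FsHandle fs;

  @Override
  public FsHandle get(String filePath) {
    if (fs == null) {
      fs = new FsHandle("shared-config");
    }
    return fs;
  }

  @Override
  public int created() {
    return fs == null ? 0 : 1;
  }
}

// Hive case: one file system per file, reused by all row groups of that file.
final class PerFileFsManager implements FsManagerSketch {
  private final Map<String, FsHandle> byPath = new HashMap<>();

  @Override
  public FsHandle get(String filePath) {
    // Assumption: the per-file config can be derived from the file path.
    return byPath.computeIfAbsent(filePath, path -> new FsHandle("config-for-" + path));
  }

  @Override
  public int created() {
    return byPath.size();
  }
}
```

With this shape, asking `PerFileFsManager` twice for the same path returns the same handle, while two different paths produce two handles.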

But I agree that for a tracking fs (i.e. when 
store.parquet.reader.pagereader.async is set to false) this will make a mess 
of the calculations. So I suggest the following fix: for Hive we'll always 
create a non-tracking fs; for Drill it will depend on the 
store.parquet.reader.pagereader.async option. I'll also add checks in the 
operator context to disallow creating more than one tracking fs, and to 
disallow creating a tracking fs at all when non-tracking ones have already 
been created.
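
A minimal sketch of the proposed checks, assuming a simplified model of the operator context. `OperatorContextSketch` and its method names are hypothetical, not the Drill API. The sketch deliberately does not restrict creating a non-tracking fs after a tracking one, since the comment does not say how that case should behave.

```java
// Hypothetical, simplified model of an operator context; not the Drill class.
final class OperatorContextSketch {
  private int trackingCount;
  private int nonTrackingCount;

  // At most one tracking fs, and none once a non-tracking fs exists.
  void newTrackingFileSystem() {
    if (trackingCount > 0) {
      throw new IllegalStateException("Only one tracking file system is allowed per operator context");
    }
    if (nonTrackingCount > 0) {
      throw new IllegalStateException("Cannot create a tracking file system after non-tracking ones");
    }
    trackingCount++;
  }

  // Any number of non-tracking file systems may be created (the Hive case).
  void newNonTrackingFileSystem() {
    nonTrackingCount++;
  }

  int total() {
    return trackingCount + nonTrackingCount;
  }
}
```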


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r184199682
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java 
---
@@ -158,25 +159,26 @@ public void close() {
 } catch (RuntimeException e) {
   ex = ex == null ? e : ex;
 }
-try {
-  if (fs != null) {
+
+for (DrillFileSystem fs : fileSystems) {
+  try {
 fs.close();
-fs = null;
-  }
-} catch (IOException e) {
+  } catch (IOException e) {
   throw UserException.resourceError(e)
-.addContext("Failed to close the Drill file system for " + 
getName())
-.build(logger);
+  .addContext("Failed to close the Drill file system for " + 
getName())
+  .build(logger);
+  }
 }
+
 if (ex != null) {
   throw ex;
 }
   }
 
   @Override
   public DrillFileSystem newFileSystem(Configuration conf) throws 
IOException {
-Preconditions.checkState(fs == null, "Tried to create a second 
FileSystem. Can only be called once per OperatorContext");
-fs = new DrillFileSystem(conf, getStats());
+DrillFileSystem fs = new DrillFileSystem(conf, getStats());
--- End diff --

I'm not suggesting we use the same fs for each split, but the opposite. The 
fs object used per split/rowgroup should be different so that we get the right 
fs wait time for every minor fragment. But this change allows more than one fs 
object per operator context, which we were explicitly preventing earlier. I'm 
not sure I understand why you needed to change that.




---



[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r184188427
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java 
---
@@ -158,25 +159,26 @@ public void close() {
 } catch (RuntimeException e) {
   ex = ex == null ? e : ex;
 }
-try {
-  if (fs != null) {
+
+for (DrillFileSystem fs : fileSystems) {
+  try {
 fs.close();
-fs = null;
-  }
-} catch (IOException e) {
+  } catch (IOException e) {
   throw UserException.resourceError(e)
-.addContext("Failed to close the Drill file system for " + 
getName())
-.build(logger);
+  .addContext("Failed to close the Drill file system for " + 
getName())
+  .build(logger);
+  }
 }
+
 if (ex != null) {
   throw ex;
 }
   }
 
   @Override
   public DrillFileSystem newFileSystem(Configuration conf) throws 
IOException {
-Preconditions.checkState(fs == null, "Tried to create a second 
FileSystem. Can only be called once per OperatorContext");
-fs = new DrillFileSystem(conf, getStats());
+DrillFileSystem fs = new DrillFileSystem(conf, getStats());
--- End diff --

@parthchandra this is definitely a good question. I did so because in the 
previous code a new fs was created for each Hive table split [1]. The 
projection pusher is used to define the fs for each split; it resolves paths 
for table partitions. Frankly, it worked fine for me without it (all tests 
passed), but the same approach is used in the Hive code, and apparently it was 
used in Drill for the same reasons. To be safe, I have done the same. If you 
think we can use the same fs for each row group in Hive, then I can adjust the 
changes.

[1] 
https://github.com/apache/drill/blob/master/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java#L112


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183909850
  
--- Diff: common/src/main/java/org/apache/drill/common/Stopwatch.java ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common;
+
+import com.google.common.base.Ticker;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helper that creates stopwatch based if debug level is enabled.
--- End diff --

Do we really need this? In general we have used (or should have used) 
Stopwatch to track metrics and/or performance bottlenecks in production. In 
neither case do we want to enable debug.
Also, for debugging performance issues (I see that the places you've 
changed to use this Stopwatch are places where we encountered performance 
issues), would it be better to use 
```
Stopwatch timer = null; // initialized so later reads compile
if (logger.isDebugEnabled()) {
  timer = Stopwatch.createStarted();
}
```
More verbose, but guaranteed to be optimized away by the JVM.
Not insisting that we change this, BTW.
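
For comparison, the idea of a debug-gated helper can be sketched as below. This is an illustration under assumptions, not the `Stopwatch` class added in the pull request: `DebugStopwatch` is a hypothetical name, it uses `System.nanoTime()` instead of Guava, and callers are assumed to pass in the result of `logger.isDebugEnabled()`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not the Stopwatch class from the pull request.
final class DebugStopwatch {
  private final boolean enabled;
  private final long startNanos;

  private DebugStopwatch(boolean enabled) {
    this.enabled = enabled;
    // The clock is read only when timing is enabled.
    this.startNanos = enabled ? System.nanoTime() : 0L;
  }

  // Callers are assumed to pass logger.isDebugEnabled().
  static DebugStopwatch createStarted(boolean debugEnabled) {
    return new DebugStopwatch(debugEnabled);
  }

  // Returns elapsed time in the requested unit, or 0 when timing is disabled.
  long elapsed(TimeUnit unit) {
    if (!enabled) {
      return 0L;
    }
    return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
  }
}
```

The trade-off raised in the comment still applies: the helper hides the `isDebugEnabled()` branch from the call site, whereas the explicit `if` keeps it visible at the cost of more verbose code.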


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183919198
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java 
---
@@ -158,25 +159,26 @@ public void close() {
 } catch (RuntimeException e) {
   ex = ex == null ? e : ex;
 }
-try {
-  if (fs != null) {
+
+for (DrillFileSystem fs : fileSystems) {
+  try {
 fs.close();
-fs = null;
-  }
-} catch (IOException e) {
+  } catch (IOException e) {
   throw UserException.resourceError(e)
-.addContext("Failed to close the Drill file system for " + 
getName())
-.build(logger);
+  .addContext("Failed to close the Drill file system for " + 
getName())
+  .build(logger);
+  }
 }
+
 if (ex != null) {
   throw ex;
 }
   }
 
   @Override
   public DrillFileSystem newFileSystem(Configuration conf) throws 
IOException {
-Preconditions.checkState(fs == null, "Tried to create a second 
FileSystem. Can only be called once per OperatorContext");
-fs = new DrillFileSystem(conf, getStats());
+DrillFileSystem fs = new DrillFileSystem(conf, getStats());
--- End diff --

I don't get why you need multiple DrillFileSystems per operator context. 
The reason for the DrillFileSystem abstraction (and the reason for tying it to 
the operator context) is to track the time a (scan) operator was waiting for a 
file system call to return. This is reported in the wait time for the operator 
in the query profile.  For scans this is a critical number as the time spent 
waiting for a disk read determines if the query is disk bound.
Associating multiple file system objects with a single operator context 
will, I think, throw the math out of whack.
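
The wait-time tracking described above can be illustrated with a small sketch. The names `WaitStats` and `TrackedFs` are hypothetical, and the injected clock exists only to make the example deterministic; the actual Drill classes and their signatures are not shown in this thread.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical stand-in for per-operator statistics.
final class WaitStats {
  private final LongSupplier clock;
  private long waitNanos;
  private long waitStart;

  WaitStats(LongSupplier clock) {
    this.clock = clock;
  }

  void startWait() {
    waitStart = clock.getAsLong();
  }

  void stopWait() {
    waitNanos += clock.getAsLong() - waitStart;
  }

  long waitNanos() {
    return waitNanos;
  }
}

// Hypothetical wrapper: every file system call is bracketed by startWait/stopWait,
// so the operator's total wait time is the sum of the time spent in those calls.
final class TrackedFs {
  private final WaitStats stats;

  TrackedFs(WaitStats stats) {
    this.stats = stats;
  }

  <T> T call(Supplier<T> fsCall) {
    stats.startWait();
    try {
      return fsCall.get();
    } finally {
      stats.stopWait();
    }
  }
}
```

In this model a single start/stop pair is kept per stats object, which is one way to see why several file systems reporting into the same operator statistics at the same time could distort the total.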



---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183981425
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.Stopwatch;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import 
org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractParquetScanBatchCreator {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
+
+  private static final String ENABLE_BYTES_READ_COUNTER = 
"parquet.benchmark.bytes.read";
+  private static final String ENABLE_BYTES_TOTAL_COUNTER = 
"parquet.benchmark.bytes.total";
+  private static final String ENABLE_TIME_READ_COUNTER = 
"parquet.benchmark.time.read";
+
+  protected ScanBatch getBatch(ExecutorFragmentContext context, 
AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws 
ExecutionSetupException {
+final ColumnExplorer columnExplorer = new 
ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
+
+if (!columnExplorer.isStarQuery()) {
+  rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
+  rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
+}
+
+boolean useAsyncPageReader =
+
context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+
+AbstractDrillFileSystemManager fsManager = 
getDrillFileSystemCreator(oContext, useAsyncPageReader);
+
+// keep footers in a map to avoid re-reading them
+Map<String, ParquetMetadata> footers = new HashMap<>();
+List<RecordReader> readers = new LinkedList<>();
+List<Map<String, String>> implicitColumns = new ArrayList<>();
+Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
+for(RowGroupReadEntry rowGroup : 
rowGroupScan.getRowGroupReadEntries()) {
+  /*
+  Here we could store a map from file names to footers, to prevent 
re-reading the footer for each row group in a file
+  TODO - to prevent reading the footer again in the parquet record 
reader (it is read earlier in the ParquetStorageEngine)
+  we should add more information to the RowGroupInfo that will be 
populated upon the first read to
+  provide the reader with all of th file meta-data it needs
+  These fields will be added to the constructor below
+  */
+  try {
+Stopwatch timer = 


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183981543
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
 ---
@@ -40,31 +36,26 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
+import org.apache.hadoop.conf.Configuration;
 
 // Class containing information for reading a single parquet row group 
form HDFS
--- End diff --

Fixed.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183981479
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+import static 
org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3;
+import static 
org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
+
+/**
+ * Holds common statistics about data in parquet group scan,
+ * including information about total row count, columns counts, partition 
columns.
+ */
+public class ParquetGroupScanStatistics {
+
+  // map from file names to maps of column name to partition value mappings
+  private Map<String, Map<SchemaPath, Object>> partitionValueMap;
+  // only for partition columns : value is unique for each partition
+  private Map<SchemaPath, TypeProtos.MajorType> partitionColTypeMap;
+  // total number of non-null value for each column in parquet files
+  private Map<SchemaPath, Long> columnValueCounts;
+  // total number of rows (obtained from parquet footer)
+  private long rowCount;
+
+
+  public ParquetGroupScanStatistics(List rowGroupInfos, 
ParquetTableMetadataBase parquetTableMetadata) {
+collect(rowGroupInfos, parquetTableMetadata);
+  }
+
+  public ParquetGroupScanStatistics(ParquetGroupScanStatistics that) {
+this.partitionValueMap = new HashMap<>(that.partitionValueMap);
+this.partitionColTypeMap = new HashMap<>(that.partitionColTypeMap);
+this.columnValueCounts = new HashMap<>(that.columnValueCounts);
+this.rowCount = that.rowCount;
+  }
+
+  public long getColumnValueCount(SchemaPath column) {
+return columnValueCounts.containsKey(column) ? 
columnValueCounts.get(column) : 0;
+  }
+
+  public List getPartitionColumns() {
+return new ArrayList<>(partitionColTypeMap.keySet());
+  }
+
+  public TypeProtos.MajorType getTypeForColumn(SchemaPath schemaPath) {
+return partitionColTypeMap.get(schemaPath);
+  }
+
+  public long getRowCount() {
+return rowCount;
+  }
+
+  public Object getPartitionValue(String path, SchemaPath column) {
+return partitionValueMap.get(path).get(column);
+  }
+
+  public void collect(List rowGroupInfos, 
ParquetTableMetadataBase parquetTableMetadata) {
+resetHolders();
+boolean first = true;
+for (RowGroupInfo rowGroup : rowGroupInfos) {
+  long rowCount = rowGroup.getRowCount();
+  for (ColumnMetadata column : rowGroup.getColumns()) {
+SchemaPath schemaPath = 
SchemaPath.getCompoundPath(column.getName());
+Long previousCount = columnValueCounts.get(schemaPath);
+if (previousCount != null) {
+  if (previousCount != GroupScan.NO_COLUMN_STATS) {
+if (column.getNulls() != null) {
--- End diff --

Changed.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183981672
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
 ---
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

Fixed.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183983380
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+
+public abstract class AbstractParquetGroupScan extends 
AbstractFileGroupScan {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
+
+  protected List<SchemaPath> columns;
+  protected List<ReadEntryWithPath> entries;
+  protected LogicalExpression filter;
+
+  protected ParquetTableMetadataBase parquetTableMetadata;
+  protected List<RowGroupInfo> rowGroupInfos;
+  protected ListMultimap<Integer, RowGroupInfo> mappings;
+  protected Set<String> fileSet;
+
+  private List<EndpointAffinity> endpointAffinities;
+  private ParquetGroupScanStatistics parquetGroupScanStatistics;
+
+  protected AbstractParquetGroupScan(String userName, List<SchemaPath> 
columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
+super(userName);
+this.columns = columns;
+this.entries = entries;
+this.filter = filter;
+  }
+
+  // immutable copy constructor
+  


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183981301
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
 ---
@@ -147,10 +147,12 @@ CloseableRecordBatch getReaderBatch(FragmentContext 
context, EasySubScan scan) t
 List<RecordReader> readers = new LinkedList<>();
 List<Map<String, String>> implicitColumns = Lists.newArrayList();
 Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
 for(FileWork work : scan.getWorkUnits()){
--- End diff --

Fixed.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183982099
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
 ---
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+
+import java.util.List;
+
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
+
+public class RowGroupInfo extends ReadEntryFromHDFS implements 
CompleteWork, FileWork {
+
+private EndpointByteMap byteMap;
+private int rowGroupIndex;
+private List columns;
+private long rowCount;  // rowCount = -1 indicates to include all rows.
+private long numRecordsToRead;
+
+@JsonCreator
+public RowGroupInfo(@JsonProperty("path") String path, 
@JsonProperty("start") long start,
+@JsonProperty("length") long length, 
@JsonProperty("rowGroupIndex") int rowGroupIndex, long rowCount) {
+  super(path, start, length);
+  this.rowGroupIndex = rowGroupIndex;
+  this.rowCount = rowCount;
+  this.numRecordsToRead = rowCount;
+}
+
+public RowGroupReadEntry getRowGroupReadEntry() {
+  return new RowGroupReadEntry(this.getPath(), this.getStart(), 
this.getLength(),
+   this.rowGroupIndex, 
this.getNumRecordsToRead());
+}
+
+public int getRowGroupIndex() {
+  return this.rowGroupIndex;
+}
+
+@Override
+public int compareTo(CompleteWork o) {
+  return Long.compare(getTotalBytes(), o.getTotalBytes());
+}
+
+@Override
+public long getTotalBytes() {
+  return this.getLength();
+}
+
+@Override
+public EndpointByteMap getByteMap() {
+  return byteMap;
+}
+
+public long getNumRecordsToRead() {
+  return numRecordsToRead;
+}
+
+public void setNumRecordsToRead(long numRecords) {
+  numRecordsToRead = numRecords;
+}
+
+public void setEndpointByteMap(EndpointByteMap byteMap) {
+  this.byteMap = byteMap;
+}
+
+public long getRowCount() {
+  return rowCount;
+}
+
+public List getColumns() {
+  return columns;
+}
+
+public void setColumns(List columns) {
+  this.columns = columns;
+}
+
+  }
--- End diff --

Done.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183981138
  
--- Diff: 
contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
 ---
@@ -64,16 +68,17 @@ public static synchronized HiveTestDataGenerator 
getInstance(File baseDir) throw
   final String dbDir = dbDirFile.getAbsolutePath();
   final String whDir = whDirFile.getAbsolutePath();
 
-  instance = new HiveTestDataGenerator(dbDir, whDir);
+  instance = new HiveTestDataGenerator(dbDir, whDir, dirTestWatcher);
   instance.generateTestData();
 }
 
 return instance;
   }
 
-  private HiveTestDataGenerator(final String dbDir, final String whDir) {
+  private HiveTestDataGenerator(final String dbDir, final String whDir, 
final BaseDirTestWatcher dirTestWatcher) {
 this.dbDir = dbDir;
 this.whDir = whDir;
+this.dirTestWatcher = dirTestWatcher;
 
 config = Maps.newHashMap();
 config.put("hive.metastore.uris", "");
--- End diff --

Replaced.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183980928
  
--- Diff: 
contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
 ---
@@ -0,0 +1,247 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.hive.HiveTestBase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.hamcrest.CoreMatchers;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHiveDrillNativeParquetReader extends HiveTestBase {
+
+  @BeforeClass
+  public static void init() {
+setSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, true);
+setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+resetSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
+resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testFilterPushDownForManagedTable() throws Exception {
+String query = "select * from hive.kv_native where key > 1";
+
+int actualRowCount = testSql(query);
+assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+testPlanMatchingPatterns(query,
+new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{});
+  }
+
+  @Test
+  public void testFilterPushDownForExternalTable() throws Exception {
+String query = "select * from hive.kv_native_ext where key = 1";
+
+int actualRowCount = testSql(query);
+assertEquals("Expected and actual row count should match", 1, actualRowCount);
+
+testPlanMatchingPatterns(query,
+new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{});
--- End diff --

Agreed, replaced with null. I am not adding this method, to avoid merge conflicts.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183983280
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+
+public abstract class AbstractParquetGroupScan extends 
AbstractFileGroupScan {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
+
+  protected List columns;
+  protected List entries;
+  protected LogicalExpression filter;
+
+  protected ParquetTableMetadataBase parquetTableMetadata;
+  protected List rowGroupInfos;
+  protected ListMultimap mappings;
+  protected Set fileSet;
+
+  private List endpointAffinities;
+  private ParquetGroupScanStatistics parquetGroupScanStatistics;
+
+  protected AbstractParquetGroupScan(String userName, List 
columns, List entries, LogicalExpression filter) {
+super(userName);
+this.columns = columns;
+this.entries = entries;
+this.filter = filter;
+  }
+
+  // immutable copy constructor
+  

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183980648
  
--- Diff: 
contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
 ---
@@ -0,0 +1,247 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

Fixed.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183980574
  
--- Diff: 
contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
 ---
@@ -1,114 +1,223 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

Fixed.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183980539
  
--- Diff: 
contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
 ---
@@ -0,0 +1,130 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

Fixed.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183981250
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java ---
@@ -156,43 +157,74 @@ public static boolean isPartitionColumn(String 
partitionDesignator, String path)
   }
 
   /**
-   * Compares selection root and actual file path to determine partition 
columns values.
-   * Adds implicit file columns according to columns list.
+   * Creates map with implicit columns where key is column name, value is columns actual value.
+   * This map contains partition and implicit file columns (if requested).
+   * Partition columns names are formed based in partition designator and value index.
*
-   * @return map with columns names as keys and their values
+   * @param filePath file path, used to populate file implicit columns
+   * @param partitionValues list of partition values
+   * @param includeFileImplicitColumns if file implicit columns should be 
included into the result
+   * @return implicit columns map
*/
-  public Map populateImplicitColumns(FileWork work, String 
selectionRoot) {
-return populateImplicitColumns(work.getPath(), selectionRoot);
-  }
+  public Map populateImplicitColumns(String filePath,
+ List 
partitionValues,
+ boolean 
includeFileImplicitColumns) {
+Map implicitValues = new LinkedHashMap<>();
 
-  /**
-   * Compares selection root and actual file path to determine partition 
columns values.
-   * Adds implicit file columns according to columns list.
-   *
-   * @return map with columns names as keys and their values
-   */
-  public Map populateImplicitColumns(String filePath, 
String selectionRoot) {
-Map implicitValues = Maps.newLinkedHashMap();
-if (selectionRoot != null) {
-  String[] r = Path.getPathWithoutSchemeAndAuthority(new 
Path(selectionRoot)).toString().split("/");
-  Path path = Path.getPathWithoutSchemeAndAuthority(new 
Path(filePath));
-  String[] p = path.toString().split("/");
-  if (p.length > r.length) {
-String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
-for (int a = 0; a < q.length; a++) {
-  if (isStarQuery || selectedPartitionColumns.contains(a)) {
-implicitValues.put(partitionDesignator + a, q[a]);
-  }
-}
+for(int i = 0; i < partitionValues.size(); i++) {
--- End diff --

Fixed.
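
For readers following the Javadoc in the diff above, here is a minimal sketch of how partition column names could be formed from the partition designator and the value index. It is an illustration only, not the code from the pull request: the class `ImplicitColumnsSketch` and the method `partitionColumns` are hypothetical names, and the star-query check, the selected-column check, and the file implicit columns are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Drill: shows only the naming scheme
// "partition designator + value index" described in the Javadoc.
final class ImplicitColumnsSketch {

  static Map<String, String> partitionColumns(String partitionDesignator,
                                              List<String> partitionValues) {
    // LinkedHashMap keeps the columns in partition order
    Map<String, String> implicitValues = new LinkedHashMap<>();
    for (int i = 0; i < partitionValues.size(); i++) {
      // e.g. designator "dir" and index 0 give the column name "dir0"
      implicitValues.put(partitionDesignator + i, partitionValues.get(i));
    }
    return implicitValues;
  }
}
```

With the designator `dir` and the values `2018` and `04`, the sketch yields the entries `dir0=2018` and `dir1=04`, in that order.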


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183981354
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.Stopwatch;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import 
org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractParquetScanBatchCreator {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
+
+  private static final String ENABLE_BYTES_READ_COUNTER = 
"parquet.benchmark.bytes.read";
+  private static final String ENABLE_BYTES_TOTAL_COUNTER = 
"parquet.benchmark.bytes.total";
+  private static final String ENABLE_TIME_READ_COUNTER = 
"parquet.benchmark.time.read";
+
+  protected ScanBatch getBatch(ExecutorFragmentContext context, 
AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws 
ExecutionSetupException {
+final ColumnExplorer columnExplorer = new 
ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
+
+if (!columnExplorer.isStarQuery()) {
+  rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
+  rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
+}
+
+boolean useAsyncPageReader =
+
context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+
+AbstractDrillFileSystemManager fsManager = 
getDrillFileSystemCreator(oContext, useAsyncPageReader);
+
+// keep footers in a map to avoid re-reading them
+Map footers = new HashMap<>();
+List readers = new LinkedList<>();
+List> implicitColumns = new ArrayList<>();
+Map mapWithMaxColumns = new LinkedHashMap<>();
+for(RowGroupReadEntry rowGroup : 
rowGroupScan.getRowGroupReadEntries()) {
--- End diff --

Fixed.
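
The "keep footers in a map to avoid re-reading them" comment in the diff above describes a simple per-file cache. A rough sketch of that idea follows, assuming the footer is looked up by file path. `FooterCacheSketch` and its `reader` function are hypothetical stand-ins: the real code reads a `ParquetMetadata` footer through the file system, which is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical cache, not part of Drill: F stands in for the footer type.
final class FooterCacheSketch<F> {

  private final Map<String, F> footers = new HashMap<>();
  private final Function<String, F> reader; // assumed footer-reading function
  private int reads;                        // counts real reads, for illustration

  FooterCacheSketch(Function<String, F> reader) {
    this.reader = reader;
  }

  // Reads the footer only the first time a path is seen; row groups that
  // belong to the same file reuse the cached value.
  F footerFor(String path) {
    return footers.computeIfAbsent(path, p -> {
      reads++;
      return reader.apply(p);
    });
  }

  int reads() {
    return reads;
  }
}
```

Several row groups that point at the same file then trigger a single read, which is the effect the comment describes.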


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-25 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183980496
  
--- Diff: 
contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
 ---
@@ -166,25 +171,43 @@ public boolean matches(RelOptRuleCall call) {
   @Override
   public void onMatch(RelOptRuleCall call) {
 try {
-  final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
+  final DrillScanRel hiveScanRel = call.rel(0);
   final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
 
   final PlannerSettings settings = 
PrelUtil.getPlannerSettings(call.getPlanner());
   final String partitionColumnLabel = 
settings.getFsPartitionColumnLabel();
 
   final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
-  checkForUnsupportedDataTypes(hiveTable);
+  final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
+
+  final HiveMetadataProvider hiveMetadataProvider = new 
HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, 
hiveScan.getStoragePlugin().getHiveConf());
--- End diff --

Fixed.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183635975
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.Stopwatch;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import 
org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractParquetScanBatchCreator {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
+
+  private static final String ENABLE_BYTES_READ_COUNTER = 
"parquet.benchmark.bytes.read";
+  private static final String ENABLE_BYTES_TOTAL_COUNTER = 
"parquet.benchmark.bytes.total";
+  private static final String ENABLE_TIME_READ_COUNTER = 
"parquet.benchmark.time.read";
+
+  protected ScanBatch getBatch(ExecutorFragmentContext context, 
AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws 
ExecutionSetupException {
+final ColumnExplorer columnExplorer = new 
ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
+
+if (!columnExplorer.isStarQuery()) {
+  rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
+  rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
+}
+
+boolean useAsyncPageReader =
+
context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+
+AbstractDrillFileSystemManager fsManager = 
getDrillFileSystemCreator(oContext, useAsyncPageReader);
+
+// keep footers in a map to avoid re-reading them
+Map footers = new HashMap<>();
+List readers = new LinkedList<>();
+List> implicitColumns = new ArrayList<>();
+Map mapWithMaxColumns = new LinkedHashMap<>();
+for(RowGroupReadEntry rowGroup : 
rowGroupScan.getRowGroupReadEntries()) {
+  /*
+  Here we could store a map from file names to footers, to prevent 
re-reading the footer for each row group in a file
+  TODO - to prevent reading the footer again in the parquet record 
reader (it is read earlier in the ParquetStorageEngine)
+  we should add more information to the RowGroupInfo that will be 
populated upon the first read to
+  provide the reader with all of th file meta-data it needs
+  These fields will be added to the constructor below
+  */
+  try {
+Stopwatch timer = 

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183558185
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+
+public abstract class AbstractParquetGroupScan extends 
AbstractFileGroupScan {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
+
+  protected List columns;
+  protected List entries;
+  protected LogicalExpression filter;
+
+  protected ParquetTableMetadataBase parquetTableMetadata;
+  protected List rowGroupInfos;
+  protected ListMultimap mappings;
+  protected Set fileSet;
+
+  private List endpointAffinities;
+  private ParquetGroupScanStatistics parquetGroupScanStatistics;
+
+  protected AbstractParquetGroupScan(String userName, List 
columns, List entries, LogicalExpression filter) {
+super(userName);
+this.columns = columns;
+this.entries = entries;
+this.filter = filter;
+  }
+
+  // immutable copy constructor
+  protected 

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183251213
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
 ---
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+
+import java.util.List;
+
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
+
+public class RowGroupInfo extends ReadEntryFromHDFS implements 
CompleteWork, FileWork {
+
+private EndpointByteMap byteMap;
+private int rowGroupIndex;
+private List columns;
+private long rowCount;  // rowCount = -1 indicates to include all rows.
+private long numRecordsToRead;
+
+@JsonCreator
+public RowGroupInfo(@JsonProperty("path") String path, 
@JsonProperty("start") long start,
+@JsonProperty("length") long length, 
@JsonProperty("rowGroupIndex") int rowGroupIndex, long rowCount) {
+  super(path, start, length);
+  this.rowGroupIndex = rowGroupIndex;
+  this.rowCount = rowCount;
+  this.numRecordsToRead = rowCount;
+}
+
+public RowGroupReadEntry getRowGroupReadEntry() {
+  return new RowGroupReadEntry(this.getPath(), this.getStart(), 
this.getLength(),
+   this.rowGroupIndex, 
this.getNumRecordsToRead());
+}
+
+public int getRowGroupIndex() {
+  return this.rowGroupIndex;
+}
+
+@Override
+public int compareTo(CompleteWork o) {
+  return Long.compare(getTotalBytes(), o.getTotalBytes());
+}
+
+@Override
+public long getTotalBytes() {
+  return this.getLength();
+}
+
+@Override
+public EndpointByteMap getByteMap() {
+  return byteMap;
+}
+
+public long getNumRecordsToRead() {
+  return numRecordsToRead;
+}
+
+public void setNumRecordsToRead(long numRecords) {
+  numRecordsToRead = numRecords;
+}
+
+public void setEndpointByteMap(EndpointByteMap byteMap) {
+  this.byteMap = byteMap;
+}
+
+public long getRowCount() {
+  return rowCount;
+}
+
+public List getColumns() {
+  return columns;
+}
+
+public void setColumns(List columns) {
+  this.columns = columns;
+}
+
+  }
--- End diff --

New line at the end of the file.


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183644277
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
+import static 
org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+import static 
org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3;
+import static 
org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
+
+/**
+ * Holds common statistics about data in parquet group scan,
+ * including information about total row count, columns counts, partition 
columns.
+ */
+public class ParquetGroupScanStatistics {
+
+  // map from file names to maps of column name to partition value mappings
+  private Map> partitionValueMap;
+  // only for partition columns : value is unique for each partition
+  private Map partitionColTypeMap;
+  // total number of non-null value for each column in parquet files
+  private Map columnValueCounts;
+  // total number of rows (obtained from parquet footer)
+  private long rowCount;
+
+
+  public ParquetGroupScanStatistics(List rowGroupInfos, 
ParquetTableMetadataBase parquetTableMetadata) {
+collect(rowGroupInfos, parquetTableMetadata);
+  }
+
+  public ParquetGroupScanStatistics(ParquetGroupScanStatistics that) {
+this.partitionValueMap = new HashMap<>(that.partitionValueMap);
+this.partitionColTypeMap = new HashMap<>(that.partitionColTypeMap);
+this.columnValueCounts = new HashMap<>(that.columnValueCounts);
+this.rowCount = that.rowCount;
+  }
+
+  public long getColumnValueCount(SchemaPath column) {
+return columnValueCounts.containsKey(column) ? columnValueCounts.get(column) : 0;
+  }
+
+  public List<SchemaPath> getPartitionColumns() {
+return new ArrayList<>(partitionColTypeMap.keySet());
+  }
+
+  public TypeProtos.MajorType getTypeForColumn(SchemaPath schemaPath) {
+return partitionColTypeMap.get(schemaPath);
+  }
+
+  public long getRowCount() {
+return rowCount;
+  }
+
+  public Object getPartitionValue(String path, SchemaPath column) {
+return partitionValueMap.get(path).get(column);
+  }
+
+  public void collect(List<RowGroupInfo> rowGroupInfos, ParquetTableMetadataBase parquetTableMetadata) {
+resetHolders();
+boolean first = true;
+for (RowGroupInfo rowGroup : rowGroupInfos) {
+  long rowCount = rowGroup.getRowCount();
+  for (ColumnMetadata column : rowGroup.getColumns()) {
+SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
+Long previousCount = columnValueCounts.get(schemaPath);
+if (previousCount != null) {
+  if (previousCount != GroupScan.NO_COLUMN_STATS) {
+if (column.getNulls() != null) {
--- End diff --

Combine this if statement with the one above.
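
A minimal sketch of what the combined condition could look like. It assumes neither of the two inner `if` statements has its own `else` branch (the quoted diff stops before that is visible). `ColumnCountSketch`, `addNonNullCount` and the plain `String` column key are hypothetical stand-ins, and `NO_COLUMN_STATS` here only mimics `GroupScan.NO_COLUMN_STATS` with an assumed value.

```java
import java.util.HashMap;
import java.util.Map;

public class ColumnCountSketch {

  // Stand-in for GroupScan.NO_COLUMN_STATS; the value is an assumption.
  static final long NO_COLUMN_STATS = -1;

  // Adds the non-null count of one row group to the running total of a column
  // that has already been seen. Columns seen for the first time are not handled,
  // because that branch is not part of the quoted diff.
  static void addNonNullCount(Map<String, Long> counts, String column, long rowCount, Long nulls) {
    Long previousCount = counts.get(column);
    if (previousCount != null) {
      // formerly two nested ifs, now one condition
      if (previousCount != NO_COLUMN_STATS && nulls != null) {
        counts.put(column, previousCount + (rowCount - nulls));
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Long> counts = new HashMap<>();
    counts.put("a", 10L);
    addNonNullCount(counts, "a", 5, 2L);
    System.out.println(counts);
  }
}
```

The total is left untouched when the previous count is the "no stats" marker or when the row group reports no null count.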



---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183646149
  
--- Diff: 
contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
 ---
@@ -0,0 +1,247 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.hive.HiveTestBase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.hamcrest.CoreMatchers;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHiveDrillNativeParquetReader extends HiveTestBase {
+
+  @BeforeClass
+  public static void init() {
+setSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, true);
+setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+resetSessionOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS);
+resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testFilterPushDownForManagedTable() throws Exception {
+String query = "select * from hive.kv_native where key > 1";
+
+int actualRowCount = testSql(query);
+assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+testPlanMatchingPatterns(query,
+new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{});
+  }
+
+  @Test
+  public void testFilterPushDownForExternalTable() throws Exception {
+String query = "select * from hive.kv_native_ext where key = 1";
+
+int actualRowCount = testSql(query);
+assertEquals("Expected and actual row count should match", 1, actualRowCount);
+
+testPlanMatchingPatterns(query,
+new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{});
--- End diff --

I have added an overload without `excludedPatterns` for the cases where it is not needed, but it is not merged yet.
Until then, would it be better to pass null than to create an empty String array?
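
To illustrate the idea, here is a rough sketch of such an overload. `PlanPatternSketch` and `planMatches` are hypothetical names, not the `PlanTestBase` API, and treating `null` the same as an empty array is an assumption made for the example.

```java
import java.util.regex.Pattern;

public class PlanPatternSketch {

  // Returns true when every expected pattern is found in the plan
  // and no excluded pattern is found. A null array is treated as empty.
  static boolean planMatches(String plan, String[] expectedPatterns, String[] excludedPatterns) {
    for (String expected : orEmpty(expectedPatterns)) {
      if (!Pattern.compile(expected).matcher(plan).find()) {
        return false;
      }
    }
    for (String excluded : orEmpty(excludedPatterns)) {
      if (Pattern.compile(excluded).matcher(plan).find()) {
        return false;
      }
    }
    return true;
  }

  // Overload for callers that have nothing to exclude.
  static boolean planMatches(String plan, String[] expectedPatterns) {
    return planMatches(plan, expectedPatterns, new String[0]);
  }

  private static String[] orEmpty(String[] patterns) {
    return patterns == null ? new String[0] : patterns;
  }

  public static void main(String[] args) {
    String plan = "HiveDrillNativeParquetScan(numFiles=1)";
    System.out.println(planMatches(plan, new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}));
  }
}
```

With the overload in place the call site needs neither `null` nor `new String[]{}`.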


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183253923
  
--- Diff: 
contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
 ---
@@ -0,0 +1,130 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

indent


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183566717
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+
+public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
+
+  protected List columns;
+  protected List entries;
+  protected LogicalExpression filter;
+
+  protected ParquetTableMetadataBase parquetTableMetadata;
+  protected List rowGroupInfos;
+  protected ListMultimap mappings;
+  protected Set fileSet;
+
+  private List endpointAffinities;
+  private ParquetGroupScanStatistics parquetGroupScanStatistics;
+
+  protected AbstractParquetGroupScan(String userName, List columns, List entries, LogicalExpression filter) {
+super(userName);
+this.columns = columns;
+this.entries = entries;
+this.filter = filter;
+  }
+
+  // immutable copy constructor
+  protected 

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183551693
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+
+public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
+
+  protected List columns;
+  protected List entries;
+  protected LogicalExpression filter;
+
+  protected ParquetTableMetadataBase parquetTableMetadata;
+  protected List rowGroupInfos;
+  protected ListMultimap mappings;
+  protected Set fileSet;
+
+  private List endpointAffinities;
+  private ParquetGroupScanStatistics parquetGroupScanStatistics;
+
+  protected AbstractParquetGroupScan(String userName, List columns, List entries, LogicalExpression filter) {
+super(userName);
+this.columns = columns;
+this.entries = entries;
+this.filter = filter;
+  }
+
+  // immutable copy constructor
+  protected 

[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183250175
  
--- Diff: 
contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
 ---
@@ -1,114 +1,223 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

indent


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183647695
  
--- Diff: 
contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
 ---
@@ -166,25 +171,43 @@ public boolean matches(RelOptRuleCall call) {
   @Override
   public void onMatch(RelOptRuleCall call) {
 try {
-  final DrillScanRel hiveScanRel = (DrillScanRel) call.rel(0);
+  final DrillScanRel hiveScanRel = call.rel(0);
   final HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
 
   final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
   final String partitionColumnLabel = settings.getFsPartitionColumnLabel();
 
   final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
-  checkForUnsupportedDataTypes(hiveTable);
+  final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
+
+  final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
--- End diff --

line break


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183632379
  
--- Diff: 
contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
 ---
@@ -64,16 +68,17 @@ public static synchronized HiveTestDataGenerator getInstance(File baseDir) throw
   final String dbDir = dbDirFile.getAbsolutePath();
   final String whDir = whDirFile.getAbsolutePath();
 
-  instance = new HiveTestDataGenerator(dbDir, whDir);
+  instance = new HiveTestDataGenerator(dbDir, whDir, dirTestWatcher);
   instance.generateTestData();
 }
 
 return instance;
   }
 
-  private HiveTestDataGenerator(final String dbDir, final String whDir) {
+  private HiveTestDataGenerator(final String dbDir, final String whDir, final BaseDirTestWatcher dirTestWatcher) {
 this.dbDir = dbDir;
 this.whDir = whDir;
+this.dirTestWatcher = dirTestWatcher;
 
 config = Maps.newHashMap();
 config.put("hive.metastore.uris", "");
--- End diff --

"hive.metastore.uris" -> ConfVars.METASTOREURIS


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183633623
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
 ---
@@ -147,10 +147,12 @@ CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) t
 List readers = new LinkedList<>();
 List> implicitColumns = Lists.newArrayList();
 Map mapWithMaxColumns = Maps.newLinkedHashMap();
+boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
 for(FileWork work : scan.getWorkUnits()){
--- End diff --

`for (`


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183644581
  
--- Diff: 
contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
 ---
@@ -0,0 +1,247 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

indent


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183253252
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
 ---
@@ -40,31 +36,26 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
+import org.apache.hadoop.conf.Configuration;
 
 // Class containing information for reading a single parquet row group form HDFS
--- End diff --

form -> from


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183633688
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.Stopwatch;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractParquetScanBatchCreator {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
+
+  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
+  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
+  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+
+  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
+final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
+
+if (!columnExplorer.isStarQuery()) {
+  rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
+  rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
+}
+
+boolean useAsyncPageReader =
+context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+
+AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, useAsyncPageReader);
+
+// keep footers in a map to avoid re-reading them
+Map footers = new HashMap<>();
+List readers = new LinkedList<>();
+List> implicitColumns = new ArrayList<>();
+Map mapWithMaxColumns = new LinkedHashMap<>();
+for(RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
--- End diff --

`for (`


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183633517
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java ---
@@ -156,43 +157,74 @@ public static boolean isPartitionColumn(String partitionDesignator, String path)
   }
 
   /**
-   * Compares selection root and actual file path to determine partition columns values.
-   * Adds implicit file columns according to columns list.
+   * Creates map with implicit columns where key is column name, value is columns actual value.
+   * This map contains partition and implicit file columns (if requested).
+   * Partition columns names are formed based in partition designator and value index.
*
-   * @return map with columns names as keys and their values
+   * @param filePath file path, used to populate file implicit columns
+   * @param partitionValues list of partition values
+   * @param includeFileImplicitColumns if file implicit columns should be included into the result
+   * @return implicit columns map
*/
-  public Map populateImplicitColumns(FileWork work, String selectionRoot) {
-return populateImplicitColumns(work.getPath(), selectionRoot);
-  }
+  public Map populateImplicitColumns(String filePath,
+ List partitionValues,
+ boolean includeFileImplicitColumns) {
+Map implicitValues = new LinkedHashMap<>();
 
-  /**
-   * Compares selection root and actual file path to determine partition columns values.
-   * Adds implicit file columns according to columns list.
-   *
-   * @return map with columns names as keys and their values
-   */
-  public Map populateImplicitColumns(String filePath, String selectionRoot) {
-Map implicitValues = Maps.newLinkedHashMap();
-if (selectionRoot != null) {
-  String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString().split("/");
-  Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
-  String[] p = path.toString().split("/");
-  if (p.length > r.length) {
-String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
-for (int a = 0; a < q.length; a++) {
-  if (isStarQuery || selectedPartitionColumns.contains(a)) {
-implicitValues.put(partitionDesignator + a, q[a]);
-  }
-}
+for(int i = 0; i < partitionValues.size(); i++) {
--- End diff --

`for (`
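
For readers following the new `populateImplicitColumns` signature in this diff, a simplified sketch of the behaviour its Javadoc describes: partition column names are the partition designator plus the value index, and file implicit columns are added only on request. The class name, the explicit `partitionDesignator` parameter and the single `filename` column are assumptions made for the example; the real method body is not shown in the quoted diff.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ImplicitColumnsSketch {

  // Builds the implicit column map from already known partition values.
  static Map<String, String> populateImplicitColumns(String filePath,
                                                     List<String> partitionValues,
                                                     boolean includeFileImplicitColumns,
                                                     String partitionDesignator) {
    Map<String, String> implicitValues = new LinkedHashMap<>();
    for (int i = 0; i < partitionValues.size(); i++) {
      // e.g. dir0, dir1, ... when the designator is "dir"
      implicitValues.put(partitionDesignator + i, partitionValues.get(i));
    }
    if (includeFileImplicitColumns) {
      // only one illustrative file column: the last path segment
      implicitValues.put("filename", filePath.substring(filePath.lastIndexOf('/') + 1));
    }
    return implicitValues;
  }

  public static void main(String[] args) {
    System.out.println(populateImplicitColumns(
        "/warehouse/t/a=1/b=2/0_0.parquet", Arrays.asList("1", "2"), true, "dir"));
  }
}
```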


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-24 Thread vdiravka
Github user vdiravka commented on a diff in the pull request:

https://github.com/apache/drill/pull/1214#discussion_r183251188
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
 ---
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --

indent


---


[GitHub] drill pull request #1214: DRILL-6331: Revisit Hive Drill native parquet impl...

2018-04-17 Thread arina-ielchiieva
GitHub user arina-ielchiieva opened a pull request:

https://github.com/apache/drill/pull/1214

DRILL-6331: Revisit Hive Drill native parquet implementation to be exposed to Drill optimizations (filter / limit push down, count to direct scan)

1. Factored out common logic for the Drill parquet reader and Hive Drill native parquet readers: AbstractParquetGroupScan, AbstractParquetRowGroupScan, AbstractParquetScanBatchCreator.
2. Rules that previously worked only with ParquetGroupScan can now be applied to any class that extends AbstractParquetGroupScan: DrillFilterItemStarReWriterRule, ParquetPruneScanRule, PruneScanRule.
3. Hive populates partition values based on information returned from the Hive metastore. Drill populates partition values based on the path difference between the selection root and the actual file path.
   Previously, ColumnExplorer populated partition values using the Drill approach. Since ColumnExplorer now also populates values for parquet files from Hive tables, the `populateImplicitColumns` method logic was changed to populate partition columns based only on the given partition values.
4. Refactored ParquetPartitionDescriptor to be responsible for populating partition values rather than storing this logic in the parquet group scan class.
5. The Metadata class was moved to a separate metadata package (org.apache.drill.exec.store.parquet.metadata). Factored out several inner classes to improve code readability.
6. Collected all Drill native parquet reader unit tests into one class, TestHiveDrillNativeParquetReader, and added new tests to cover new functionality.
7. Reduced excessive logging when parquet file metadata is read.
8. Added a Drill stopwatch implementation (includes a wrapper around Guava stopwatch and DummyStopwatch). This helps to save system resources when debug level is not enabled.
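
As an illustration of the "path difference" approach from item 3, the sketch below derives partition values from the directories that lie between the selection root and the file. It mirrors the string splitting visible in the removed `ColumnExplorer` code but works on plain strings instead of Hadoop `Path` objects; `PartitionPathSketch` and `partitionValues` are hypothetical names, and both paths are assumed to be scheme-less with the file located under the root.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionPathSketch {

  // Returns the directory names between the selection root and the file name.
  static List<String> partitionValues(String selectionRoot, String filePath) {
    String[] rootParts = selectionRoot.split("/");
    String[] fileParts = filePath.split("/");
    List<String> values = new ArrayList<>();
    // skip the segments shared with the root and stop before the file name
    for (int i = rootParts.length; i < fileParts.length - 1; i++) {
      values.add(fileParts[i]);
    }
    return values;
  }

  public static void main(String[] args) {
    System.out.println(partitionValues("/data/t", "/data/t/2018/04/f.parquet"));
  }
}
```

For Hive tables no such derivation is needed, since the partition values come from the metastore and can be handed to `populateImplicitColumns` directly.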

Link to Jira - [DRILL-6331](https://issues.apache.org/jira/browse/DRILL-6331).
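
The stopwatch idea from item 8 can be pictured roughly as follows. This is only a sketch under assumptions: it uses `System.nanoTime()` instead of Guava so that it stays self-contained, and the names `StopwatchSketch`, `Timer`, `RealTimer` and `DummyTimer` are made up for the example rather than taken from the pull request.

```java
import java.util.concurrent.TimeUnit;

public class StopwatchSketch {

  public interface Timer {
    Timer start();
    Timer stop();
    long elapsed(TimeUnit unit);
  }

  // Measures time for real; used when timing output is wanted (e.g. debug logging).
  public static final class RealTimer implements Timer {
    private long startNanos;
    private long elapsedNanos;
    private boolean running;

    public Timer start() {
      startNanos = System.nanoTime();
      running = true;
      return this;
    }

    public Timer stop() {
      if (running) {
        elapsedNanos += System.nanoTime() - startNanos;
        running = false;
      }
      return this;
    }

    public long elapsed(TimeUnit unit) {
      long total = running ? elapsedNanos + (System.nanoTime() - startNanos) : elapsedNanos;
      return unit.convert(total, TimeUnit.NANOSECONDS);
    }
  }

  // Does nothing and always reports zero, so disabled timing costs no clock reads.
  public static final class DummyTimer implements Timer {
    public Timer start() { return this; }
    public Timer stop() { return this; }
    public long elapsed(TimeUnit unit) { return 0; }
  }

  public static Timer createStarted(boolean enabled) {
    return enabled ? new RealTimer().start() : new DummyTimer();
  }

  public static void main(String[] args) {
    Timer timer = createStarted(true);
    System.out.println(timer.stop().elapsed(TimeUnit.MICROSECONDS) >= 0);
  }
}
```

A caller would typically pass something like `logger.isDebugEnabled()` as the flag, so that the clock is read only when the result is going to be logged.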

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arina-ielchiieva/drill DRILL-6331

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1214


commit af5dff61b6b70c4ef70d4a5173aa63f5faa9c2c0
Author: Arina Ielchiieva 
Date:   2018-03-20T18:29:45Z

DRILL-6331: Revisit Hive Drill native parquet implementation to be exposed to Drill optimizations (filter / limit push down, count to direct scan)





---