[GitHub] drill issue #540: Fix for DRILL-4759: Drill throwing array index out of boun...

2016-07-13 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/540
  
Updated the pull request with a test case added.


---


[GitHub] drill issue #540: Fix for DRILL-4759: Drill throwing array index out of boun...

2016-07-18 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/540
  
Updated the fix. Please review. In DictionaryBigIntReader, we need to call the
parent readField if dictionary encoding is not enabled for the page.
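
A minimal sketch of the shape of that fallback, assuming the usingDictionary
flag and the reader fields referenced elsewhere in this thread (illustrative
only, not the exact patch):

    @Override
    protected void readField(long recordsToReadInThisPass) {
      if (usingDictionary) {
        // Dictionary-encoded page: decode through the dictionary reader.
        for (int i = 0; i < recordsReadInThisIteration; i++) {
          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i,
              pageReader.dictionaryValueReader.readLong());
        }
      } else {
        // Plain-encoded page: fall back to the parent implementation.
        super.readField(recordsToReadInThisPass);
      }
    }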


---


[GitHub] drill issue #540: Fix for DRILL-4759: Drill throwing array index out of boun...

2016-07-19 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/540
  
The pull request was merged in commit: e371e18


---


[GitHub] drill pull request #540: Fix for DRILL-4759: Drill throwing array index out ...

2016-07-19 Thread ppadma
Github user ppadma closed the pull request at:

https://github.com/apache/drill/pull/540


---


[GitHub] drill pull request #540: Fix for DRILL-4759: Drill throwing array index out ...

2016-07-05 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/540

Fix for DRILL-4759: Drill throwing array index out of bound exception…

… when reading a parquet file written by map reduce program

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill drill-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/540.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #540


commit 9028f59b805b4ee0c769f414eb8492f17d65258c
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-07-05T18:28:11Z

Fix for DRILL-4759: Drill throwing array index out of bound exception when 
reading a parquet file written by map reduce program




---


[GitHub] drill pull request #540: Fix for DRILL-4759: Drill throwing array index out ...

2016-07-05 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/540#discussion_r69650754
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
 ---
@@ -156,12 +156,18 @@ protected void readField(long recordsToReadInThisPass) {
    recordsReadInThisIteration = Math.min(pageReader.currentPageCount
        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
 
-      for (int i = 0; i < recordsReadInThisIteration; i++){
-        try {
-          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
-        } catch ( Exception ex) {
-          throw ex;
+      if (usingDictionary) {
--- End diff --

This should ideally be fixed for all types and will be done in a later
commit after sufficient testing. For now, this just fixes the type for which
this issue was reported.


---


[GitHub] drill pull request #539: DRILL-4759:Drill throwing array index out of bound ...

2016-07-01 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/539

DRILL-4759:Drill throwing array index out of bound exception when reading a 
parquet file written by map reduce program.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/539.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #539


commit f5c60a88eb656b4eb383ba31a088330f45fc2f80
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-06-28T23:16:45Z

Fix for MD-904

commit 3ba14375d4c64c5b346ef6ce565638804f520eac
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-06-28T23:16:45Z

Update Fix for MD-904

commit 7b837eb37ef0937a5087edd9738712610f79c737
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-06-29T00:22:13Z

Merge branch 'master' of https://github.com/ppadma/drill

commit a3c20a44d8407af6cb4b01243e7bd799ece2f6d7
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-06-30T18:31:38Z

Merge branch 'master' of https://github.com/apache/drill

commit 2c335fdddad1052871fa31843bd70df1e092c14b
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-07-01T00:36:22Z

Fix for MD-904: Drill throwing array index out of bound exception when 
reading a parquet file written by map reduce program.

commit 950d4a8198145b72fabf4a6ea658a83f1201f058
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-07-01T00:52:38Z

MD-904:Drill throwing array index out of bound exception when reading a 
parquet file written by map reduce program

commit 8914c21216f54600b833895c124d222fc058d307
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-07-01T05:51:47Z

Updated fix for MD-904

commit 1d5d95a3a4edf44fb0d2cc08e41b9e9b60be8c75
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-07-01T05:57:38Z

Updated fix for MD-904




---


[GitHub] drill pull request #539: DRILL-4759:Drill throwing array index out of bound ...

2016-07-01 Thread ppadma
Github user ppadma closed the pull request at:

https://github.com/apache/drill/pull/539


---


[GitHub] drill pull request #730: DRILL-5223:Drill should ensure balanced workload as...

2017-01-26 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/730

DRILL-5223:Drill should ensure balanced workload assignment at node l…

…evel in order to get better query performance.

Please see DRILL-5223 for details:
https://issues.apache.org/jira/browse/DRILL-5223


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill DRILL-5223

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/730.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #730


commit a1ad4113f59e87a4885c271a53afea648bb6f9c3
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Date:   2017-01-21T01:57:10Z

DRILL-5223:Drill should ensure balanced workload assignment at node level 
in order to get better query performance




---


[GitHub] drill pull request #704: DRILL-5125: Provide option to use generic code for ...

2017-01-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/704#discussion_r98299585
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
 ---
@@ -34,4 +38,33 @@ public void testSVRWithNoFilter() throws Exception {
     int numOutputRecords = testPhysical(getFile("remover/sv_with_no_filter.json"));
     assertEquals(100, numOutputRecords);
   }
+
+  /**
+   * Test the generic version of the selection vector remover copier
+   * class. The code uses the traditional generated version by default.
+   * This test sets the option to use the generic version, then runs
+   * a query that exercises that version.
+   *
+   * Note that the tests here exercise only the SV2 version of the
+   * selection remover; no tests exist for the SV4 version.
+   */
+
+  // TODO: Add an SV4 test once the improved mock data generator
+  // is available.
+
+  @Test
+  public void testGenericCopier() throws Exception {
+    // TODO: replace this with new setup once revised test framework
+    // is available.
+    Properties config = new Properties( );
+    config.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
+    config.put(ExecConstants.HTTP_ENABLE, "false");
+    config.put(ExecConstants.REMOVER_ENABLE_GENERIC_COPIER, "true");
+    updateTestCluster(1, DrillConfig.create(config));
+
+    int numOutputRecords = testPhysical(getFile("remover/test1.json"));
+    assertEquals(50, numOutputRecords);
+    numOutputRecords = testPhysical(getFile("remover/sv_with_no_filter.json"));
+    assertEquals(100, numOutputRecords);
+  }
 }
--- End diff --

Do these tests cover all the value vector types ?


---


[GitHub] drill pull request #704: DRILL-5125: Provide option to use generic code for ...

2017-01-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/704#discussion_r97992610
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Generic selection vector 4 copier implementation that can
+ * be used in place of the generated version. Relies on a
+ * virtual function in each value vector to choose the proper
+ * implementation. Tests suggest that this version performs
+ * better than the generated version for queries with many columns.
+ */
+
+public class GenericSV4Copier extends CopierTemplate4 {
+
+  private ValueVector[] vvOut;
+  private ValueVector[][] vvIn;
+
+  @SuppressWarnings("unused")
+  @Override
+  public void doSetup(FragmentContext context, RecordBatch incoming,
+  RecordBatch outgoing) {
--- End diff --

fix alignment


---


[GitHub] drill pull request #747: DRILL-5257: Run-time control of query profiles

2017-02-21 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/747#discussion_r102378871
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
@@ -178,6 +182,19 @@ public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
     final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : QueryState.STARTING;
     recordNewState(initialState);
     enqueuedQueries.inc();
+
+    profileOption = setProfileOption(queryContext.getOptions());
+  }
+
+  private ProfileOption setProfileOption(OptionManager options) {
+    if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
--- End diff --

nit: extra space after !


---


[GitHub] drill pull request #747: DRILL-5257: Run-time control of query profiles

2017-02-21 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/747#discussion_r102378651
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
@@ -413,4 +413,15 @@
 
   String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support";
   BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true);
+
+  /**
+   * Option to save query profiles. If false, no query profile will be saved
+   * for any query.
+   */
+  String ENABLE_QUERY_PROFILE_OPTION = "exec.query_profile.enable";
--- End diff --

After reading your comment, it feels like exec.query_profile.save would be 
a better choice. 


---


[GitHub] drill pull request #751: DRILL-5259: Allow listing a user-defined number of ...

2017-02-17 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/751#discussion_r101862127
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
 ---
@@ -190,10 +193,13 @@ public QProfiles(List runningQueries, 
List finishedQue
 public List getErrors() { return errors; }
   }
 
+  //max Param to cap listing of profiles 
+  private static final String MAX_QPARAM = "max";
+
--- End diff --

Please choose a better name than MAX_QPARAM, something like
MAX_QPROFILES_PARAM?


---


[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches

2017-02-23 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/749#discussion_r102830312
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
 ---
@@ -33,35 +33,52 @@
   ParquetRecordReader parentReader;
   final List<VarLengthColumn> columns;
   final boolean useAsyncTasks;
+  private final long targetRecordCount;
 
   public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns) {
     this.parentReader = parentReader;
     this.columns = columns;
     useAsyncTasks = parentReader.useAsyncColReader;
+
+    // Can't read any more records than fixed width fields will fit.
+    // Note: this calculation is very likely wrong; it is a simplified
+    // version of earlier code, but probably needs even more attention.
+
+    int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8;
+    if (totalFixedFieldWidth == 0) {
+      targetRecordCount = 0;
+    } else {
+      targetRecordCount = parentReader.getBatchSize() / totalFixedFieldWidth;
+    }
   }
 
   /**
    * Reads as many variable length values as possible.
    *
    * @param recordsToReadInThisPass - the number of records recommended for reading form the reader
-   * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metatdata from
+   * @param firstColumnStatus - a reference to the first column status in the Parquet file to grab metatdata from
    * @return - the number of fixed length fields that will fit in the batch
    * @throws IOException
    */
   public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
 
-    long recordsReadInCurrentPass = 0;
-
     // write the first 0 offset
     for (VarLengthColumn columnReader : columns) {
       columnReader.reset();
     }
     Stopwatch timer = Stopwatch.createStarted();
 
-    recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
-    if(useAsyncTasks){
+    long recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
+
+    // Can't read any more records than fixed width fields will fit.
+
+    if (targetRecordCount > 0) {
+      recordsToReadInThisPass = Math.min(recordsToReadInThisPass, targetRecordCount);
--- End diff --

I think you mean to update recordsReadInCurrentPass. recordsToReadInThisPass
is not used after this point, so what is the point in updating it?


---


[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches

2017-02-23 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/749#discussion_r102827651
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
 ---
@@ -70,33 +87,21 @@ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumn
     return recordsReadInCurrentPass;
   }
 
-
   private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
-    int lengthVarFieldsInCurrentRecord = 0;
-    boolean exitLengthDeterminingLoop = false;
-    long totalVariableLengthData = 0;
-    long recordsReadInCurrentPass = 0;
-    do {
+
+    int recordsReadInCurrentPass = 0;
+    top: do {
       for (VarLengthColumn columnReader : columns) {
-        if (!exitLengthDeterminingLoop) {
-          exitLengthDeterminingLoop =
-              columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
-        } else {
-          break;
+        // Return status is "done reading", meaning stop if true.
+        if (columnReader.determineSize(recordsReadInCurrentPass, 0 /* unused */ )) {
--- End diff --

why not remove the unused parameter ?


---


[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches

2017-02-23 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/749#discussion_r102826142
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
 ---
@@ -33,35 +33,52 @@
   ParquetRecordReader parentReader;
   final List<VarLengthColumn> columns;
   final boolean useAsyncTasks;
+  private final long targetRecordCount;
 
   public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns) {
     this.parentReader = parentReader;
     this.columns = columns;
     useAsyncTasks = parentReader.useAsyncColReader;
+
+    // Can't read any more records than fixed width fields will fit.
+    // Note: this calculation is very likely wrong; it is a simplified
+    // version of earlier code, but probably needs even more attention.
+
+    int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8;
+    if (totalFixedFieldWidth == 0) {
+      targetRecordCount = 0;
+    } else {
+      targetRecordCount = parentReader.getBatchSize() / totalFixedFieldWidth;
+    }
   }
 
   /**
    * Reads as many variable length values as possible.
    *
    * @param recordsToReadInThisPass - the number of records recommended for reading form the reader
-   * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metatdata from
+   * @param firstColumnStatus - a reference to the first column status in the Parquet file to grab metatdata from
    * @return - the number of fixed length fields that will fit in the batch
    * @throws IOException
    */
   public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
--- End diff --

Since we are not using firstColumnStatus, can we just remove it?


---


[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches

2017-02-23 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/749#discussion_r102824318
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -351,7 +353,7 @@ public void setup(OperatorContext operatorContext, OutputMutator output) throws
 
     MaterializedField field;
     //ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
-    FileMetaData fileMetaData;
+    //FileMetaData fileMetaData;
--- End diff --

Instead of commenting it out, just remove this line and the earlier one.


---


[GitHub] drill issue #757: DRILL-5290: Provide an option to build operator table once...

2017-02-23 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/757
  
Thanks Sudheesh, Paul and Arina for the review. The original thought was that it
is not worth the effort to keep the operator table in sync with changes in
dynamic UDFs. This option is meant for short-running operational queries, which
most likely will not need dynamic UDF support (at least for now). For
long-running analytic queries, this is not considered an issue.
However, I am looking into implementing Sudheesh's suggestion and will
provide an update soon.
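
As a rough illustration of the idea being discussed (names here are
hypothetical, not the actual Drill classes or the committed patch): build the
table of built-in functions once at startup and hand it out unchanged unless a
query opts in to dynamic UDFs.

    // Sketch only: cache an operator table built from built-in functions.
    class OperatorTableCache {
      private final OperatorTable builtInTable;            // built once at startup

      OperatorTableCache(FunctionRegistry builtIns) {
        this.builtInTable = new OperatorTable(builtIns);
      }

      OperatorTable tableFor(boolean useDynamicUdfs, FunctionRegistry current) {
        // Short-running operational queries reuse the cached table;
        // queries that need dynamic UDFs pay the per-query rebuild cost.
        return useDynamicUdfs ? new OperatorTable(current) : builtInTable;
      }
    }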


---


[GitHub] drill pull request #757: DRILL-5290: Provide an option to build operator tab...

2017-02-22 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/757

DRILL-5290: Provide an option to build operator table once for built-…

…in static functions and reuse it across queries.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill DRILL-5290

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/757.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #757


commit 67ae503b0b820cd9f40ae3ef8d703e9135f90a74
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Date:   2017-02-22T18:31:01Z

DRILL-5290: Provide an option to build operator table once for built-in 
static functions and reuse it across queries.




---


[GitHub] drill pull request #758: DRILL-5287: Provide option to skip updates of ephem...

2017-02-24 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/758#discussion_r103005063
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
@@ -1010,7 +1010,9 @@ public void addToEventQueue(final QueryState newState, final Exception exception
 
   private void recordNewState(final QueryState newState) {
     state = newState;
-    queryManager.updateEphemeralState(newState);
+    if (queryContext.getOptions().getOption(ExecConstants.ZK_QUERY_STATE_UPDATE)) {
+      queryManager.updateEphemeralState(newState);
+    }
--- End diff --

For long-running queries, it may not make much difference; it adds latency of
around 50-60 ms for a single query. With high concurrency, however, the
contention caused by the ZooKeeper updates is significant. As I mentioned in the
JIRA, at concurrency=100 the average response time for simple queries is 8 sec
with these updates vs. 0.2 sec with them disabled. It does not affect the query
profile, which is still updated and written at the end of the query as usual.
The option affects only running queries: with updates disabled, the Web UI will
not show running queries and their state.


---


[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches

2017-02-23 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/749#discussion_r102725896
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -376,14 +378,14 @@ public void setup(OperatorContext operatorContext, OutputMutator output) throws
       if (dataTypeLength == -1) {
         allFieldsFixedLength = false;
       } else {
-      bitWidthAllFixedFields += dataTypeLength;
+        bitWidthAllFixedFields += dataTypeLength;
       }
     }
     //rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
 
     if (columnsToScan != 0  && allFieldsFixedLength) {
       recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
-          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
+          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH);
--- End diff --

This is DEFAULT_RECORDS_TO_READ_FIXED_WIDTH (not VARIABLE)


---


[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches

2017-02-23 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/749#discussion_r102726321
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -73,6 +72,7 @@
   private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
   private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
   private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
--- End diff --

While we are at it, I would rename DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH
to DEFAULT_RECORDS_TO_READ_VARIABLE_WIDTH.


---


[GitHub] drill pull request #757: DRILL-5290: Provide an option to build operator tab...

2017-02-22 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/757#discussion_r102611281
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
@@ -413,4 +413,8 @@
 
   String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support";
  BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true);
+
+  String USE_DYNAMIC_UDFS = "exec.udf.use_dynamic";
--- End diff --

I did not use the existing option "exec.udf.enable_dynamic_support" because
if that option is enabled and then disabled later, the expectation is that
dynamic UDFs added in that window continue to be available and working even
after it is disabled.

All other comments addressed. Please review the new diffs.



---


[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches

2017-02-23 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/749#discussion_r102726705
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
 ---
@@ -70,33 +70,31 @@ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumn
     return recordsReadInCurrentPass;
   }
 
-
   private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
-    int lengthVarFieldsInCurrentRecord = 0;
-    boolean exitLengthDeterminingLoop = false;
-    long totalVariableLengthData = 0;
-    long recordsReadInCurrentPass = 0;
-    do {
+
+    // Can't read any more records than fixed width fields will fit.
+    // Note: this calculation is very likely wrong; it is a simplified
+    // version of earlier code, but probably needs even more attention.
+
+    int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8;
+    long batchSize = parentReader.getBatchSize();
+    if (totalFixedFieldWidth > 0) {
+      recordsToReadInThisPass = Math.min(recordsToReadInThisPass, batchSize / totalFixedFieldWidth);
--- End diff --

Instead of fixing it up here, please do it outside the function and pass 
the correct value for recordsToReadInThisPass.


---


[GitHub] drill pull request #747: DRILL-5257: Run-time control of query profiles

2017-02-21 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/747#discussion_r102277543
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
@@ -413,4 +413,19 @@
 
   String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support";
  BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true);
+
+  /**
+   * Option to save query profiles.
+   * 
+   * async (default): Write query profile after last response
+   * to the client.
+   * sync: Write the query profile before the last response to
+   * the client. Very useful for testing to avoid race conditions.
+   * none: Don't write the query profile at all. Useful when running
+   * many production jobs that do not need to be reviewed.
+   * 
+   */
+  String QUERY_PROFILE_OPTION = "exec.profile";
--- End diff --

exec.query_profile.write_mode might be a better choice for clarity.
Also, would it be better to have two options? A boolean
exec.query_profile.write_enabled plus exec.query_profile.write_mode
(sync or async), as sketched below.
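
Something along these lines, purely as a sketch of the suggestion (option
names and defaults are hypothetical, not what was committed):

    // Boolean switch: should a profile be written at all?
    String QUERY_PROFILE_WRITE_ENABLED = "exec.query_profile.write_enabled";
    BooleanValidator QUERY_PROFILE_WRITE_ENABLED_VALIDATOR =
        new BooleanValidator(QUERY_PROFILE_WRITE_ENABLED, true, true);

    // Mode string: "sync" or "async"; the validator restricting the
    // allowed values is omitted from this sketch.
    String QUERY_PROFILE_WRITE_MODE = "exec.query_profile.write_mode";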



---


[GitHub] drill pull request #747: DRILL-5257: Run-time control of query profiles

2017-02-21 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/747#discussion_r102278999
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
@@ -853,7 +875,9 @@ public void close() throws Exception {
   // storage write; query completion occurs in parallel with profile
   // persistence.
 
-  queryManager.writeFinalProfile(uex);
+  if (profileOption == ProfileOption.ASYNC) {
+writeProfile(uex);
--- End diff --

Why not call queryManager.writeFinalProfile directly from here and avoid
creating another function?


---


[GitHub] drill pull request #757: DRILL-5290: Provide an option to build operator tab...

2017-02-22 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/757#discussion_r102615118
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
@@ -413,4 +413,8 @@
 
   String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support";
  BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true);
+
+  String USE_DYNAMIC_UDFS = "exec.udf.use_dynamic";
--- End diff --

Currently, we have one FunctionRegistryHolder (LocalFunctionRegistry) for both
static and dynamic functions, from which we register functions for each query.
It gets updated when we download new jars/functions from ZooKeeper, so it will
not be the same as it was at startup if dynamic UDF support is enabled and later
disabled. That means a table built at startup would miss the dynamic UDFs added
in between; to include them, the table has to be rebuilt. For that reason, I
chose to add a new option.


---


[GitHub] drill pull request #758: DRILL-5287: Provide option to skip updates of ephem...

2017-02-28 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/758#discussion_r103487885
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
 ---
@@ -280,8 +281,15 @@ public void interrupted(final InterruptedException ex) {
     }
   }
 
-  QueryState updateEphemeralState(final QueryState queryState) {
-    switch (queryState) {
+  void updateEphemeralState(final QueryState queryState) {
+    // If query is already in zk transient store, ignore the transient state update option.
+    // Else, they will not be removed from transient store upon completion.
+    if (transientProfiles.get(stringQueryId) == null &&
--- End diff --

I want to bypass the option for queries that are already in the transient store
when the option is enabled. Otherwise, their state will never get updated and
they will never be removed from the transient store; the Web UI would show these
queries as running forever :-)

Thanks for raising a good point regarding using transientProfiles.get. I made
the change to track and use in-memory state instead; a rough sketch of that idea
follows. Please review the new diffs.
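
Roughly, the in-memory approach looks like this (field and method names are
illustrative, not the actual diff):

    // Track whether this query's profile was ever published to the ZK
    // transient store, instead of calling transientProfiles.get() each time.
    private boolean publishedToTransientStore;   // set on the first ZK publish

    private boolean shouldUpdateEphemeralState(boolean zkUpdatesEnabled) {
      // Entries we already published must keep getting updated (and finally
      // removed), even when the option disables new ephemeral updates.
      return zkUpdatesEnabled || publishedToTransientStore;
    }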



---


[GitHub] drill pull request #766: DRILL-5304: Queries fail intermittently when there ...

2017-02-28 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/766

DRILL-5304: Queries fail intermittently when there is skew in data di…

…stribution

Change the assignment logic so that we first assign up to minCount for all
nodes before any node goes up to maxCount (see the sketch below).
Also fixed a small issue in the parallelization code where we were rounding
down the calculation of the number of fragments to run on nodes with affinity,
because of which we sometimes scheduled fewer fragments on nodes with affinity
than on nodes without affinity.
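
A simplified sketch of that assignment order (illustrative only, not the actual
AssignmentCreator code):

    // Distribute totalWork units across numNodes: every node gets its
    // minCount share before any node is topped up toward maxCount.
    // Any work beyond numNodes * maxCount is not assignable under these caps.
    static int[] assign(int totalWork, int numNodes, int minCount, int maxCount) {
      int[] assigned = new int[numNodes];
      int remaining = totalWork;
      for (int n = 0; n < numNodes && remaining > 0; n++) {   // pass 1: up to minCount
        int take = Math.min(minCount, remaining);
        assigned[n] = take;
        remaining -= take;
      }
      for (int n = 0; n < numNodes && remaining > 0; n++) {   // pass 2: up to maxCount
        int take = Math.min(maxCount - assigned[n], remaining);
        assigned[n] += take;
        remaining -= take;
      }
      return assigned;
    }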

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill DRILL-5304

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/766.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #766


commit 49cf9f0b54d8c0ea15c3d6a59f99b8e23870104e
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Date:   2017-02-28T02:32:24Z

DRILL-5304: Queries fail intermittently when there is skew in data 
distribution




---


[GitHub] drill pull request #757: DRILL-5290: Provide an option to build operator tab...

2017-02-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/757#discussion_r103285068
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
@@ -413,4 +413,8 @@
 
   String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support";
  BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true);
+
+  String USE_DYNAMIC_UDFS = "exec.udf.use_dynamic";
--- End diff --

OK, we would need to use read-write locks if we update the table each time a
function gets added or removed. That is unnecessary overhead and will cause
contention under concurrency. One option is to split the table into two: one for
built-in functions (which can be accessed without locks) and the other for
dynamic functions (sketched below). That would be a bigger change and, as I
mentioned before, is not considered worth the effort.
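
For illustration, the split-table idea could look roughly like this
(hypothetical names, plain java.util and java.util.concurrent.locks types; not
the actual registry code):

    class SplitFunctionRegistry {
      private final Map<String, Object> builtIns;                  // frozen at startup, lock-free reads
      private final Map<String, Object> dynamicUdfs = new HashMap<>();
      private final ReadWriteLock lock = new ReentrantReadWriteLock();

      SplitFunctionRegistry(Map<String, Object> builtIns) {
        this.builtIns = Collections.unmodifiableMap(new HashMap<>(builtIns));
      }

      Object lookup(String name) {
        Object fn = builtIns.get(name);      // fast path, no locking
        if (fn != null) {
          return fn;
        }
        lock.readLock().lock();              // slow path only for dynamic UDFs
        try {
          return dynamicUdfs.get(name);
        } finally {
          lock.readLock().unlock();
        }
      }

      void registerDynamic(String name, Object fn) {
        lock.writeLock().lock();
        try {
          dynamicUdfs.put(name, fn);
        } finally {
          lock.writeLock().unlock();
        }
      }
    }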


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-26 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/597

DRILL-4905: Push down the LIMIT to the parquet reader scan.

For a LIMIT N query, where N is less than the current default record batch size
(256K when all columns are fixed-length, 32K otherwise), we still end up reading
all 256K/32K rows from disk if the row group has that many rows. This causes
performance degradation, especially when there is a large number of columns.
This fix addresses the problem by changing the record batch size the Parquet
record reader uses so that we do not read more than what is needed (see the
sketch below). Also added a system option (store.parquet.record_batch_size) to
set the record batch size.
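
A minimal sketch of the clamping idea (illustrative method, not the actual
planner/reader change):

    // Clamp the reader's recommended batch size to the pushed-down LIMIT so
    // a "LIMIT n" query never materializes a full 256K/32K-row batch.
    static long recommendedBatchSize(long configuredBatchSize, long limit) {
      // limit < 0 means no LIMIT was pushed down; keep the configured size.
      return limit < 0 ? configuredBatchSize : Math.min(configuredBatchSize, limit);
    }

    // Example: recommendedBatchSize(256 * 1024, 10) returns 10.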

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill DRILL-4905

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/597.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #597


commit cd665ebdba11f8685ba446f5ec535c81ddd6edc7
Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local>
Date:   2016-09-26T17:51:07Z

DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the 
numbers of records read




---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r81244611
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 ---
@@ -864,6 +872,14 @@ public String getDigest() {
 return toString();
   }
 
+  public void setCacheFileRoot(String cacheFileRoot) {
+this.cacheFileRoot = cacheFileRoot;
+  }
+
+  public void setBatchSize(long batchSize) {
+this.recommendedBatchSize = batchSize;
--- End diff --

Checking against the current value is fine for now. Adding new methods would be
a bit outside the scope of this JIRA.


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-28 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r80990803
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 ---
@@ -115,6 +115,8 @@
   private List rowGroupInfos;
   private Metadata.ParquetTableMetadataBase parquetTableMetadata = null;
   private String cacheFileRoot = null;
+  private int batchSize;
+  private static final int DEFAULT_BATCH_LENGTH = 256 * 1024;
--- End diff --

Done. Please review new diffs


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r80817027
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 ---
@@ -115,6 +115,8 @@
   private List rowGroupInfos;
   private Metadata.ParquetTableMetadataBase parquetTableMetadata = null;
   private String cacheFileRoot = null;
+  private int batchSize;
+  private static final int DEFAULT_BATCH_LENGTH = 256 * 1024;
--- End diff --

The max value for store.parquet.record_batch_size is 256K, so it cannot be set
to 512K. I changed the name in ParquetGroupScan/ParquetRowGroupScan to
recommendedBatchSize as we discussed. Please review the new diffs.


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r80754060
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 ---
@@ -899,6 +907,16 @@ public FileGroupScan clone(FileSelection selection) throws IOException {
     return newScan;
   }
 
+  // clone to create new groupscan with new file selection and batchSize.
+  public ParquetGroupScan clone(FileSelection selection, int batchSize) throws IOException {
+    ParquetGroupScan newScan = new ParquetGroupScan(this);
+    newScan.modifyFileSelection(selection);
+    newScan.cacheFileRoot = selection.cacheFileRoot;
--- End diff --

Yes, we can. Will do.


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r80751210
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
 ---
@@ -107,7 +107,7 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS
     if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val
         && !isComplex(footers.get(e.getPath( {
       readers.add(
           new ParquetRecordReader(
-              context, e.getPath(), e.getRowGroupIndex(), fs,
+              context, rowGroupScan.getBatchSize(), e.getPath(), e.getRowGroupIndex(), fs,
--- End diff --

This fix applies only to the native Parquet reader.


---


[GitHub] drill issue #597: DRILL-4905: Push down the LIMIT to the parquet reader scan...

2016-09-27 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/597
  
updated with review comments.


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r80788614
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 ---
@@ -899,6 +907,16 @@ public FileGroupScan clone(FileSelection selection) throws IOException {
     return newScan;
   }
 
+  // clone to create new groupscan with new file selection and batchSize.
+  public ParquetGroupScan clone(FileSelection selection, int batchSize) throws IOException {
+    ParquetGroupScan newScan = new ParquetGroupScan(this);
+    newScan.modifyFileSelection(selection);
+    newScan.cacheFileRoot = selection.cacheFileRoot;
--- End diff --

done


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r80788617
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 ---
@@ -899,6 +907,16 @@ public FileGroupScan clone(FileSelection selection) throws IOException {
     return newScan;
   }
 
+  // clone to create new groupscan with new file selection and batchSize.
+  public ParquetGroupScan clone(FileSelection selection, int batchSize) throws IOException {
+    ParquetGroupScan newScan = new ParquetGroupScan(this);
+    newScan.modifyFileSelection(selection);
+    newScan.cacheFileRoot = selection.cacheFileRoot;
+    newScan.init(selection.getMetaContext());
+    newScan.batchSize = batchSize;
--- End diff --

done


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-27 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r80788478
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
 ---
@@ -107,7 +107,7 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS
     if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val
         && !isComplex(footers.get(e.getPath( {
       readers.add(
           new ParquetRecordReader(
-              context, e.getPath(), e.getRowGroupIndex(), fs,
+              context, rowGroupScan.getBatchSize(), e.getPath(), e.getRowGroupIndex(), fs,
--- End diff --

done.


---


[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...

2016-09-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r81238087
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 ---
@@ -864,6 +872,14 @@ public String getDigest() {
 return toString();
   }
 
+  public void setCacheFileRoot(String cacheFileRoot) {
+this.cacheFileRoot = cacheFileRoot;
+  }
+
+  public void setBatchSize(long batchSize) {
+this.recommendedBatchSize = batchSize;
--- End diff --

OK, added the assert. Check the updated diffs.


---


[GitHub] drill issue #597: DRILL-4905: Push down the LIMIT to the parquet reader scan...

2016-09-29 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/597
  
updated with all review comments taken care of. Please review.


---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84952909
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
 ---
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * BufferedDirectBufInputStream  reads from the
+ * underlying InputStream in blocks of data, into an
+ * internal buffer. The internal buffer is a direct memory backed
+ * buffer. The implementation is similar to the 
BufferedInputStream
+ * class except that the internal buffer is a Drillbuf and
+ * not a byte array. The mark and reset methods of the underlying
+ * InputStream are not supported.
+ */
+public class BufferedDirectBufInputStream extends DirectBufInputStream 
implements Closeable {
+
+  private static final org.slf4j.Logger logger =
+  
org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class);
+
+  private static int defaultBufferSize = 8192 * 1024; // 8 MiB
--- End diff --

By KiB and MiB, I think you mean kibibytes and mebibytes, which are not 
commonly used terms. I had to do a Google search :-) If you want to stick with them, 
maybe add a comment.
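
For what it's worth, a comment along these lines would cover it (wording is mine, not from the patch; the constant names are taken from the diff above):

    // Sizes use binary (IEC) units: 1 KiB = 1024 bytes, 1 MiB = 1024 * 1024 bytes.
    private static int defaultBufferSize = 8192 * 1024; // 8 MiB
    private static int defaultTempBufferSize = 8192;    // 8 KiB
    private static int smallBufferSize = 64 * 1024;     // 64 KiB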


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85028094
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
 ---
@@ -41,43 +51,149 @@ public VarLenBinaryReader(ParquetRecordReader 
parentReader, List 
firstColumnStatus) throws IOException {
 
 long recordsReadInCurrentPass = 0;
-int lengthVarFieldsInCurrentRecord;
-long totalVariableLengthData = 0;
-boolean exitLengthDeterminingLoop = false;
+
 // write the first 0 offset
 for (VarLengthColumn columnReader : columns) {
   columnReader.reset();
 }
 
+recordsReadInCurrentPass = 
determineSizesSerial(recordsToReadInThisPass);
+if(useAsyncTasks){
+  readRecordsParallel(recordsReadInCurrentPass);
+}else{
+  readRecordsSerial(recordsReadInCurrentPass);
+}
+return recordsReadInCurrentPass;
+  }
+
+
+  private long determineSizesSerial(long recordsToReadInThisPass) throws 
IOException {
+int lengthVarFieldsInCurrentRecord = 0;
+boolean exitLengthDeterminingLoop = false;
+long totalVariableLengthData = 0;
+long recordsReadInCurrentPass = 0;
 do {
-  lengthVarFieldsInCurrentRecord = 0;
   for (VarLengthColumn columnReader : columns) {
-if ( !exitLengthDeterminingLoop ) {
-  exitLengthDeterminingLoop = 
columnReader.determineSize(recordsReadInCurrentPass, 
lengthVarFieldsInCurrentRecord);
+if (!exitLengthDeterminingLoop) {
+  exitLengthDeterminingLoop =
+  columnReader.determineSize(recordsReadInCurrentPass, 
lengthVarFieldsInCurrentRecord);
 } else {
   break;
 }
   }
   // check that the next record will fit in the batch
-  if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * 
parentReader.getBitWidthAllFixedFields() + totalVariableLengthData
-  + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
+  if (exitLengthDeterminingLoop ||
+  (recordsReadInCurrentPass + 1) * 
parentReader.getBitWidthAllFixedFields()
+  + totalVariableLengthData + lengthVarFieldsInCurrentRecord > 
parentReader.getBatchSize()) {
 break;
   }
-  for (VarLengthColumn columnReader : columns ) {
+  for (VarLengthColumn columnReader : columns) {
 columnReader.updateReadyToReadPosition();
 columnReader.currDefLevel = -1;
   }
   recordsReadInCurrentPass++;
   totalVariableLengthData += lengthVarFieldsInCurrentRecord;
 } while (recordsReadInCurrentPass < recordsToReadInThisPass);
 
+return recordsReadInCurrentPass;
+  }
+
+
+  public long determineSizesParallel(long recordsToReadInThisPass ) throws 
IOException {
+boolean doneReading = false;
+int lengthVarFieldsInCurrentRecord = 0;
+boolean exitLengthDeterminingLoop = false;
+long totalVariableLengthData = 0;
+long recordsReadInCurrentPass = 0;
+
+do {
+doneReading = readPagesParallel();
+
+if (!doneReading) {
+  lengthVarFieldsInCurrentRecord = 0;
+  for (VarLengthColumn columnReader : columns) {
+doneReading = columnReader.processPageData((int) 
recordsReadInCurrentPass);
+if(doneReading) {
+  break;
+}
+lengthVarFieldsInCurrentRecord += 
columnReader.dataTypeLengthInBits;
+doneReading = columnReader.checkVectorCapacityReached();
+if(doneReading) {
+  break;
+}
+  }
+}
+
+exitLengthDeterminingLoop = doneReading;
+
+  // check that the next record will fit in the batch
+  if (exitLengthDeterminingLoop ||
--- End diff --

Alignment seems to be off.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84972962
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
+  } catch (IOException e) {
+handleAndThrowException(e, "Error Reading dictionary page.");
+  }
+  // parent constructor may call this method before the thread pool is 
set.
+  if (threadPool == null) {
+threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+  }
+  asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  readDictionaryPage(asyncPageRead, parentStatus);
+  asyncPageRead = null; // reset after consuming
+}
+  }
+
+  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+DrillBuf data;
+boolean isDictionary = false;
+synchronized (this) {
+  data = readStatus.getPageData();
+  readStatus.setPageData(null);
+  isDictionary = readStatus.isDictionaryPage;
+}
+if (parentColumnReader.columnChunkMetaData.getCodec() != 
CompressionCodecName.UNCOMPRESSED) {
+  DrillBuf uncompressedData = data;
+  data = decompress(readStatus.getPageHeader(), uncompressedData);
+  synchronized (this) {
+readStatus.setPageData(null);
+  }
+  uncompress

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85136996
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java 
---
@@ -174,4 +191,11 @@ public DrillFileSystem newFileSystem(Configuration 
conf) throws IOException {
 return fs;
   }
 
+  @Override
+  public DrillFileSystem newNonTrackingFileSystem(Configuration conf) 
throws IOException {
+Preconditions.checkState(fs == null, "Tried to create a second 
FileSystem. Can only be called once per OperatorContext");
+fs = new DrillFileSystem(conf, null);
+return fs;
+  }
+
--- End diff --

Extra blank lines here and in a bunch of other places in the code. Please check and 
fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84972161
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
--- End diff --

Is it valid to assume dictionaryPageOffset is always greater than the current 
position, or should we add an assert/check here?
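
To illustrate the question, a minimal sketch of such a check, placed inside the existing try block (this is an assumption about a possible fix, not the actual patch; Preconditions is Guava, already used in this code base):

    long dictionaryOffset = columnChunkMetaData.getDictionaryPageOffset();
    long currentPos = dataReader.getPos();
    // Fail fast instead of passing a negative distance to skip().
    Preconditions.checkState(dictionaryOffset >= currentPos,
        "dictionary page offset (%s) is behind the current read position (%s)",
        dictionaryOffset, currentPos);
    dataReader.skip(dictionaryOffset - currentPos);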


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85141426
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
+  } catch (IOException e) {
+handleAndThrowException(e, "Error Reading dictionary page.");
+  }
+  // parent constructor may call this method before the thread pool is 
set.
+  if (threadPool == null) {
+threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+  }
+  asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  readDictionaryPage(asyncPageRead, parentStatus);
+  asyncPageRead = null; // reset after consuming
+}
+  }
+
+  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+DrillBuf data;
+boolean isDictionary = false;
+synchronized (this) {
+  data = readStatus.getPageData();
+  readStatus.setPageData(null);
+  isDictionary = readStatus.isDictionaryPage;
+}
+if (parentColumnReader.columnChunkMetaData.getCodec() != 
CompressionCodecName.UNCOMPRESSED) {
+  DrillBuf uncompressedData = data;
+  data = decompress(readStatus.getPageHeader(), uncompressedData);
+  synchronized (this) {
+readStatus.setPageData(null);
+  }
+  uncompress

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84981300
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
+  } catch (IOException e) {
+handleAndThrowException(e, "Error Reading dictionary page.");
+  }
+  // parent constructor may call this method before the thread pool is 
set.
+  if (threadPool == null) {
+threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+  }
+  asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  readDictionaryPage(asyncPageRead, parentStatus);
+  asyncPageRead = null; // reset after consuming
+}
+  }
+
+  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+DrillBuf data;
+boolean isDictionary = false;
+synchronized (this) {
+  data = readStatus.getPageData();
+  readStatus.setPageData(null);
+  isDictionary = readStatus.isDictionaryPage;
+}
+if (parentColumnReader.columnChunkMetaData.getCodec() != 
CompressionCodecName.UNCOMPRESSED) {
+  DrillBuf uncompressedData = data;
+  data = decompress(readStatus.getPageHeader(), uncompressedData);
+  synchronized (this) {
+readStatus.setPageData(null);
+  }
+  uncompress

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84957411
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
 ---
@@ -99,26 +99,47 @@
 
   int currentPageCount = -1;
 
-  private FSDataInputStream inputStream;
+  protected FSDataInputStream inputStream;
 
   // These need to be held throughout reading of the entire column chunk
   List allocatedDictionaryBuffers;
 
-  private final CodecFactory codecFactory;
+  protected final CodecFactory codecFactory;
+  protected final String fileName;
 
-  private final ParquetReaderStats stats;
+  protected final ParquetReaderStats stats;
+  private final boolean useBufferedReader;
+  private final int scanBufferSize;
+  private final boolean useFadvise;
 
-  PageReader(ColumnReader parentStatus, FileSystem fs, Path path, 
ColumnChunkMetaData columnChunkMetaData)
-throws ExecutionSetupException{
+  
PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader 
parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
--- End diff --

Why not import this instead of using the fully qualified name?
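
For reference, the suggestion is just to use the short class name in the signature, roughly like this (a sketch; both classes appear to live in the same columnreaders package judging by the file paths in these diffs, so no import may even be needed):

    PageReader(ColumnReader parentStatus, FileSystem fs, Path path,
        ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {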


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84970035
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
 ---
@@ -148,7 +185,8 @@ public boolean determineSize(long 
recordsReadInCurrentPass, Integer lengthVarFie
   return true;
 }
 
-lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+// Never used in this code path. Hard to remove because the method is 
overidden by subclasses
+lengthVarFieldsInCurrentRecord = -1;
--- End diff --

Why make this -1 if it is not used?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84968348
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
+  } catch (IOException e) {
+handleAndThrowException(e, "Error Reading dictionary page.");
+  }
+  // parent constructor may call this method before the thread pool is 
set.
+  if (threadPool == null) {
+threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+  }
+  asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  readDictionaryPage(asyncPageRead, parentStatus);
+  asyncPageRead = null; // reset after consuming
+}
+  }
+
+  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+DrillBuf data;
+boolean isDictionary = false;
+synchronized (this) {
+  data = readStatus.getPageData();
+  readStatus.setPageData(null);
+  isDictionary = readStatus.isDictionaryPage;
+}
+if (parentColumnReader.columnChunkMetaData.getCodec() != 
CompressionCodecName.UNCOMPRESSED) {
+  DrillBuf uncompressedData = data;
+  data = decompress(readStatus.getPageHeader(), uncompressedData);
+  synchronized (this) {
+readStatus.setPageData(null);
+  }
+  uncompress

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84969218
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
+  } catch (IOException e) {
+handleAndThrowException(e, "Error Reading dictionary page.");
+  }
+  // parent constructor may call this method before the thread pool is 
set.
+  if (threadPool == null) {
+threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+  }
+  asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  readDictionaryPage(asyncPageRead, parentStatus);
+  asyncPageRead = null; // reset after consuming
+}
+  }
+
+  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+DrillBuf data;
+boolean isDictionary = false;
+synchronized (this) {
+  data = readStatus.getPageData();
+  readStatus.setPageData(null);
+  isDictionary = readStatus.isDictionaryPage;
+}
+if (parentColumnReader.columnChunkMetaData.getCodec() != 
CompressionCodecName.UNCOMPRESSED) {
+  DrillBuf uncompressedData = data;
+  data = decompress(readStatus.getPageHeader(), uncompressedData);
+  synchronized (this) {
+readStatus.setPageData(null);
+  }
+  uncompress

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84970233
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java 
---
@@ -174,4 +191,11 @@ public DrillFileSystem newFileSystem(Configuration 
conf) throws IOException {
 return fs;
   }
 
+  @Override
+  public DrillFileSystem newNonTrackingFileSystem(Configuration conf) 
throws IOException {
--- End diff --

Please add a comment explaining what this nonTrackingFileSystem is for.
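
Something like the following would do; the description is an assumption inferred from the call above (new DrillFileSystem(conf, null), i.e. no operator stats are passed in), not wording from the patch:

    /**
     * Creates a DrillFileSystem whose reads are not tracked in the operator's
     * stats (no stats object is passed to the DrillFileSystem constructor),
     * unlike newFileSystem(Configuration). Assumed intent: I/O issued from
     * background scan threads should not skew operator wait times.
     */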


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85152503
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
 ---
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * BufferedDirectBufInputStream  reads from the
+ * underlying InputStream in blocks of data, into an
+ * internal buffer. The internal buffer is a direct memory backed
+ * buffer. The implementation is similar to the 
BufferedInputStream
+ * class except that the internal buffer is a Drillbuf and
+ * not a byte array. The mark and reset methods of the underlying
+ * InputStream are not supported.
+ */
+public class BufferedDirectBufInputStream extends DirectBufInputStream 
implements Closeable {
+
+  private static final org.slf4j.Logger logger =
+  
org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class);
+
+  private static int defaultBufferSize = 8192 * 1024; // 8 MiB
+  private static int defaultTempBufferSize = 8192; // 8 KiB
+  private static int smallBufferSize = 64 * 1024; // 64 KiB
+
+  /**
+   * The internal buffer to keep data read from the underlying inputStream.
+   * internalBuffer[0]  through internalBuffer[count-1] 

+   * contains data read from the underlying  input stream.
+   */
+  protected volatile DrillBuf internalBuffer; // the internal buffer
+
+  /**
+   * The number of valid bytes in internalBuffer.
+   *  count  is always in the range 
[0,internalBuffer.capacity]
+   * internalBuffer[count-1] is the last valid byte in the 
buffer.
+   */
+  protected int count;
+
+  /**
+   * The current read position in the buffer; the index of the next
+   * character to be read from the internalBuffer array.
+   * 
+   * This value is always in the range [0,count].
+   * If curPosInBuffer is equal to count then we have read
+   * all the buffered data and the next read (or skip) will require more 
data to be read
+   * from the underlying input stream.
+   */
+  protected int curPosInBuffer;
+
+  protected long curPosInStream; // current offset in the input stream
+
+  private int bufSize;
+
+  private volatile DrillBuf tempBuffer; // a temp Buffer for use by 
read(byte[] buf, int off, int len)
+
+  private DrillBuf getBuf() throws IOException {
+checkInputStreamState();
+if (internalBuffer == null) {
+  throw new IOException("Input stream is closed.");
+}
+return this.internalBuffer;
+  }
+
+  /**
+   * Creates a BufferedDirectBufInputStream
+   * with the default (8 MiB) buffer size.
+   */
+  public BufferedDirectBufInputStream(InputStream in, BufferAllocator 
allocator, String id,
+  long startOffset, long totalByteSize, boolean enableHints) {
+this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, 
enableHints);
+  }
 

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84971152
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
+  } catch (IOException e) {
+handleAndThrowException(e, "Error Reading dictionary page.");
+  }
+  // parent constructor may call this method before the thread pool is 
set.
+  if (threadPool == null) {
+threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+  }
+  asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  readDictionaryPage(asyncPageRead, parentStatus);
+  asyncPageRead = null; // reset after consuming
+}
+  }
+
+  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+DrillBuf data;
+boolean isDictionary = false;
+synchronized (this) {
+  data = readStatus.getPageData();
+  readStatus.setPageData(null);
+  isDictionary = readStatus.isDictionaryPage;
+}
+if (parentColumnReader.columnChunkMetaData.getCodec() != 
CompressionCodecName.UNCOMPRESSED) {
+  DrillBuf uncompressedData = data;
+  data = decompress(readStatus.getPageHeader(), uncompressedData);
+  synchronized (this) {
+readStatus.setPageData(null);
+  }
+  uncompress

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85153296
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
 ---
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+public class DirectBufInputStream extends FilterInputStream {
+
+  private static final org.slf4j.Logger logger =
+  org.slf4j.LoggerFactory.getLogger(DirectBufInputStream.class);
+
+  protected boolean enableHints = true;
+  protected String streamId; // a name for logging purposes only
+  protected BufferAllocator allocator;
+  /**
+   * The length of the data we expect to read. The caller may, in fact,
+   * ask for more or less bytes. However this is useful for providing 
hints where
+   * the underlying InputStream supports hints (e.g. fadvise)
+   */
+  protected final long totalByteSize;
+
+  /**
+   * The offset in the underlying stream to start reading from
+   */
+  protected final long startOffset;
+
+  public DirectBufInputStream(InputStream in, BufferAllocator allocator, 
String id, long startOffset,
+  long totalByteSize, boolean enableHints) {
+super(in);
+Preconditions.checkArgument(startOffset >= 0);
+Preconditions.checkArgument(totalByteSize >= 0);
+this.streamId = id;
+this.allocator = allocator;
+this.startOffset = startOffset;
+this.totalByteSize = totalByteSize;
+this.enableHints = enableHints;
+  }
+
+  public void init() throws IOException, UnsupportedOperationException {
+checkStreamSupportsByteBuffer();
+if (enableHints) {
+  fadviseIfAvailable(getInputStream(), this.startOffset, 
this.totalByteSize);
+}
+getInputStream().seek(this.startOffset);
+return;
+  }
+
+  public int read() throws IOException {
+return getInputStream().read();
+  }
+
+  public synchronized int read(DrillBuf buf, int off, int len) throws 
IOException {
+buf.clear();
+ByteBuffer directBuffer = buf.nioBuffer(0, len);
+int lengthLeftToRead = len;
+while (lengthLeftToRead > 0) {
+  lengthLeftToRead -= CompatibilityUtil.getBuf(getInputStream(), 
directBuffer, lengthLeftToRead);
+}
+buf.writerIndex(len);
+return len;
+  }
+
+  public synchronized DrillBuf getNext(int bytes) throws IOException {
+DrillBuf b = allocator.buffer(bytes);
+int bytesRead = -1;
+try {
+bytesRead = read(b, 0, bytes);
+} catch (IOException e){
+  b.release();
+  throw e;
+}
+if (bytesRead <= -1) {
--- End diff --

bytesRead = 0 ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84990242
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
+  } catch (IOException e) {
+handleAndThrowException(e, "Error Reading dictionary page.");
+  }
+  // parent constructor may call this method before the thread pool is 
set.
+  if (threadPool == null) {
+threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+  }
+  asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  readDictionaryPage(asyncPageRead, parentStatus);
+  asyncPageRead = null; // reset after consuming
+}
+  }
+
+  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+DrillBuf data;
+boolean isDictionary = false;
+synchronized (this) {
+  data = readStatus.getPageData();
+  readStatus.setPageData(null);
+  isDictionary = readStatus.isDictionaryPage;
+}
+if (parentColumnReader.columnChunkMetaData.getCodec() != 
CompressionCodecName.UNCOMPRESSED) {
+  DrillBuf uncompressedData = data;
+  data = decompress(readStatus.getPageHeader(), uncompressedData);
+  synchronized (this) {
+readStatus.setPageData(null);
+  }
+  uncompress

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84953931
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
--- End diff --

Formatting issue. Please fix it here and in the other places.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85146088
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 ---
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future asyncPageRead;
+
+  AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path,
+  ColumnChunkMetaData columnChunkMetaData) throws 
ExecutionSetupException {
+super(parentStatus, fs, path, columnChunkMetaData);
+if (threadPool == null) {
+  threadPool = 
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+}
+asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader 
parentStatus,
+  final ColumnChunkMetaData columnChunkMetaData, final 
DirectBufInputStream f) throws UserException {
+if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+  try {
+dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - 
dataReader.getPos());
--- End diff --

I see that the input argument f is not used. Should we be using f instead of
dataReader here?
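
For illustration only, a minimal sketch of that suggestion, reusing the names
from the diff above (whether f or dataReader is the right stream here is for
the author to confirm):

    // Hypothetical sketch: position the stream that was passed in, instead of
    // the dataReader field, before reading the dictionary page.
    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
      try {
        f.skip(columnChunkMetaData.getDictionaryPageOffset() - f.getPos());
      } catch (IOException e) {
        handleAndThrowException(e, "Error Reading dictionary page.");
      }
    }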


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85150281
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
 ---
@@ -234,4 +286,48 @@ public static int readIntLittleEndian(DrillBuf in, int 
offset) {
 return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
   }
 
+  private class ColumnReaderProcessPagesTask implements Callable {
+
+private final ColumnReader parent = ColumnReader.this;
+private final long recordsToReadInThisPass;
+
+public ColumnReaderProcessPagesTask(long recordsToReadInThisPass){
+  this.recordsToReadInThisPass = recordsToReadInThisPass;
+}
+
+@Override public Long call() throws IOException{
--- End diff --

Add a space between IOException and {.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84987279
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java 
---
@@ -45,12 +46,15 @@
   private final BufferAllocator allocator;
   private final ScanResult classpathScan;
   private final ExecutorService executor;
+  private final ExecutorService scanExecutor;
+  private final ExecutorService scanDecodeExecutor;
 
   public BootStrapContext(DrillConfig config, ScanResult classpathScan) {
 this.config = config;
 this.classpathScan = classpathScan;
 this.loop = 
TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS),
 "BitServer-");
-this.loop2 = 
TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS),
 "BitClient-");
+this.loop2 = 
TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS),
+"BitClient-");
--- End diff --

This looks like a formatting-only change; is it needed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85152665
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
 ---
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * BufferedDirectBufInputStream  reads from the
+ * underlying InputStream in blocks of data, into an
+ * internal buffer. The internal buffer is a direct memory backed
+ * buffer. The implementation is similar to the 
BufferedInputStream
+ * class except that the internal buffer is a Drillbuf and
+ * not a byte array. The mark and reset methods of the underlying
+ * InputStreamare not supported.
+ */
+public class BufferedDirectBufInputStream extends DirectBufInputStream 
implements Closeable {
+
+  private static final org.slf4j.Logger logger =
+  
org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class);
+
+  private static int defaultBufferSize = 8192 * 1024; // 8 MiB
+  private static int defaultTempBufferSize = 8192; // 8 KiB
+  private static int smallBufferSize = 64 * 1024; // 64 KiB
+
+  /**
+   * The internal buffer to keep data read from the underlying inputStream.
+   * internalBuffer[0]  through internalBuffer[count-1] 

+   * contains data read from the underlying  input stream.
+   */
+  protected volatile DrillBuf internalBuffer; // the internal buffer
+
+  /**
+   * The number of valid bytes in internalBuffer.
+   *  count  is always in the range 
[0,internalBuffer.capacity]
+   * internalBuffer[count-1] is the last valid byte in the 
buffer.
+   */
+  protected int count;
+
+  /**
+   * The current read position in the buffer; the index of the next
+   * character to be read from the internalBuffer array.
+   * 
+   * This value is always in the range [0,count].
+   * If curPosInBuffer is equal to count> then 
we have read
+   * all the buffered data and the next read (or skip) will require more 
data to be read
+   * from the underlying input stream.
+   */
+  protected int curPosInBuffer;
+
+  protected long curPosInStream; // current offset in the input stream
+
+  private int bufSize;
+
+  private volatile DrillBuf tempBuffer; // a temp Buffer for use by 
read(byte[] buf, int off, int len)
+
+  private DrillBuf getBuf() throws IOException {
+checkInputStreamState();
+if (internalBuffer == null) {
+  throw new IOException("Input stream is closed.");
+}
+return this.internalBuffer;
+  }
+
+  /**
+   * Creates a BufferedDirectBufInputStream
+   * with the default (8 MiB) buffer size.
+   */
+  public BufferedDirectBufInputStream(InputStream in, BufferAllocator 
allocator, String id,
+  long startOffset, long totalByteSize, boolean enableHints) {
+this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, 
enableHints);
+  }
 

[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r84982984
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java ---
@@ -43,10 +45,18 @@
 
   public abstract OperatorStats getStats();
 
+  public abstract ExecutorService getExecutor();
+
+  public abstract ExecutorService getScanExecutor();
--- End diff --

Is it possible to use an enum and pass it to getExecutor instead of adding
specific functions like getScanExecutor, getScanDecodeExecutor, etc.?
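
A minimal sketch of the enum-based alternative being suggested (ExecutorType
and the field names are hypothetical, not part of the patch):

    // Hypothetical sketch: one accessor keyed by an enum instead of separate
    // getScanExecutor()/getScanDecodeExecutor() methods on OperatorContext.
    public enum ExecutorType { GENERAL, SCAN, SCAN_DECODE }

    public abstract ExecutorService getExecutor(ExecutorType type);

    // A concrete implementation could then dispatch internally, e.g.:
    //   switch (type) {
    //     case SCAN:        return scanExecutor;
    //     case SCAN_DECODE: return scanDecodeExecutor;
    //     default:          return executor;
    //   }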


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85153076
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
 ---
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+public class DirectBufInputStream extends FilterInputStream {
+
+  private static final org.slf4j.Logger logger =
+  org.slf4j.LoggerFactory.getLogger(DirectBufInputStream.class);
+
+  protected boolean enableHints = true;
+  protected String streamId; // a name for logging purposes only
+  protected BufferAllocator allocator;
+  /**
+   * The length of the data we expect to read. The caller may, in fact,
+   * ask for more or less bytes. However this is useful for providing 
hints where
+   * the underlying InputStream supports hints (e.g. fadvise)
+   */
+  protected final long totalByteSize;
+
+  /**
+   * The offset in the underlying stream to start reading from
+   */
+  protected final long startOffset;
+
+  public DirectBufInputStream(InputStream in, BufferAllocator allocator, 
String id, long startOffset,
+  long totalByteSize, boolean enableHints) {
+super(in);
+Preconditions.checkArgument(startOffset >= 0);
+Preconditions.checkArgument(totalByteSize >= 0);
+this.streamId = id;
+this.allocator = allocator;
+this.startOffset = startOffset;
+this.totalByteSize = totalByteSize;
+this.enableHints = enableHints;
+  }
+
+  public void init() throws IOException, UnsupportedOperationException {
+checkStreamSupportsByteBuffer();
+if (enableHints) {
+  fadviseIfAvailable(getInputStream(), this.startOffset, 
this.totalByteSize);
+}
+getInputStream().seek(this.startOffset);
+return;
+  }
+
+  public int read() throws IOException {
+return getInputStream().read();
+  }
+
+  public synchronized int read(DrillBuf buf, int off, int len) throws 
IOException {
+buf.clear();
+ByteBuffer directBuffer = buf.nioBuffer(0, len);
+int lengthLeftToRead = len;
+while (lengthLeftToRead > 0) {
+  lengthLeftToRead -= CompatibilityUtil.getBuf(getInputStream(), 
directBuffer, lengthLeftToRead);
+}
+buf.writerIndex(len);
+return len;
+  }
+
+  public synchronized DrillBuf getNext(int bytes) throws IOException {
+DrillBuf b = allocator.buffer(bytes);
+int bytesRead = -1;
+try {
+bytesRead = read(b, 0, bytes);
+} catch (IOException e){
--- End diff --

Add a space between ) and {.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance

2016-10-26 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/611#discussion_r85153504
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
 ---
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+public class DirectBufInputStream extends FilterInputStream {
+
+  private static final org.slf4j.Logger logger =
+  org.slf4j.LoggerFactory.getLogger(DirectBufInputStream.class);
+
+  protected boolean enableHints = true;
+  protected String streamId; // a name for logging purposes only
+  protected BufferAllocator allocator;
+  /**
+   * The length of the data we expect to read. The caller may, in fact,
+   * ask for more or less bytes. However this is useful for providing 
hints where
+   * the underlying InputStream supports hints (e.g. fadvise)
+   */
+  protected final long totalByteSize;
+
+  /**
+   * The offset in the underlying stream to start reading from
+   */
+  protected final long startOffset;
+
+  public DirectBufInputStream(InputStream in, BufferAllocator allocator, 
String id, long startOffset,
+  long totalByteSize, boolean enableHints) {
+super(in);
+Preconditions.checkArgument(startOffset >= 0);
+Preconditions.checkArgument(totalByteSize >= 0);
+this.streamId = id;
+this.allocator = allocator;
+this.startOffset = startOffset;
+this.totalByteSize = totalByteSize;
+this.enableHints = enableHints;
+  }
+
+  public void init() throws IOException, UnsupportedOperationException {
+checkStreamSupportsByteBuffer();
+if (enableHints) {
+  fadviseIfAvailable(getInputStream(), this.startOffset, 
this.totalByteSize);
+}
+getInputStream().seek(this.startOffset);
+return;
+  }
+
+  public int read() throws IOException {
+return getInputStream().read();
+  }
+
+  public synchronized int read(DrillBuf buf, int off, int len) throws 
IOException {
+buf.clear();
+ByteBuffer directBuffer = buf.nioBuffer(0, len);
+int lengthLeftToRead = len;
+while (lengthLeftToRead > 0) {
+  lengthLeftToRead -= CompatibilityUtil.getBuf(getInputStream(), 
directBuffer, lengthLeftToRead);
+}
+buf.writerIndex(len);
+return len;
+  }
+
+  public synchronized DrillBuf getNext(int bytes) throws IOException {
+DrillBuf b = allocator.buffer(bytes);
+int bytesRead = -1;
+try {
+bytesRead = read(b, 0, bytes);
+} catch (IOException e){
+  b.release();
+  throw e;
+}
+if (bytesRead <= -1) {
+  b.release();
+  return null;
+}
+return b;
+  }
+
+  public long getPos() throws IOException {
+return getInputStream().getPos();
+  }
+
+  public boolean hasRemainder() throws IOException {
+// We use the following instead of "getInputStream.available() > 0" 
because
+// available() on HDFS seems to have issues with file sizes
+// that are greater than Integer.MAX_VALUE
+return (this.getPos() < (this.startOffset + this.totalByteSize));
+  }
+
+  protected FSDataInputStream getInputStream() throws IOException {
+// Make sure stream is open
 

[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...

2016-11-09 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/639
  
Merged with the latest code. All review comments are taken care of. All tests pass
with the option `store.parquet.use_local_affinity` set to both true and false.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...

2016-11-04 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/639
  
Parallelization logic is affected for the following reasons:
Depending upon how many row groups a node has to scan (based on locality
information), i.e. how much work the node has to do, we want to adjust the
number of fragments on that node (constrained by the usual global and per-node
limits).
We do not want to schedule fragments on a node that has no data.
Because we want pure locality, we may end up with fewer fragments doing more work.
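
As a rough illustration of that proportional idea (a sketch only; the method
and variable names are hypothetical, and the real parallelizer also applies
global and per-query limits):

    // Hypothetical sketch: size a node's fragment count by its share of the
    // local work, cap it at the per-node limit, and skip nodes with no data.
    static int fragmentsForNode(int totalWidth, int workUnitsOnNode,
        int totalWorkUnits, int maxWidthPerNode) {
      if (workUnitsOnNode == 0) {
        return 0; // no local data, no fragments
      }
      int share = (int) Math.ceil((double) totalWidth * workUnitsOnNode / totalWorkUnits);
      return Math.min(Math.max(1, share), maxWidthPerNode);
    }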



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #597: DRILL-4905: Push the LIMIT down to the parquet reader.

2016-10-18 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/597
  
Updated the diffs with the review comments taken care of.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #597: DRILL-4905: Push the LIMIT down to the parquet read...

2016-10-18 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r83965044
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java
 ---
@@ -117,4 +119,18 @@ public void testSelectEmptyNoCache() throws Exception {
 uex.getMessage()), uex.getMessage().contains(expectedMsg));
 }
   }
+
+  @Test
+  public void testLimit() throws Exception {
+List results = 
testSqlWithResults(String.format("select * from cp.`parquet/limitTest.parquet` 
limit 1"));
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #597: DRILL-4905: Push the LIMIT down to the parquet read...

2016-10-18 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/597#discussion_r83965017
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -139,6 +155,11 @@ public ParquetRecordReader(
 this.batchSize = batchSize;
 this.footer = footer;
 this.fragmentContext = fragmentContext;
+if (numRecordsToRead == DEFAULT_RECORDS_TO_READ_NOT_SPECIFIED) {
+  this.numRecordsToRead =  
footer.getBlocks().get(rowGroupIndex).getRowCount();
+} else {
+  this.numRecordsToRead = numRecordsToRead;
--- End diff --

The current code already handles the case where numRecordsToRead is out of range,
so I am not adding additional checks here.
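
For reference, a clamp of the kind being discussed could look roughly like this
(a sketch under the assumption that anything out of range falls back to the row
group's row count; not the actual change):

    // Hypothetical sketch: fall back to the row group size when the requested
    // record count is unset or out of range.
    long rowCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
    this.numRecordsToRead =
        (numRecordsToRead < 0 || numRecordsToRead > rowCount) ? rowCount : numRecordsToRead;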


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #652: DRILL-4990:Use new HDFS API access instead of listS...

2016-11-14 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/652

DRILL-4990:Use new HDFS API access instead of listStatus to check if …

…users have permissions to access workspace.

Manually tested the fix with impersonation enabled. All unit and regression 
tests pass.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill DRILL-4990

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #652






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #652: DRILL-4990:Use new HDFS API access instead of listStatus t...

2016-11-15 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/652
  
I did not add new unit tests because the existing tests already provide enough
coverage, and they run on the local file system.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #653: DRILL-4831: Running refresh table metadata concurrently ra...

2016-11-16 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/653
  
Thanks Gautam for the review.  Updated with all review comments addressed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #653: DRILL-4831: Running refresh table metadata concurre...

2016-11-16 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/653

DRILL-4831: Running refresh table metadata concurrently randomly fail…

…s with JsonParseException

To prevent metadata cache files from getting corrupted, write to a
temporary file and rename it at the end. All unit and regression tests pass.
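
The write-then-rename pattern described above, as a minimal standalone sketch
(the file name, method name, and FileSystem handle are assumptions for
illustration, not the patch itself):

    import java.io.IOException;
    import java.util.UUID;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch: write the cache file under a unique temporary name,
    // then rename it into place so readers never see a partially written file.
    class CacheFileWriter {
      void writeCacheFile(FileSystem fs, Path dir, byte[] json) throws IOException {
        Path tmp = new Path(dir, ".drill.parquet_metadata." + UUID.randomUUID() + ".tmp");
        try (FSDataOutputStream os = fs.create(tmp)) {
          os.write(json);
        }
        if (!fs.rename(tmp, new Path(dir, ".drill.parquet_metadata"))) {
          throw new IOException("Failed to rename " + tmp);
        }
      }
    }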

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill DRILL-4831

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #653


commit fec9b7ee468cda73dec27f475069898d763fa1c7
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Date:   2016-10-20T12:03:13Z

DRILL-4831: Running refresh table metadata concurrently randomly fails with 
JsonParseException




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #653: DRILL-4831: Running refresh table metadata concurrently ra...

2016-11-18 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/653
  
Updated diffs with all review comments taken care of.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #653: DRILL-4831: Running refresh table metadata concurre...

2016-11-18 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/653#discussion_r88770685
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java 
---
@@ -495,31 +499,75 @@ private ParquetFileMetadata_v3 
getParquetFileMetadata_v3(ParquetTableMetadata_v3
* @param p
* @throws IOException
*/
-  private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, 
Path p) throws IOException {
+  private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, 
String path) throws IOException {
 JsonFactory jsonFactory = new JsonFactory();
 jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
 jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
 ObjectMapper mapper = new ObjectMapper(jsonFactory);
 SimpleModule module = new SimpleModule();
 module.addSerializer(ColumnMetadata_v3.class, new 
ColumnMetadata_v3.Serializer());
 mapper.registerModule(module);
-FSDataOutputStream os = fs.create(p);
+
+// If multiple clients are updating metadata cache file concurrently, 
the cache file
+// can get corrupted. To prevent this, write to a unique temporary 
file and then do
+// atomic rename.
+UUID randomUUID =  UUID.randomUUID();
--- End diff --

Yes, I wanted to use the queryId as well, but it is not easily accessible, as
you mentioned. I made the change to use the same UUID for both METADATA_FILENAME
and METADATA_DIRECTORIES_FILENAME.
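
In other words, something along these lines (a sketch with hypothetical variable
names; the constants are the ones mentioned above):

    // Hypothetical sketch: derive both temporary names from one UUID so the two
    // cache files written by a single refresh share the same suffix.
    String uuid = UUID.randomUUID().toString();
    String tmpMetadataName = METADATA_FILENAME + "." + uuid;
    String tmpDirectoriesName = METADATA_DIRECTORIES_FILENAME + "." + uuid;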


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #652: DRILL-4990:Use new HDFS API access instead of listS...

2016-11-15 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/652#discussion_r88097089
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
 ---
@@ -151,15 +152,11 @@ public WorkspaceSchemaFactory(
   public boolean accessible(final String userName) throws IOException {
 final FileSystem fs = ImpersonationUtil.createFileSystem(userName, 
fsConf);
 try {
-  // We have to rely on the listStatus as a FileSystem can have 
complicated controls such as regular unix style
-  // permissions, Access Control Lists (ACLs) or Access Control 
Expressions (ACE). Hadoop 2.7 version of FileSystem
-  // has a limited private API (FileSystem.access) to check the 
permissions directly
-  // (see https://issues.apache.org/jira/browse/HDFS-6570). Drill 
currently relies on Hadoop 2.5.0 version of
-  // FileClient. TODO: Update this when DRILL-3749 is fixed.
-  fs.listStatus(wsPath);
+  fs.access(wsPath, FsAction.READ);
 } catch (final UnsupportedOperationException e) {
-  logger.trace("The filesystem for this workspace does not support 
this operation.", e);
+  logger.debug("The filesystem for this workspace does not support 
this operation.", e);
--- End diff --

I am not returning false here, to stay consistent with the existing behavior. If
the user does not have permission, we fail when we try to read the file during
execution.
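
A minimal sketch of the check as discussed, assuming the wsPath name from the
diff (the fallback keeps the old lenient behavior for file systems that do not
implement access()):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.security.AccessControlException;

    // Hypothetical sketch: probe read permission with FileSystem.access();
    // treat UnsupportedOperationException as accessible to match old behavior.
    static boolean accessible(FileSystem fs, Path wsPath) throws IOException {
      try {
        fs.access(wsPath, FsAction.READ);
      } catch (UnsupportedOperationException e) {
        // this filesystem does not support access(); defer any failure to read time
      } catch (AccessControlException e) {
        return false;
      }
      return true;
    }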


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...

2016-10-31 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/639
  
Updated the JIRA with details on how the current algorithm works, why remote
reads were happening, and how the new algorithm works:
https://issues.apache.org/jira/browse/DRILL-4706



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #639: New fragment placement algorithm based on locality ...

2016-10-31 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/639

New fragment placement algorithm based on locality of data.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill DRILL-4706

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #639


commit 5a7fd87d7cfc26b03389c593bd20349e82dd484c
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Date:   2016-10-07T14:38:09Z

New fragment placement algorithm based on locality of data.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...

2016-11-04 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/639
  
Some initial comments.

The issue is regarding assigning fragments based on strict locality. So why 
is the parallelization logic affected, and not exclusively locality?

Parallelization logic is affected because it decides how many fragments to
run on each node, and that decision depends on locality.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...

2016-11-04 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/639
  
Updated with all review comments taken care of. Added
TestLocalAffinityFragmentParallelizer.java, which has a bunch of test cases with
examples.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #639: DRILL-4706: Fragment planning causes Drillbits to r...

2016-11-04 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/639#discussion_r86597707
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/LocalAffinityFragmentParallelizer.java
 ---
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Map;
+import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+
+
+/**
+ * Implementation of {@link FragmentParallelizer} where fragment has zero 
or more endpoints.
+ * This is for Parquet Scan Fragments only. Fragment placement is done 
preferring strict
+ * data locality.
+ */
+public class LocalAffinityFragmentParallelizer implements 
FragmentParallelizer {
+public static final LocalAffinityFragmentParallelizer INSTANCE = new 
LocalAffinityFragmentParallelizer();
+
+@Override
+public void parallelizeFragment(final Wrapper fragmentWrapper, final 
ParallelizationParameters parameters, final Collection 
activeEndpoints) throws PhysicalOperatorSetupException {
+
+// Find the parallelization width of fragment
+final Stats stats = fragmentWrapper.getStats();
+final ParallelizationInfo parallelizationInfo = 
stats.getParallelizationInfo();
+
+// 1. Find the parallelization based on cost. Use max cost of all 
operators in this fragment; this is consistent
+//with the calculation that ExcessiveExchangeRemover uses.
+int width = (int) Math.ceil(stats.getMaxCost() / 
parameters.getSliceTarget());
+
+// 2. Cap the parallelization width by fragment level width limit 
and system level per query width limit
+width = Math.min(width, 
Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));
+
+// 3. Cap the parallelization width by system level per node width 
limit
+width = Math.min(width, parameters.getMaxWidthPerNode() * 
activeEndpoints.size());
+
+// 4. Make sure width is at least the min width enforced by 
operators
+width = Math.max(parallelizationInfo.getMinWidth(), width);
+
+// 5. Make sure width is at most the max width enforced by 
operators
+width = Math.min(parallelizationInfo.getMaxWidth(), width);
+
+// 6: Finally make sure the width is at least one
+width = Math.max(1, width);
+
+List endpointPool = Lists.newArrayList();
+List assignedEndPoints = Lists.newArrayList();
+
+Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap =
+
fragmentWrapper.getStats().getParallelizationInfo().getEndpointAffinityMap();
+
+int totalAssigned = 0;
+int totalWorkUnits = 0;
+
+// Get the total number of work units and list of endPoints to 
schedule fragments on
+for (Map.Entry<DrillbitEndpoint, EndpointAffinity> epAff : 
endpointAffinityMap.entrySet()) {
+if (epAff.getValue().getNumLocalWorkUnits() > 0) {
+totalWorkUnits += epAff.getValue().getNumLocalWorkUnits();
+endpointPool.add(epAff.getKey());
+}
+}
+
+// Keep track of number of fragments allocated to each endpoint.
+Map<DrillbitEndpoint, Integer> endpointAssignments = new 
HashMap<>();
+
+// Keep track of how many more to assign to each endpoint.
+Map<DrillbitEndpoint, Integer> remainingEndpointAssignments = new 
HashMap<>();
+
+// Calculate the target allocation for each endPoint based on work 
it has to do
+  

[GitHub] drill pull request #592: DRILL-4826: Query against INFORMATION_SCHEMA.TABLES...

2016-10-13 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/592#discussion_r83324461
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
 ---
@@ -290,28 +290,30 @@ public Tables(OptionManager optionManager) {
   return new PojoRecordReader<>(Records.Table.class, 
records.iterator());
 }
 
-@Override
-public void visitTables(String schemaPath, SchemaPlus schema) {
+@Override public void visitTables(String schemaPath, SchemaPlus 
schema) {
--- End diff --

Why this change?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #597: DRILL-4905: Push down the LIMIT to the parquet reader scan...

2016-10-11 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/597
  
Updated with new diffs. Please review.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #692: DRILL-5123: Write query profile after sending final respon...

2016-12-16 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/692
  
+1. LGTM



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill issue #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/676
  
+1. Thanks for fixing this and for some more cleanup. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/676#discussion_r91422697
  
--- Diff: 
exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2769UnsupportedReportsUseSqlExceptionTest.java
 ---
@@ -368,10 +381,19 @@ public void testConnectionMethodsThrowRight() {
   this.factoryConnection = factoryConnection;
 }
 
+@Override
 protected Statement getJdbcObject() throws SQLException {
   return factoryConnection.createStatement();
 }
 
+@Override
+protected boolean isOkaySpecialCaseException(Method method,
+ Throwable cause) {
+   // New Java 8 method not supported by Avatica
+
--- End diff --

remove the extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/676#discussion_r91422620
  
--- Diff: 
exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
 ---
@@ -587,6 +618,12 @@ else if (RuntimeException.class == cause.getClass()
 // Special good-enough case--we had to use RuntimeException for 
now.
 result = true;
   }
+  else if (SQLFeatureNotSupportedException.class == cause.getClass()
+  && (   method.getName().equals("updateObject")
--- End diff --

Remove the extra space after ( and move && to the line above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/676#discussion_r91422118
  
--- Diff: 
exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
 ---
@@ -477,18 +477,32 @@ public void testClosedConnectionMethodsThrowRight() {
 }
 
 @Override
+protected boolean isOkayNonthrowingMethod(Method method) {
+  // Java 8 method
+  if ( "getLargeUpdateCount".equals(method.getName())) {
+return true; }
+  return super.isOkayNonthrowingMethod(method);
+}
+
+@Override
 protected boolean isOkaySpecialCaseException(Method method, Throwable 
cause) {
   final boolean result;
   if (super.isOkaySpecialCaseException(method, cause)) {
 result = true;
   }
+  else if (   method.getName().equals("executeLargeBatch")
--- End diff --

remove extra space after if. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/676#discussion_r91422511
  
--- Diff: 
exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
 ---
@@ -543,6 +566,14 @@ else if (RuntimeException.class == cause.getClass()
 // Special good-enough case--we had to use RuntimeException for 
now.
 result = true;
   }
+  else if (  method.getName().equals("setObject")
--- End diff --

remove extra space after if


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/676#discussion_r91421638
  
--- Diff: 
exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java ---
@@ -66,6 +66,11 @@ public static void setFactory(ConnectionFactory factory) 
{
   public static Properties getDefaultProperties() {
 final Properties properties = new Properties();
 properties.setProperty("drillJDBCUnitTests", "true");
+
+// Must set this to false to ensure that the tests ignore any existing
+// plugin configurations stored in /tmp/drill.
+
--- End diff --

remove the extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/676#discussion_r91423240
  
--- Diff: 
exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2769UnsupportedReportsUseSqlExceptionTest.java
 ---
@@ -280,6 +280,10 @@ else if (NullPointerException.class == cause.getClass()
   // code implements them.
   successLinesBuf.append(resultLine);
 }
+else if (isOkaySpecialCaseException(method, cause)) {
+  successLinesBuf.append(resultLine);
+}
+
--- End diff --

remove extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/676#discussion_r91421707
  
--- Diff: 
exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2769UnsupportedReportsUseSqlExceptionTest.java
 ---
@@ -397,10 +419,19 @@ public void testPlainStatementMethodsThrowRight() {
   this.factoryConnection = factoryConnection;
 }
 
+@Override
 protected PreparedStatement getJdbcObject() throws SQLException {
   return factoryConnection.prepareStatement("VALUES 1");
 }
 
+@Override
+protected boolean isOkaySpecialCaseException(Method method,
+ Throwable cause) {
+   // New Java 8 method not supported by Avatica
+
--- End diff --

remove the extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8

2016-12-07 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/676#discussion_r91421952
  
--- Diff: 
exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java
 ---
@@ -477,18 +477,32 @@ public void testClosedConnectionMethodsThrowRight() {
 }
 
 @Override
+protected boolean isOkayNonthrowingMethod(Method method) {
+  // Java 8 method
+  if ( "getLargeUpdateCount".equals(method.getName())) {
+return true; }
--- End diff --

Remove the extra space after if (. Put } on a separate line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #704: DRILL-5125: Provide option to use generic code for ...

2017-03-21 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/704#discussion_r107219761
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
 ---
@@ -34,4 +38,33 @@ public void testSVRWithNoFilter() throws Exception {
 int numOutputRecords = 
testPhysical(getFile("remover/sv_with_no_filter.json"));
 assertEquals(100, numOutputRecords);
   }
+
+  /**
+   * Test the generic version of the selection vector remover copier
+   * class. The code uses the traditional generated version by default.
+   * This test sets the option to use the generic version, then runs
+   * a query that exercises that version.
+   * 
+   * Note that the tests here exercise only the SV2 version of the
+   * selection remover; no tests exist for the SV4 version.
+   */
+
+  // TODO: Add an SV4 test once the improved mock data generator
+  // is available.
+
+  @Test
+  public void testGenericCopier() throws Exception {
+// TODO: replace this with new setup once revised test framework
+// is available.
+Properties config = new Properties( );
+config.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, 
"false");
+config.put(ExecConstants.HTTP_ENABLE, "false");
+config.put(ExecConstants.REMOVER_ENABLE_GENERIC_COPIER, "true");
+updateTestCluster(1, DrillConfig.create(config));
+
+int numOutputRecords = testPhysical(getFile("remover/test1.json"));
+assertEquals(50, numOutputRecords);
+numOutputRecords = 
testPhysical(getFile("remover/sv_with_no_filter.json"));
+assertEquals(100, numOutputRecords);
+  }
 }
--- End diff --

It would be good to have tests that cover all the vector types. But since this
is off by default and you are exercising the code through other unit tests,
this is fine.
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108693574
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Base strategy for reading a batch of Parquet records.
+ */
+public abstract class BatchReader {
+
+  protected final ReadState readState;
+
+  public BatchReader(ReadState readState) {
+this.readState = readState;
+  }
+
+  public int readBatch() throws Exception {
+ColumnReader firstColumnStatus = readState.getFirstColumnStatus();
+long recordsToRead = Math.min(getReadCount(firstColumnStatus), 
readState.getRecordsToRead());
+int readCount = readRecords(firstColumnStatus, recordsToRead);
+readState.fillNullVectors(readCount);
+return readCount;
+  }
+
+  protected abstract long getReadCount(ColumnReader firstColumnStatus);
+
+  protected abstract int readRecords(ColumnReader firstColumnStatus, 
long recordsToRead) throws Exception;
+
+  protected void readAllFixedFields(long recordsToRead) throws Exception {
+Stopwatch timer = Stopwatch.createStarted();
+if(readState.useAsyncColReader()){
+  readAllFixedFieldsParallel(recordsToRead);
+} else {
+  readAllFixedFieldsSerial(recordsToRead);
+}
+readState.parquetReaderStats.timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+  }
+
+  protected void readAllFixedFieldsSerial(long recordsToRead) throws 
IOException {
+for (ColumnReader crs : readState.getReaders()) {
+  crs.processPages(recordsToRead);
+}
+  }
+
+  protected void readAllFixedFieldsParallel(long recordsToRead) throws 
Exception {
+ArrayList<Future> futures = Lists.newArrayList();
+for (ColumnReader crs : readState.getReaders()) {
+  Future f = crs.processPagesAsync(recordsToRead);
+  futures.add(f);
+}
+Exception exception = null;
+for(Future f: futures){
+  if (exception != null) {
+f.cancel(true);
+  } else {
+try {
+  f.get();
+} catch (Exception e) {
+  f.cancel(true);
+  exception = e;
+}
+  }
+}
+if (exception != null) {
+  throw exception;
+}
+  }
+
+  /**
+   * Strategy for reading mock records. (What are these?)
+   */
+
+  public static class MockBatchReader extends BatchReader {
+
+public MockBatchReader(ReadState readState) {
+  super(readState);
+}
+
+@Override
+protected long getReadCount(ColumnReader firstColumnStatus) {
+  if (readState.mockRecordsRead == 
readState.schema().getGroupRecordCount()) {
+return 0;
--- End diff --

How about moving mockRecordsRead to this class instead of keeping it in 
readState?
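
Something like this rough sketch is what I mean, using only the names already 
visible in this diff; it is untested and just illustrates moving the field:

  public static class MockBatchReader extends BatchReader {

    // Sketch: the mock read counter lives in the mock reader itself
    // instead of in ReadState.
    private long mockRecordsRead;

    public MockBatchReader(ReadState readState) {
      super(readState);
    }

    @Override
    protected long getReadCount(ColumnReader firstColumnStatus) {
      if (mockRecordsRead == readState.schema().getGroupRecordCount()) {
        return 0;
      }
      return readState.schema().getGroupRecordCount() - mockRecordsRead;
    }

    @Override
    protected int readRecords(ColumnReader firstColumnStatus, long recordsToRead) {
      mockRecordsRead += recordsToRead;
      return (int) recordsToRead;
    }
  }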


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108557760
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -307,164 +231,49 @@ public FragmentContext getFragmentContext() {
 return fragmentContext;
   }
 
-  /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
-   */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) 
{
-if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-  if (column.getMaxRepetitionLevel() > 0) {
-return -1;
-  }
-  if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-return se.getType_length() * 8;
-  } else {
-return getTypeLengthInBits(column.getType());
-  }
-} else {
-  return -1;
-}
-  }
-
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
 this.operatorContext = operatorContext;
-if (!isStarQuery()) {
-  columnsFound = new boolean[getColumns().size()];
-  nullFilledVectors = new ArrayList<>();
+if (isStarQuery()) {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
rowGroupIndex);
+} else {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
getColumns());
 }
-columnStatuses = new ArrayList<>();
-List columns = 
footer.getFileMetaData().getSchema().getColumns();
-allFieldsFixedLength = true;
-ColumnDescriptor column;
-ColumnChunkMetaData columnChunkMetaData;
-int columnsToScan = 0;
-mockRecordsRead = 0;
 
-MaterializedField field;
+//ParquetMetadataConverter metaConverter = new 
ParquetMetadataConverter();
+//FileMetaData fileMetaData;
 
--- End diff --

Instead of commenting them out, remove these lines if they are not needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108667588
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup( ) throws Exception {
+FixtureBuilder builder = ClusterFixture.builder()
+  // Set options, etc.
+  ;
+startCluster(builder);
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity\n" +
+ "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("l_orderkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_partkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_suppkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_linenumber"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_quantity"), 
Types.required(TypeProtos.MinorType.FLOAT8));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/fixedWidth.csv")
+  .baselineColumns("l_orderkey", "l_partkey", "l_suppkey", 
"l_linenumber", "l_quantity")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+
+  @Test
+  public void testVariableWidth() throws Exception {
+String sql = "SELECT s_name, s_address, s_phone, s_comment\n" +
+ "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("s_name"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_address"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_phone"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_comment"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/variableWidth.csv")
+  .baselineColumns("s_name", "s_address", "s_phone", "s_comment")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+  @Test
+  public void testMixedWidth() throws Exception {
+String sql = "SELECT s_suppkey, s_name, s_address, s_phone, 
s_acctbal\n" +
+ "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typ
