[GitHub] drill issue #540: Fix for DRILL-4759: Drill throwing array index out of boun...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/540 updated the pull request with test case added. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] drill issue #540: Fix for DRILL-4759: Drill throwing array index out of boun...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/540 updated the fix. Please review. In DictionaryBigIntReader, we need to call parent readField if dictionary encoding is not enabled for the page.
[GitHub] drill issue #540: Fix for DRILL-4759: Drill throwing array index out of boun...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/540 The pull request was merged in commit: e371e18
[GitHub] drill pull request #540: Fix for DRILL-4759: Drill throwing array index out ...
Github user ppadma closed the pull request at: https://github.com/apache/drill/pull/540
[GitHub] drill pull request #540: Fix for DRILL-4759: Drill throwing array index out ...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/540 Fix for DRILL-4759: Drill throwing array index out of bound exception… …when reading a parquet file written by map reduce program You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill drill-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/540.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #540 commit 9028f59b805b4ee0c769f414eb8492f17d65258c Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-07-05T18:28:11Z Fix for DRILL-4759: Drill throwing array index out of bound exception when reading a parquet file written by map reduce program
[GitHub] drill pull request #540: Fix for DRILL-4759: Drill throwing array index out ...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/540#discussion_r69650754
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java ---
@@ -156,12 +156,18 @@ protected void readField(long recordsToReadInThisPass) {
     recordsReadInThisIteration = Math.min(pageReader.currentPageCount - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-    for (int i = 0; i < recordsReadInThisIteration; i++){
-      try {
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
-      } catch ( Exception ex) {
-        throw ex;
+    if (usingDictionary) {
--- End diff --
This should ideally be fixed for all types and will be done in a later commit after sufficient testing. For now, just fixing for the type that this issue is reported for.
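The change discussed above amounts to checking the page's encoding before using the dictionary reader, and falling back to the parent's plain read path otherwise. A rough illustration of that fallback pattern follows; the class names, fields, and suppliers are simplified stand-ins, not Drill's actual reader hierarchy:

```java
import java.util.function.LongSupplier;

class BigIntColumnReader {
    protected final long[] values;          // stand-in for the value vector
    protected int valuesRead = 0;
    private final LongSupplier plainReader; // stand-in for the plain-encoded page reader

    BigIntColumnReader(int capacity, LongSupplier plainReader) {
        this.values = new long[capacity];
        this.plainReader = plainReader;
    }

    protected void readField(long recordsToRead) {
        for (int i = 0; i < recordsToRead; i++) {
            values[valuesRead++] = plainReader.getAsLong();
        }
    }
}

class DictionaryBigIntReader extends BigIntColumnReader {
    private final boolean usingDictionary;       // per-page: is this page dictionary-encoded?
    private final LongSupplier dictionaryReader; // stand-in for the dictionary value reader

    DictionaryBigIntReader(int capacity, boolean usingDictionary,
                           LongSupplier dictionaryReader, LongSupplier plainReader) {
        super(capacity, plainReader);
        this.usingDictionary = usingDictionary;
        this.dictionaryReader = dictionaryReader;
    }

    @Override
    protected void readField(long recordsToRead) {
        if (usingDictionary) {
            for (int i = 0; i < recordsToRead; i++) {
                values[valuesRead++] = dictionaryReader.getAsLong();
            }
        } else {
            // The page is plain-encoded: delegate to the parent's readField
            // instead of indexing into a dictionary that does not exist,
            // which is what produced the array-index-out-of-bounds error.
            super.readField(recordsToRead);
        }
    }
}
```

The key point is the `super.readField` call on the non-dictionary path: a Parquet file can mix dictionary-encoded and plain pages within one column, so the check must be per page.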
[GitHub] drill pull request #539: DRILL-4759:Drill throwing array index out of bound ...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/539 DRILL-4759:Drill throwing array index out of bound exception when reading a parquet file written by map reduce program. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/539.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #539 commit f5c60a88eb656b4eb383ba31a088330f45fc2f80 Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-06-28T23:16:45Z Fix for MD-904 commit 3ba14375d4c64c5b346ef6ce565638804f520eac Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-06-28T23:16:45Z Update Fix for MD-904 commit 7b837eb37ef0937a5087edd9738712610f79c737 Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-06-29T00:22:13Z Merge branch 'master' of https://github.com/ppadma/drill commit a3c20a44d8407af6cb4b01243e7bd799ece2f6d7 Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-06-30T18:31:38Z Merge branch 'master' of https://github.com/apache/drill commit 2c335fdddad1052871fa31843bd70df1e092c14b Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-07-01T00:36:22Z Fix for MD-904: Drill throwing array index out of bound exception when reading a parquet file written by map reduce program. 
commit 950d4a8198145b72fabf4a6ea658a83f1201f058 Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-07-01T00:52:38Z MD-904:Drill throwing array index out of bound exception when reading a parquet file written by map reduce program commit 8914c21216f54600b833895c124d222fc058d307 Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-07-01T05:51:47Z Updated fix for MD-904 commit 1d5d95a3a4edf44fb0d2cc08e41b9e9b60be8c75 Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-07-01T05:57:38Z Updated fix for MD-904
[GitHub] drill pull request #539: DRILL-4759:Drill throwing array index out of bound ...
Github user ppadma closed the pull request at: https://github.com/apache/drill/pull/539
[GitHub] drill pull request #730: DRILL-5223:Drill should ensure balanced workload as...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/730 DRILL-5223: Drill should ensure balanced workload assignment at node l… …evel in order to get better query performance. Please see DRILL-5223 for details: https://issues.apache.org/jira/browse/DRILL-5223 You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill DRILL-5223 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/730.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #730 commit a1ad4113f59e87a4885c271a53afea648bb6f9c3 Author: Padma Penumarthy <ppenuma...@yahoo.com> Date: 2017-01-21T01:57:10Z DRILL-5223: Drill should ensure balanced workload assignment at node level in order to get better query performance
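DRILL-5223 is about spreading work evenly across nodes so no single drillbit becomes a straggler. As a hedged sketch of the general idea only (Drill's actual assignment logic works on fragment parallelization and endpoint affinities, which this does not attempt to model), a greedy least-loaded strategy keeps per-node counts within one unit of each other:

```java
import java.util.*;

class BalancedAssigner {
    // Assign workUnits units of work across nodes, always picking the node
    // with the fewest units so far. Returns the resulting per-node counts.
    static Map<String, Integer> assign(List<String> nodes, int workUnits) {
        Map<String, Integer> load = new LinkedHashMap<>();
        for (String n : nodes) {
            load.put(n, 0);
        }
        for (int i = 0; i < workUnits; i++) {
            // Least-loaded node; ties resolve to the earliest node in
            // insertion order, giving a round-robin-like distribution.
            String least = Collections.min(load.entrySet(),
                    Map.Entry.comparingByValue()).getKey();
            load.merge(least, 1, Integer::sum);
        }
        return load;
    }
}
```

With 7 units over 3 nodes this yields a 3/2/2 split; the invariant worth testing is that the heaviest and lightest nodes differ by at most one unit.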
[GitHub] drill pull request #704: DRILL-5125: Provide option to use generic code for ...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/704#discussion_r98299585 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java --- @@ -34,4 +38,33 @@ public void testSVRWithNoFilter() throws Exception { int numOutputRecords = testPhysical(getFile("remover/sv_with_no_filter.json")); assertEquals(100, numOutputRecords); } + + /** + * Test the generic version of the selection vector remover copier + * class. The code uses the traditional generated version by default. + * This test sets the option to use the generic version, then runs + * a query that exercises that version. + * + * Note that the tests here exercise only the SV2 version of the + * selection remover; no tests exist for the SV4 version. + */ + + // TODO: Add an SV4 test once the improved mock data generator + // is available. + + @Test + public void testGenericCopier() throws Exception { +// TODO: replace this with new setup once revised test framework +// is available. +Properties config = new Properties( ); +config.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false"); +config.put(ExecConstants.HTTP_ENABLE, "false"); +config.put(ExecConstants.REMOVER_ENABLE_GENERIC_COPIER, "true"); +updateTestCluster(1, DrillConfig.create(config)); + +int numOutputRecords = testPhysical(getFile("remover/test1.json")); +assertEquals(50, numOutputRecords); +numOutputRecords = testPhysical(getFile("remover/sv_with_no_filter.json")); +assertEquals(100, numOutputRecords); + } } --- End diff -- Do these tests cover all the value vector types ?
[GitHub] drill pull request #704: DRILL-5125: Provide option to use generic code for ...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/704#discussion_r97992610 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Generic selection vector 4 copier implementation that can + * be used in place of the generated version. Relies on a + * virtual function in each value vector to choose the proper + * implementation. Tests suggest that this version performs + * better than the generated version for queries with many columns. 
+ */ + +public class GenericSV4Copier extends CopierTemplate4 { + + private ValueVector[] vvOut; + private ValueVector[][] vvIn; + + @SuppressWarnings("unused") + @Override + public void doSetup(FragmentContext context, RecordBatch incoming, + RecordBatch outgoing) { --- End diff -- fix alignment
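For context, the generic copier replaces generated per-type copy code with a loop that invokes a virtual copy method on each vector, as the class comment above describes. A simplified sketch of that idea follows; the interfaces are illustrative and much narrower than Drill's real `ValueVector` API:

```java
// Stand-in for the per-vector virtual copy method the generic copier relies on.
interface CopyableVector {
    void copyEntry(int toIndex, CopyableVector from, int fromIndex);
}

// One concrete vector type; the cast inside copyEntry is what the
// virtual dispatch buys us over generated type-specific code.
class IntVectorSketch implements CopyableVector {
    final int[] data;

    IntVectorSketch(int capacity) {
        data = new int[capacity];
    }

    @Override
    public void copyEntry(int toIndex, CopyableVector from, int fromIndex) {
        data[toIndex] = ((IntVectorSketch) from).data[fromIndex];
    }
}

class GenericCopier {
    // Copy the rows selected by sv (a selection vector of row indices)
    // from each input vector to the corresponding output vector.
    static void copy(int[] sv, CopyableVector[] in, CopyableVector[] out) {
        for (int row = 0; row < sv.length; row++) {
            for (int col = 0; col < in.length; col++) {
                out[col].copyEntry(row, in[col], sv[row]);
            }
        }
    }
}
```

The trade-off the PR discusses is that this loop costs a virtual call per value, yet reportedly outperforms the generated version for wide batches because it avoids generating and JIT-compiling one copier class per schema.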
[GitHub] drill pull request #747: DRILL-5257: Run-time control of query profiles
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/747#discussion_r102378871 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -178,6 +182,19 @@ public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext, final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : QueryState.STARTING; recordNewState(initialState); enqueuedQueries.inc(); + +profileOption = setProfileOption(queryContext.getOptions()); + } + + private ProfileOption setProfileOption(OptionManager options) { +if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) { --- End diff -- nit: extra space after !
[GitHub] drill pull request #747: DRILL-5257: Run-time control of query profiles
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/747#discussion_r102378651 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -413,4 +413,15 @@ String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support"; BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true); + + /** + * Option to save query profiles. If false, no query profile will be saved + * for any query. + */ + String ENABLE_QUERY_PROFILE_OPTION = "exec.query_profile.enable"; --- End diff -- After reading your comment, it feels like exec.query_profile.save would be a better choice.
[GitHub] drill pull request #751: DRILL-5259: Allow listing a user-defined number of ...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/751#discussion_r101862127 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java --- @@ -190,10 +193,13 @@ public QProfiles(List runningQueries, List finishedQue public List getErrors() { return errors; } } + //max Param to cap listing of profiles + private static final String MAX_QPARAM = "max"; + --- End diff -- please choose a better name than MAX_QPARAM something like MAX_QPROFILES_PARAM ?
[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/749#discussion_r102830312 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java --- @@ -33,35 +33,52 @@ ParquetRecordReader parentReader; final List<VarLengthColumn> columns; final boolean useAsyncTasks; + private final long targetRecordCount; public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns) { this.parentReader = parentReader; this.columns = columns; useAsyncTasks = parentReader.useAsyncColReader; + +// Can't read any more records than fixed width fields will fit. +// Note: this calculation is very likely wrong; it is a simplified +// version of earlier code, but probably needs even more attention. + +int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8; +if (totalFixedFieldWidth == 0) { + targetRecordCount = 0; +} else { + targetRecordCount = parentReader.getBatchSize() / totalFixedFieldWidth; +} } /** * Reads as many variable length values as possible. 
* * @param recordsToReadInThisPass - the number of records recommended for reading form the reader - * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metatdata from + * @param firstColumnStatus - a reference to the first column status in the Parquet file to grab metatdata from * @return - the number of fixed length fields that will fit in the batch * @throws IOException */ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException { -long recordsReadInCurrentPass = 0; - // write the first 0 offset for (VarLengthColumn columnReader : columns) { columnReader.reset(); } Stopwatch timer = Stopwatch.createStarted(); -recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass); -if(useAsyncTasks){ +long recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass); + +// Can't read any more records than fixed width fields will fit. + +if (targetRecordCount > 0) { + recordsToReadInThisPass = Math.min(recordsToReadInThisPass, targetRecordCount); --- End diff -- I think you mean to update recordsReadInCurrentPass. recordsToReadInThisPass is not being used after this. So, what is the point in updating ?
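The target-record-count calculation under review is simple integer arithmetic: a batch can hold at most batchSize / totalFixedFieldWidth records, with a zero width treated as "no cap computed". A minimal sketch, mirroring the quoted constructor logic:

```java
class BatchSizer {
    // Cap on records per batch given the batch byte budget and the total
    // width in bytes of all fixed-width fields in a record. Returns 0 when
    // the width is 0 (i.e. no fixed-width fields, so no cap is computed).
    static long targetRecordCount(long batchSizeBytes, int totalFixedFieldWidthBytes) {
        if (totalFixedFieldWidthBytes == 0) {
            return 0;
        }
        return batchSizeBytes / totalFixedFieldWidthBytes;
    }
}
```

For example, a 256 KiB batch of records whose fixed-width fields total 8 bytes caps out at 32768 records, which matches the 32*1024 default that appears elsewhere in ParquetRecordReader.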
[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/749#discussion_r102827651 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java --- @@ -70,33 +87,21 @@ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumn return recordsReadInCurrentPass; } - private long determineSizesSerial(long recordsToReadInThisPass) throws IOException { -int lengthVarFieldsInCurrentRecord = 0; -boolean exitLengthDeterminingLoop = false; -long totalVariableLengthData = 0; -long recordsReadInCurrentPass = 0; -do { + +int recordsReadInCurrentPass = 0; +top: do { for (VarLengthColumn columnReader : columns) { -if (!exitLengthDeterminingLoop) { - exitLengthDeterminingLoop = - columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord); -} else { - break; +// Return status is "done reading", meaning stop if true. +if (columnReader.determineSize(recordsReadInCurrentPass, 0 /* unused */ )) { --- End diff -- why not remove the unused parameter ?
[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/749#discussion_r102826142 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java --- @@ -33,35 +33,52 @@ ParquetRecordReader parentReader; final List<VarLengthColumn> columns; final boolean useAsyncTasks; + private final long targetRecordCount; public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns) { this.parentReader = parentReader; this.columns = columns; useAsyncTasks = parentReader.useAsyncColReader; + +// Can't read any more records than fixed width fields will fit. +// Note: this calculation is very likely wrong; it is a simplified +// version of earlier code, but probably needs even more attention. + +int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8; +if (totalFixedFieldWidth == 0) { + targetRecordCount = 0; +} else { + targetRecordCount = parentReader.getBatchSize() / totalFixedFieldWidth; +} } /** * Reads as many variable length values as possible. * * @param recordsToReadInThisPass - the number of records recommended for reading form the reader - * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metatdata from + * @param firstColumnStatus - a reference to the first column status in the Parquet file to grab metatdata from * @return - the number of fixed length fields that will fit in the batch * @throws IOException */ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException { --- End diff -- Since we are not using firstColumnStatus, can we just remove that ?
[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/749#discussion_r102824318 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java --- @@ -351,7 +353,7 @@ public void setup(OperatorContext operatorContext, OutputMutator output) throws MaterializedField field; //ParquetMetadataConverter metaConverter = new ParquetMetadataConverter(); -FileMetaData fileMetaData; +//FileMetaData fileMetaData; --- End diff -- Instead of commenting out, just remove this and earlier line.
[GitHub] drill issue #757: DRILL-5290: Provide an option to build operator table once...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/757 Thanks Sudheesh, Paul and Arina for the review. Original thought was it is not worth the effort to keep operator table in sync with changes in dynamic UDFs. This option is meant to be used for short running operational queries, which most likely will not need dynamic UDF support (well, at least for now). For long running analytic queries, this is not considered an issue. However, I am looking into implementing Sudheesh's suggestion. I will provide an update soon.
[GitHub] drill pull request #757: DRILL-5290: Provide an option to build operator tab...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/757 DRILL-5290: Provide an option to build operator table once for built-… …in static functions and reuse it across queries. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill DRILL-5290 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/757.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #757 commit 67ae503b0b820cd9f40ae3ef8d703e9135f90a74 Author: Padma Penumarthy <ppenuma...@yahoo.com> Date: 2017-02-22T18:31:01Z DRILL-5290: Provide an option to build operator table once for built-in static functions and reuse it across queries.
[GitHub] drill pull request #758: DRILL-5287: Provide option to skip updates of ephem...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/758#discussion_r103005063 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -1010,7 +1010,9 @@ public void addToEventQueue(final QueryState newState, final Exception exception private void recordNewState(final QueryState newState) { state = newState; -queryManager.updateEphemeralState(newState); +if (queryContext.getOptions().getOption(ExecConstants.ZK_QUERY_STATE_UPDATE)) { + queryManager.updateEphemeralState(newState); +} --- End diff -- For long running queries, it may not make much difference. It adds latency of around ~50-60 msec for single query. However, with high concurrency, impact of contention because of zookeeper updates is significant. Like I mentioned in the JIRA, for concurrency=100, the average query response time for simple queries is 8 sec vs 0.2 sec with these updates disabled. It does not impact the query profile. Query profile gets updated and written at the end of the query as usual. This option affects only running queries. In Web UI, you will not see running queries and their state.
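The gate in the quoted diff is small: when the option is off, the per-transition ZooKeeper ephemeral-node write is skipped while the in-memory state still advances. A self-contained sketch of that behavior, with the names standing in for Drill's Foreman/QueryManager pair:

```java
class QueryStateTracker {
    private final boolean zkUpdatesEnabled; // stand-in for the ZK_QUERY_STATE_UPDATE option
    int zkWrites = 0;                       // counts stand-in ZooKeeper writes
    String state = "PENDING";

    QueryStateTracker(boolean zkUpdatesEnabled) {
        this.zkUpdatesEnabled = zkUpdatesEnabled;
    }

    void recordNewState(String newState) {
        state = newState;
        if (zkUpdatesEnabled) {
            // Stand-in for queryManager.updateEphemeralState(newState):
            // this is the contended remote write the option avoids.
            zkWrites++;
        }
    }
}
```

As the comment explains, the final query profile is written separately at query completion, so disabling these updates only hides in-flight state (e.g. running queries in the Web UI); it does not lose any record of the finished query.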
[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/749#discussion_r102725896 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java --- @@ -376,14 +378,14 @@ public void setup(OperatorContext operatorContext, OutputMutator output) throws if (dataTypeLength == -1) { allFieldsFixedLength = false; } else { -bitWidthAllFixedFields += dataTypeLength; + bitWidthAllFixedFields += dataTypeLength; } } //rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset(); if (columnsToScan != 0 && allFieldsFixedLength) { recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields, - footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535); + footer.getBlocks().get(0).getColumns().get(0).getValueCount()), DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH); --- End diff -- This is DEFAULT_RECORDS_TO_READ_FIXED_WIDTH (not VARIABLE)
[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/749#discussion_r102726321 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java --- @@ -73,6 +72,7 @@ private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024; --- End diff -- while we are at it, I would probably rename DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH to DEFAULT_RECORDS_TO_READ_VARIABLE_WIDTH
[GitHub] drill pull request #757: DRILL-5290: Provide an option to build operator tab...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/757#discussion_r102611281 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -413,4 +413,8 @@ String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support"; BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true); + + String USE_DYNAMIC_UDFS = "exec.udf.use_dynamic"; --- End diff -- I did not use the existing option "exec.udf.enable_dynamic_support" because if that option is enabled and then disabled later, expectation is that dynamic UDFs added in that window continue to be available and working even after disabling. All other comments addressed. Please review new diffs.
[GitHub] drill pull request #749: DRILL-5266: Parquet returns low-density batches
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/749#discussion_r102726705 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java --- @@ -70,33 +70,31 @@ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumn return recordsReadInCurrentPass; } - private long determineSizesSerial(long recordsToReadInThisPass) throws IOException { -int lengthVarFieldsInCurrentRecord = 0; -boolean exitLengthDeterminingLoop = false; -long totalVariableLengthData = 0; -long recordsReadInCurrentPass = 0; -do { + +// Can't read any more records than fixed width fields will fit. +// Note: this calculation is very likely wrong; it is a simplified +// version of earlier code, but probably needs even more attention. + +int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8; +long batchSize = parentReader.getBatchSize(); +if (totalFixedFieldWidth > 0) { + recordsToReadInThisPass = Math.min(recordsToReadInThisPass, batchSize / totalFixedFieldWidth); --- End diff -- Instead of fixing it up here, please do it outside the function and pass the correct value for recordsToReadInThisPass.
[GitHub] drill pull request #747: DRILL-5257: Run-time control of query profiles
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/747#discussion_r102277543 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -413,4 +413,19 @@ String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support"; BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true); + + /** + * Option to save query profiles. + * + * async (default): Write query profile after last response + * to the client. + * sync: Write the query profile before the last response to + * the client. Very useful for testing to avoid race conditions. + * none: Don't write the query profile at all. Useful when running + * many production jobs that do not need to be reviewed. + * + */ + String QUERY_PROFILE_OPTION = "exec.profile"; --- End diff -- exec.query_profile.write_mode might be a better choice for clarity. Also, would it be better to have two options? boolean: exec.query_profile.write_enabled and exec.query_profile.write_mode (sync or async).
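The reviewer's proposed split can be sketched as below. The option names (`exec.query_profile.write_enabled`, `exec.query_profile.write_mode`) are the ones proposed in the comment; the `ProfileOptions` class, `parseMode`, and `writeEnabled` are hypothetical illustrations of how the single three-valued option could map onto the pair.

```java
// Sketch of splitting the single exec.profile option into a boolean plus a
// two-valued write mode, as suggested in the review comment above.
public class ProfileOptions {

    public enum WriteMode { SYNC, ASYNC }

    public static final String WRITE_ENABLED = "exec.query_profile.write_enabled";
    public static final String WRITE_MODE = "exec.query_profile.write_mode";

    /** Parses a mode string; unrecognized values fall back to the async default. */
    public static WriteMode parseMode(String value) {
        return "sync".equalsIgnoreCase(value) ? WriteMode.SYNC : WriteMode.ASYNC;
    }

    /** Maps the original option's "none" value onto the proposed boolean. */
    public static boolean writeEnabled(String legacyValue) {
        return !"none".equalsIgnoreCase(legacyValue);
    }
}
```

With this shape, "none" becomes `write_enabled=false`, and the remaining sync/async choice only matters when writing is enabled.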
[GitHub] drill pull request #747: DRILL-5257: Run-time control of query profiles
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/747#discussion_r102278999 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -853,7 +875,9 @@ public void close() throws Exception { // storage write; query completion occurs in parallel with profile // persistence. - queryManager.writeFinalProfile(uex); + if (profileOption == ProfileOption.ASYNC) { +writeProfile(uex); --- End diff -- Why not call queryManager.writeFinalProfile directly from here and avoid creating another function?
[GitHub] drill pull request #757: DRILL-5290: Provide an option to build operator tab...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/757#discussion_r102615118 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -413,4 +413,8 @@ String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support"; BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true); + + String USE_DYNAMIC_UDFS = "exec.udf.use_dynamic"; --- End diff -- Currently, we have one FunctionRegistryHolder (LocalFunctionRegistry) for both static and dynamic functions, from which we register for each query. It gets updated when we download new jars/functions from ZooKeeper, and it will not be the same as it was during startup if dynamic UDF support is enabled and later disabled. That means a table built during startup will miss the dynamic UDFs added in between; to include them, the table has to be rebuilt. For that reason, I chose to add a new option.
[GitHub] drill pull request #758: DRILL-5287: Provide option to skip updates of ephem...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/758#discussion_r103487885 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java --- @@ -280,8 +281,15 @@ public void interrupted(final InterruptedException ex) { } } - QueryState updateEphemeralState(final QueryState queryState) { -switch (queryState) { + void updateEphemeralState(final QueryState queryState) { + // If query is already in zk transient store, ignore the transient state update option. + // Else, they will not be removed from transient store upon completion. + if (transientProfiles.get(stringQueryId) == null && --- End diff -- I want to bypass the option for queries that are already in the transient store when the option is enabled. Otherwise, their state will never get updated and/or they will never be removed from the transient store; the web UI will show these queries as running forever :-) Thanks for raising a good point about using transientProfiles.get. I made the change to update and use the in-memory state instead. Please review the new diffs.
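The bypass described above can be captured in a small state machine. This is an illustrative sketch, not Drill's QueryManager: the class, field, and method names are hypothetical, and the in-memory flag stands in for the "update and use in memory state" change mentioned in the comment.

```java
// Sketch of the bypass: a query whose profile is already in the transient
// store keeps receiving state updates even when the skip option is on, so
// it is eventually removed instead of showing as running forever.
public class EphemeralStateTracker {

    private boolean inTransientStore = false; // in-memory state, no store lookup

    /** Returns true when this state update should reach the transient store. */
    public boolean shouldUpdate(boolean skipOptionEnabled) {
        if (inTransientStore) {
            return true;   // already published: must keep updating until removal
        }
        if (skipOptionEnabled) {
            return false;  // never published, and the option says skip
        }
        inTransientStore = true; // first update publishes the profile
        return true;
    }
}
```

The key property is the third branch: once a profile has been published, later enabling of the skip option no longer suppresses updates for that query.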
[GitHub] drill pull request #766: DRILL-5304: Queries fail intermittently when there ...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/766 DRILL-5304: Queries fail intermittently when there is skew in data distribution Change the assignment logic so we first make sure we assign up to minCount for all nodes before going up to maxCount per node. Also, fixed a small issue in the parallelization code where we round down the calculation of the number of fragments to run on nodes with affinity, because of which we sometimes schedule fewer fragments on nodes with affinity vs. nodes without affinity. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill DRILL-5304 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/766.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #766 commit 49cf9f0b54d8c0ea15c3d6a59f99b8e23870104e Author: Padma Penumarthy <ppenuma...@yahoo.com> Date: 2017-02-28T02:32:24Z DRILL-5304: Queries fail intermittently when there is skew in data distribution
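The two-pass assignment described in the PR text can be sketched as follows. `WorkAssigner`, its method name, and the simple round-robin node ordering are illustrative assumptions; the actual PR changes Drill's assignment code, which is not shown here.

```java
// Sketch of the fix's core idea: a first pass fills every node up to
// minCount before a second pass lets any node grow toward maxCount,
// so skewed input cannot starve some nodes of their minimum share.
public class WorkAssigner {

    /** Distributes `work` units across `nodes` slots in two bounded passes. */
    public static int[] assign(int work, int nodes, int minCount, int maxCount) {
        int[] counts = new int[nodes];
        for (int pass = 0; pass < 2 && work > 0; pass++) {
            int limit = (pass == 0) ? minCount : maxCount;
            for (int n = 0; n < nodes && work > 0; n++) {
                int take = Math.min(limit - counts[n], work);
                counts[n] += take;
                work -= take;
            }
        }
        return counts;
    }
}
```

For example, 5 units over 3 nodes with minCount=1 and maxCount=3 gives every node its minimum before the first node absorbs the surplus, rather than one node reaching maxCount while another gets nothing.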
[GitHub] drill pull request #757: DRILL-5290: Provide an option to build operator tab...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/757#discussion_r103285068 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -413,4 +413,8 @@ String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support"; BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true); + + String USE_DYNAMIC_UDFS = "exec.udf.use_dynamic"; --- End diff -- OK, we need to use read/write locks if we update the table each time a function gets added/removed. That is unnecessary overhead and will cause contention under concurrency. One option is to split the table into two: one for built-in functions (which can be accessed without locks) and the other for dynamic functions. That would be a bigger change and, like I mentioned before, is not considered worth the effort.
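The contention being discussed can be illustrated with a minimal read/write-locked table. This is a sketch of the trade-off only; `LockedFunctionTable` and its methods are hypothetical and do not reflect Drill's actual FunctionRegistryHolder, and the values are simplified to jar names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// A single function table guarded by a ReadWriteLock: every lookup must take
// the read lock because dynamic registrations may rewrite the table, which is
// the per-query overhead the comment above wants to avoid.
public class LockedFunctionTable {

    private final Map<String, String> table = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public void register(String name, String jar) {
        lock.writeLock().lock();
        try {
            table.put(name, jar);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public String lookup(String name) {
        lock.readLock().lock();   // contention point under concurrent queries
        try {
            return table.get(name);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Splitting built-in functions into an immutable, lock-free table and keeping the lock only for the dynamic table would remove this overhead for the common case, which is the bigger change the comment mentions.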
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/597 DRILL-4905: Push down the LIMIT to the parquet reader scan. For a limit N query, where N is less than the current default record batchSize (256K for all fixed-length columns, 32K otherwise), we still end up reading all 256K/32K rows from disk if the rowGroup has that many rows. This causes performance degradation, especially when there are a large number of columns. This fix tries to address the problem by changing the record batchSize the parquet record reader uses so we don't read more than what is needed. Also, added a sys option (store.parquet.record_batch_size) to be able to set the record batch size. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill DRILL-4905 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/597.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #597 commit cd665ebdba11f8685ba446f5ec535c81ddd6edc7 Author: Padma Penumarthy <ppenumarthy@ppenumarthy-e653-mpr13.local> Date: 2016-09-26T17:51:07Z DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the numbers of records read
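The sizing idea behind this push-down can be sketched in a few lines. The defaults (256K rows for all-fixed-width schemas, 32K otherwise) come from the PR description; the `LimitPushdown` class and `batchSizeForLimit` method are hypothetical, since the PR wires the value through ParquetGroupScan rather than a standalone helper.

```java
// Sketch: for LIMIT N, shrink the reader's record batch size so at most
// N rows are materialized instead of the full default batch.
public class LimitPushdown {

    static final int DEFAULT_FIXED_LEN_BATCH = 256 * 1024; // all fixed-width columns
    static final int DEFAULT_VAR_LEN_BATCH = 32 * 1024;    // otherwise

    /** Returns the record batch size a reader should use under LIMIT `limit`. */
    public static int batchSizeForLimit(int limit, boolean allFixedWidth) {
        int dflt = allFixedWidth ? DEFAULT_FIXED_LEN_BATCH : DEFAULT_VAR_LEN_BATCH;
        return Math.min(limit, dflt);
    }
}
```

So a `LIMIT 100` query reads batches of 100 rows instead of 256K, which is where the win for wide tables comes from.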
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r81244611 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java --- @@ -864,6 +872,14 @@ public String getDigest() { return toString(); } + public void setCacheFileRoot(String cacheFileRoot) { +this.cacheFileRoot = cacheFileRoot; + } + + public void setBatchSize(long batchSize) { +this.recommendedBatchSize = batchSize; --- End diff -- Checking against the current value is fine for now. Adding new methods would be a bit outside the scope of this JIRA.
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r80990803 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java --- @@ -115,6 +115,8 @@ private List rowGroupInfos; private Metadata.ParquetTableMetadataBase parquetTableMetadata = null; private String cacheFileRoot = null; + private int batchSize; + private static final int DEFAULT_BATCH_LENGTH = 256 * 1024; --- End diff -- Done. Please review the new diffs.
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r80817027 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java --- @@ -115,6 +115,8 @@ private List rowGroupInfos; private Metadata.ParquetTableMetadataBase parquetTableMetadata = null; private String cacheFileRoot = null; + private int batchSize; + private static final int DEFAULT_BATCH_LENGTH = 256 * 1024; --- End diff -- The max value for store.parquet.record_batch_size is 256K, so it cannot be set to 512K. I changed the name in ParquetGroupScan/ParquetRowGroupScan to recommendedBatchSize as we discussed. Please review the new diffs.
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r80754060 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java --- @@ -899,6 +907,16 @@ public FileGroupScan clone(FileSelection selection) throws IOException { return newScan; } + // clone to create new groupscan with new file selection and batchSize. + public ParquetGroupScan clone(FileSelection selection, int batchSize) throws IOException { +ParquetGroupScan newScan = new ParquetGroupScan(this); +newScan.modifyFileSelection(selection); +newScan.cacheFileRoot = selection.cacheFileRoot; --- End diff -- Yes, we can; will do.
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r80751210 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java --- @@ -107,7 +107,7 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath( { readers.add( new ParquetRecordReader( - context, e.getPath(), e.getRowGroupIndex(), fs, + context, rowGroupScan.getBatchSize(), e.getPath(), e.getRowGroupIndex(), fs, --- End diff -- This fix is done only for the native parquet reader.
[GitHub] drill issue #597: DRILL-4905: Push down the LIMIT to the parquet reader scan...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/597 Updated with review comments.
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r80788614 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java --- @@ -899,6 +907,16 @@ public FileGroupScan clone(FileSelection selection) throws IOException { return newScan; } + // clone to create new groupscan with new file selection and batchSize. + public ParquetGroupScan clone(FileSelection selection, int batchSize) throws IOException { +ParquetGroupScan newScan = new ParquetGroupScan(this); +newScan.modifyFileSelection(selection); +newScan.cacheFileRoot = selection.cacheFileRoot; --- End diff -- Done.
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r80788617 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java --- @@ -899,6 +907,16 @@ public FileGroupScan clone(FileSelection selection) throws IOException { return newScan; } + // clone to create new groupscan with new file selection and batchSize. + public ParquetGroupScan clone(FileSelection selection, int batchSize) throws IOException { +ParquetGroupScan newScan = new ParquetGroupScan(this); +newScan.modifyFileSelection(selection); +newScan.cacheFileRoot = selection.cacheFileRoot; +newScan.init(selection.getMetaContext()); +newScan.batchSize = batchSize; --- End diff -- Done.
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r80788478 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java --- @@ -107,7 +107,7 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath( { readers.add( new ParquetRecordReader( - context, e.getPath(), e.getRowGroupIndex(), fs, + context, rowGroupScan.getBatchSize(), e.getPath(), e.getRowGroupIndex(), fs, --- End diff -- Done.
[GitHub] drill pull request #597: DRILL-4905: Push down the LIMIT to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r81238087 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java --- @@ -864,6 +872,14 @@ public String getDigest() { return toString(); } + public void setCacheFileRoot(String cacheFileRoot) { +this.cacheFileRoot = cacheFileRoot; + } + + public void setBatchSize(long batchSize) { +this.recommendedBatchSize = batchSize; --- End diff -- OK, added the assert. Check the updated diffs.
[GitHub] drill issue #597: DRILL-4905: Push down the LIMIT to the parquet reader scan...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/597 Updated with all review comments taken care of. Please review.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84952909 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * BufferedDirectBufInputStream reads from the + * underlying InputStream in blocks of data, into an + * internal buffer. The internal buffer is a direct memory backed + * buffer. The implementation is similar to the BufferedInputStream + * class except that the internal buffer is a Drillbuf and + * not a byte array. The mark and reset methods of the underlying + * InputStreamare not supported. + */ +public class BufferedDirectBufInputStream extends DirectBufInputStream implements Closeable { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class); + + private static int defaultBufferSize = 8192 * 1024; // 8 MiB --- End diff -- By KiB and MiB, I think you mean KibiByte and MebiByte, which are not commonly used terms. Had to google search :-) If you want to stick with them, may be add a comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85028094 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java --- @@ -41,43 +51,149 @@ public VarLenBinaryReader(ParquetRecordReader parentReader, List firstColumnStatus) throws IOException { long recordsReadInCurrentPass = 0; -int lengthVarFieldsInCurrentRecord; -long totalVariableLengthData = 0; -boolean exitLengthDeterminingLoop = false; + // write the first 0 offset for (VarLengthColumn columnReader : columns) { columnReader.reset(); } +recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass); +if(useAsyncTasks){ + readRecordsParallel(recordsReadInCurrentPass); +}else{ + readRecordsSerial(recordsReadInCurrentPass); +} +return recordsReadInCurrentPass; + } + + + private long determineSizesSerial(long recordsToReadInThisPass) throws IOException { +int lengthVarFieldsInCurrentRecord = 0; +boolean exitLengthDeterminingLoop = false; +long totalVariableLengthData = 0; +long recordsReadInCurrentPass = 0; do { - lengthVarFieldsInCurrentRecord = 0; for (VarLengthColumn columnReader : columns) { -if ( !exitLengthDeterminingLoop ) { - exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord); +if (!exitLengthDeterminingLoop) { + exitLengthDeterminingLoop = + columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord); } else { break; } } // check that the next record will fit in the batch - if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + totalVariableLengthData - + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) { + if (exitLengthDeterminingLoop || + (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + + totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) { break; } - for 
(VarLengthColumn columnReader : columns ) { + for (VarLengthColumn columnReader : columns) { columnReader.updateReadyToReadPosition(); columnReader.currDefLevel = -1; } recordsReadInCurrentPass++; totalVariableLengthData += lengthVarFieldsInCurrentRecord; } while (recordsReadInCurrentPass < recordsToReadInThisPass); +return recordsReadInCurrentPass; + } + + + public long determineSizesParallel(long recordsToReadInThisPass ) throws IOException { +boolean doneReading = false; +int lengthVarFieldsInCurrentRecord = 0; +boolean exitLengthDeterminingLoop = false; +long totalVariableLengthData = 0; +long recordsReadInCurrentPass = 0; + +do { +doneReading = readPagesParallel(); + +if (!doneReading) { + lengthVarFieldsInCurrentRecord = 0; + for (VarLengthColumn columnReader : columns) { +doneReading = columnReader.processPageData((int) recordsReadInCurrentPass); +if(doneReading) { + break; +} +lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits; +doneReading = columnReader.checkVectorCapacityReached(); +if(doneReading) { + break; +} + } +} + +exitLengthDeterminingLoop = doneReading; + + // check that the next record will fit in the batch + if (exitLengthDeterminingLoop || --- End diff -- Alignment seems to be off.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84972962 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.ByteBufUtil; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.SnappyCodec; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.drill.exec.util.filereader.DirectBufInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.Encoding.valueOf; + +class AsyncPageReader extends PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class); + + + private ExecutorService threadPool; + private Future asyncPageRead; + + AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path, + ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { +super(parentStatus, fs, path, columnChunkMetaData); +if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); +} +asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + @Override protected void 
loadDictionaryIfExists(final ColumnReader parentStatus, + final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException { +if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + try { +dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); + } catch (IOException e) { +handleAndThrowException(e, "Error Reading dictionary page."); + } + // parent constructor may call this method before the thread pool is set. + if (threadPool == null) { +threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); + } + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + readDictionaryPage(asyncPageRead, parentStatus); + asyncPageRead = null; // reset after consuming +} + } + + private DrillBuf getDecompressedPageData(ReadStatus readStatus) { +DrillBuf data; +boolean isDictionary = false; +synchronized (this) { + data = readStatus.getPageData(); + readStatus.setPageData(null); + isDictionary = readStatus.isDictionaryPage; +} +if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) { + DrillBuf uncompressedData = data; + data = decompress(readStatus.getPageHeader(), uncompressedData); + synchronized (this) { +readStatus.setPageData(null); + } + uncompress
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85136996 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java --- @@ -174,4 +191,11 @@ public DrillFileSystem newFileSystem(Configuration conf) throws IOException { return fs; } + @Override + public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException { +Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext"); +fs = new DrillFileSystem(conf, null); +return fs; + } + --- End diff -- Extra blank lines here and in a bunch of other places in the code. Please check and fix.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84972161 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.ByteBufUtil; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.SnappyCodec; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.drill.exec.util.filereader.DirectBufInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.Encoding.valueOf; + +class AsyncPageReader extends PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class); + + + private ExecutorService threadPool; + private Future asyncPageRead; + + AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path, + ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { +super(parentStatus, fs, path, columnChunkMetaData); +if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); +} +asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + @Override protected void 
loadDictionaryIfExists(final ColumnReader parentStatus,
+      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
+    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+      try {
+        dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
--- End diff --

Is it valid to assume dictionaryPageOffset is always greater than the current position, or should we add an assert/check here?
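The reviewer's question — whether the dictionary page offset can ever be behind the current stream position — could be addressed with an explicit guard before skipping. A minimal sketch, assuming a hypothetical `SeekableReader` stand-in for Drill's `DirectBufInputStream` (names and signatures here are illustrative, not Drill's actual API):

```java
// Sketch: guard the skip to the dictionary page with an explicit check,
// rather than assuming the offset is always ahead of the current position.
// SeekableReader is a hypothetical stand-in for DirectBufInputStream.
class SeekableReader {
    private long pos;
    SeekableReader(long pos) { this.pos = pos; }
    long getPos() { return pos; }
    void skip(long n) { pos += n; }
}

public class DictionaryPageSkip {
    // Returns the number of bytes skipped; throws if the offset is behind us,
    // which would indicate corrupt metadata or an out-of-order read.
    static long skipToDictionaryPage(SeekableReader reader, long dictionaryPageOffset) {
        long toSkip = dictionaryPageOffset - reader.getPos();
        if (toSkip < 0) {
            throw new IllegalStateException(
                "Dictionary page offset " + dictionaryPageOffset
                + " is behind current position " + reader.getPos());
        }
        reader.skip(toSkip);
        return toSkip;
    }

    public static void main(String[] args) {
        SeekableReader r = new SeekableReader(100);
        System.out.println(skipToDictionaryPage(r, 350)); // prints 250
    }
}
```

Failing fast here would turn silent corruption (a negative skip) into a clear error at the point where the metadata invariant is violated.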
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85141426 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.ByteBufUtil; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.SnappyCodec; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.drill.exec.util.filereader.DirectBufInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.Encoding.valueOf; + +class AsyncPageReader extends PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class); + + + private ExecutorService threadPool; + private Future asyncPageRead; + + AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path, + ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { +super(parentStatus, fs, path, columnChunkMetaData); +if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); +} +asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + @Override protected void 
loadDictionaryIfExists(final ColumnReader parentStatus, + final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException { +if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + try { +dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); + } catch (IOException e) { +handleAndThrowException(e, "Error Reading dictionary page."); + } + // parent constructor may call this method before the thread pool is set. + if (threadPool == null) { +threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); + } + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + readDictionaryPage(asyncPageRead, parentStatus); + asyncPageRead = null; // reset after consuming +} + } + + private DrillBuf getDecompressedPageData(ReadStatus readStatus) { +DrillBuf data; +boolean isDictionary = false; +synchronized (this) { + data = readStatus.getPageData(); + readStatus.setPageData(null); + isDictionary = readStatus.isDictionaryPage; +} +if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) { + DrillBuf uncompressedData = data; + data = decompress(readStatus.getPageHeader(), uncompressedData); + synchronized (this) { +readStatus.setPageData(null); + } + uncompress
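The synchronized block in getDecompressedPageData follows a transfer-and-null handoff: the buffer reference is read and cleared under the same lock, so exactly one caller can ever claim ownership of the page data. A generic sketch of that pattern, with a simplified stand-in for Drill's ReadStatus and a byte[] in place of the DrillBuf:

```java
// Sketch of the transfer-and-null handoff used in getDecompressedPageData:
// the buffer reference is taken and cleared under the same lock, so only
// one thread can ever claim ownership of the page data.
public class PageHandoff {
    static class ReadStatus {           // simplified stand-in for Drill's ReadStatus
        private byte[] pageData;        // a DrillBuf in the real code
        boolean isDictionaryPage;

        synchronized byte[] takePageData() {
            byte[] data = pageData;     // claim the reference...
            pageData = null;            // ...and clear it so no one else can
            return data;
        }

        synchronized void setPageData(byte[] data) { pageData = data; }
    }

    public static void main(String[] args) {
        ReadStatus status = new ReadStatus();
        status.setPageData(new byte[] {1, 2, 3});
        byte[] first = status.takePageData();
        byte[] second = status.takePageData();
        System.out.println((first != null) + " " + (second == null)); // prints true true
    }
}
```

This matters for reference-counted buffers like DrillBuf: double-release bugs are avoided because a second taker sees null rather than a stale reference.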
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84981300 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.ByteBufUtil; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.SnappyCodec; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.drill.exec.util.filereader.DirectBufInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.Encoding.valueOf; + +class AsyncPageReader extends PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class); + + + private ExecutorService threadPool; + private Future asyncPageRead; + + AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path, + ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { +super(parentStatus, fs, path, columnChunkMetaData); +if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); +} +asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + @Override protected void 
loadDictionaryIfExists(final ColumnReader parentStatus, + final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException { +if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + try { +dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); + } catch (IOException e) { +handleAndThrowException(e, "Error Reading dictionary page."); + } + // parent constructor may call this method before the thread pool is set. + if (threadPool == null) { +threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); + } + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + readDictionaryPage(asyncPageRead, parentStatus); + asyncPageRead = null; // reset after consuming +} + } + + private DrillBuf getDecompressedPageData(ReadStatus readStatus) { +DrillBuf data; +boolean isDictionary = false; +synchronized (this) { + data = readStatus.getPageData(); + readStatus.setPageData(null); + isDictionary = readStatus.isDictionaryPage; +} +if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) { + DrillBuf uncompressedData = data; + data = decompress(readStatus.getPageHeader(), uncompressedData); + synchronized (this) { +readStatus.setPageData(null); + } + uncompress
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84957411

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java ---
@@ -99,26 +99,47 @@
   int currentPageCount = -1;

-  private FSDataInputStream inputStream;
+  protected FSDataInputStream inputStream;

   // These need to be held throughout reading of the entire column chunk
   List allocatedDictionaryBuffers;

-  private final CodecFactory codecFactory;
+  protected final CodecFactory codecFactory;
+  protected final String fileName;

-  private final ParquetReaderStats stats;
+  protected final ParquetReaderStats stats;
+  private final boolean useBufferedReader;
+  private final int scanBufferSize;
+  private final boolean useFadvise;

-  PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
-    throws ExecutionSetupException{
+  PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
--- End diff --

Why not import this class instead of using the fully qualified name?
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84970035

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java ---
@@ -148,7 +185,8 @@ public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFie
       return true;
     }

-    lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+    // Never used in this code path. Hard to remove because the method is overridden by subclasses
+    lengthVarFieldsInCurrentRecord = -1;
--- End diff --

Why set this to -1 if it's never used?
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84968348 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.ByteBufUtil; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.SnappyCodec; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.drill.exec.util.filereader.DirectBufInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.Encoding.valueOf; + +class AsyncPageReader extends PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class); + + + private ExecutorService threadPool; + private Future asyncPageRead; + + AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path, + ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { +super(parentStatus, fs, path, columnChunkMetaData); +if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); +} +asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + @Override protected void 
loadDictionaryIfExists(final ColumnReader parentStatus, + final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException { +if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + try { +dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); + } catch (IOException e) { +handleAndThrowException(e, "Error Reading dictionary page."); + } + // parent constructor may call this method before the thread pool is set. + if (threadPool == null) { +threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); + } + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + readDictionaryPage(asyncPageRead, parentStatus); + asyncPageRead = null; // reset after consuming +} + } + + private DrillBuf getDecompressedPageData(ReadStatus readStatus) { +DrillBuf data; +boolean isDictionary = false; +synchronized (this) { + data = readStatus.getPageData(); + readStatus.setPageData(null); + isDictionary = readStatus.isDictionaryPage; +} +if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) { + DrillBuf uncompressedData = data; + data = decompress(readStatus.getPageHeader(), uncompressedData); + synchronized (this) { +readStatus.setPageData(null); + } + uncompress
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84969218 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.ByteBufUtil; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.SnappyCodec; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.drill.exec.util.filereader.DirectBufInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.Encoding.valueOf; + +class AsyncPageReader extends PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class); + + + private ExecutorService threadPool; + private Future asyncPageRead; + + AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path, + ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { +super(parentStatus, fs, path, columnChunkMetaData); +if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); +} +asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + @Override protected void 
loadDictionaryIfExists(final ColumnReader parentStatus, + final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException { +if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + try { +dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); + } catch (IOException e) { +handleAndThrowException(e, "Error Reading dictionary page."); + } + // parent constructor may call this method before the thread pool is set. + if (threadPool == null) { +threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); + } + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + readDictionaryPage(asyncPageRead, parentStatus); + asyncPageRead = null; // reset after consuming +} + } + + private DrillBuf getDecompressedPageData(ReadStatus readStatus) { +DrillBuf data; +boolean isDictionary = false; +synchronized (this) { + data = readStatus.getPageData(); + readStatus.setPageData(null); + isDictionary = readStatus.isDictionaryPage; +} +if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) { + DrillBuf uncompressedData = data; + data = decompress(readStatus.getPageHeader(), uncompressedData); + synchronized (this) { +readStatus.setPageData(null); + } + uncompress
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84970233

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java ---
@@ -174,4 +191,11 @@ public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
     return fs;
   }
+
+  @Override
+  public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
--- End diff --

Please add a comment explaining what this nonTrackingFileSystem is for.
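The method under discussion uses Guava's Preconditions.checkState to enforce at most one FileSystem per OperatorContext. That create-at-most-once guard can be sketched on its own; the class and field names below are hypothetical stand-ins, not Drill's actual types:

```java
// Sketch of a create-at-most-once guard like the one in OperatorContextImpl:
// the second call fails fast instead of silently replacing the tracked resource.
// OnceOnlyFactory stands in for the operator context; Object for DrillFileSystem.
public class OnceOnlyFactory {
    private Object fs;

    Object newFileSystem() {
        // Equivalent to Preconditions.checkState(fs == null, "...") in Guava terms.
        if (fs != null) {
            throw new IllegalStateException(
                "Tried to create a second FileSystem. Can only be called once per context");
        }
        fs = new Object();
        return fs;
    }

    public static void main(String[] args) {
        OnceOnlyFactory ctx = new OnceOnlyFactory();
        ctx.newFileSystem();          // first call succeeds
        try {
            ctx.newFileSystem();      // second call must fail
            System.out.println("no exception");
        } catch (IllegalStateException e) {
            System.out.println("rejected second create"); // prints this
        }
    }
}
```

The fail-fast style makes the "one filesystem per context" invariant visible at the call site that breaks it, rather than surfacing later as a leaked or untracked resource.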
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85152503 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * BufferedDirectBufInputStream reads from the
+ * underlying InputStream in blocks of data, into an
+ * internal buffer. The internal buffer is a direct memory backed
+ * buffer. The implementation is similar to the BufferedInputStream
+ * class except that the internal buffer is a DrillBuf and
+ * not a byte array. The mark and reset methods of the underlying
+ * InputStream are not supported.
+ */
+public class BufferedDirectBufInputStream extends DirectBufInputStream implements Closeable {
+
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class);
+
+  private static int defaultBufferSize = 8192 * 1024; // 8 MiB
+  private static int defaultTempBufferSize = 8192; // 8 KiB
+  private static int smallBufferSize = 64 * 1024; // 64 KiB
+
+  /**
+   * The internal buffer to keep data read from the underlying inputStream.
+   * internalBuffer[0] through internalBuffer[count-1]
+   * contains data read from the underlying input stream.
+   */
+  protected volatile DrillBuf internalBuffer; // the internal buffer
+
+  /**
+   * The number of valid bytes in internalBuffer.
+   * count is always in the range [0, internalBuffer.capacity];
+   * internalBuffer[count-1] is the last valid byte in the buffer.
+   */
+  protected int count;
+
+  /**
+   * The current read position in the buffer; the index of the next
+   * character to be read from the internalBuffer array.
+   *
+   * This value is always in the range [0, count].
+   * If curPosInBuffer is equal to count then we have read
+   * all the buffered data and the next read (or skip) will require more data to be read
+   * from the underlying input stream.
+   */
+  protected int curPosInBuffer;
+
+  protected long curPosInStream; // current offset in the input stream
+
+  private int bufSize;
+
+  private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len)
+
+  private DrillBuf getBuf() throws IOException {
+    checkInputStreamState();
+    if (internalBuffer == null) {
+      throw new IOException("Input stream is closed.");
+    }
+    return this.internalBuffer;
+  }
+
+  /**
+   * Creates a BufferedDirectBufInputStream
+   * with the default (8 MiB) buffer size.
+   */
+  public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id,
+      long startOffset, long totalByteSize, boolean enableHints) {
+    this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints);
+  }
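The invariants documented in the quoted code (count holds the number of valid bytes, curPosInBuffer is the next index to read, and curPosInBuffer == count means a refill is needed) can be sketched with a heap byte[] standing in for the direct DrillBuf. This is a simplified illustration, not Drill's implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Minimal sketch of the BufferedDirectBufInputStream invariants using a
// heap byte[] instead of a direct DrillBuf: count bytes are valid, and
// curPosInBuffer == count means the buffer is exhausted and must be refilled.
public class TinyBufferedStream {
    private final InputStream in;
    private final byte[] internalBuffer;
    private int count;           // valid bytes in internalBuffer, in [0, capacity]
    private int curPosInBuffer;  // next index to read, in [0, count]

    TinyBufferedStream(InputStream in, int bufSize) {
        this.in = in;
        this.internalBuffer = new byte[bufSize];
    }

    public int read() throws IOException {
        if (curPosInBuffer == count) {        // buffered data exhausted; refill
            count = in.read(internalBuffer, 0, internalBuffer.length);
            curPosInBuffer = 0;
            if (count <= 0) {
                return -1;                    // end of underlying stream
            }
        }
        return internalBuffer[curPosInBuffer++] & 0xFF;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {10, 20, 30, 40, 50};
        TinyBufferedStream s = new TinyBufferedStream(new ByteArrayInputStream(data), 2);
        int sum = 0;
        for (int b; (b = s.read()) != -1; ) {
            sum += b;
        }
        System.out.println(sum); // prints 150
    }
}
```

The real class adds direct-memory allocation, fadvise hints, and large-read fast paths, but the refill condition and the [0, count] position invariant are the same shape as this sketch.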
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84971152 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.DrillBuf; +import io.netty.buffer.ByteBufUtil; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.codec.SnappyCodec; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.drill.exec.util.filereader.DirectBufInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.Encoding.valueOf; + +class AsyncPageReader extends PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class); + + + private ExecutorService threadPool; + private Future asyncPageRead; + + AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path, + ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { +super(parentStatus, fs, path, columnChunkMetaData); +if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); +} +asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + @Override protected void 
loadDictionaryIfExists(final ColumnReader parentStatus, + final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException { +if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + try { +dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); + } catch (IOException e) { +handleAndThrowException(e, "Error Reading dictionary page."); + } + // parent constructor may call this method before the thread pool is set. + if (threadPool == null) { +threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); + } + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + readDictionaryPage(asyncPageRead, parentStatus); + asyncPageRead = null; // reset after consuming +} + } + + private DrillBuf getDecompressedPageData(ReadStatus readStatus) { +DrillBuf data; +boolean isDictionary = false; +synchronized (this) { + data = readStatus.getPageData(); + readStatus.setPageData(null); + isDictionary = readStatus.isDictionaryPage; +} +if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) { + DrillBuf uncompressedData = data; + data = decompress(readStatus.getPageHeader(), uncompressedData); + synchronized (this) { +readStatus.setPageData(null); + } + uncompress
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85153296 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java --- @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +public class DirectBufInputStream extends FilterInputStream { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(DirectBufInputStream.class); + + protected boolean enableHints = true; + protected String streamId; // a name for logging purposes only + protected BufferAllocator allocator; + /** + * The length of the data we expect to read. 
The caller may, in fact, + * ask for more or less bytes. However this is useful for providing hints where + * the underlying InputStream supports hints (e.g. fadvise) + */ + protected final long totalByteSize; + + /** + * The offset in the underlying stream to start reading from + */ + protected final long startOffset; + + public DirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset, + long totalByteSize, boolean enableHints) { +super(in); +Preconditions.checkArgument(startOffset >= 0); +Preconditions.checkArgument(totalByteSize >= 0); +this.streamId = id; +this.allocator = allocator; +this.startOffset = startOffset; +this.totalByteSize = totalByteSize; +this.enableHints = enableHints; + } + + public void init() throws IOException, UnsupportedOperationException { +checkStreamSupportsByteBuffer(); +if (enableHints) { + fadviseIfAvailable(getInputStream(), this.startOffset, this.totalByteSize); +} +getInputStream().seek(this.startOffset); +return; + } + + public int read() throws IOException { +return getInputStream().read(); + } + + public synchronized int read(DrillBuf buf, int off, int len) throws IOException { +buf.clear(); +ByteBuffer directBuffer = buf.nioBuffer(0, len); +int lengthLeftToRead = len; +while (lengthLeftToRead > 0) { + lengthLeftToRead -= CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead); +} +buf.writerIndex(len); +return len; + } + + public synchronized DrillBuf getNext(int bytes) throws IOException { +DrillBuf b = allocator.buffer(bytes); +int bytesRead = -1; +try { +bytesRead = read(b, 0, bytes); +} catch (IOException e){ + b.release(); + throw e; +} +if (bytesRead <= -1) { --- End diff -- bytesRead = 0 ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
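The "bytesRead = 0 ?" question above points at a general hazard in loops that fill a buffer from a stream: a read call may return fewer bytes than requested, or -1 at end of stream, and a loop that only subtracts the return value never terminates at EOF. A minimal sketch of a robust fill loop, using plain java.io types in place of Drill's DrillBuf and CompatibilityUtil (those substitutions are assumptions for illustration, not the PR's actual code):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class FillLoopDemo {

  /**
   * Reads up to len bytes into dst; returns the number actually read,
   * which may be less than len if the stream ends early. Stopping on
   * n <= 0 is what keeps the loop from spinning (or from subtracting
   * -1 and growing the remainder) at EOF.
   */
  static int readFully(InputStream in, byte[] dst, int len) throws IOException {
    int total = 0;
    while (total < len) {
      int n = in.read(dst, total, len - total);
      if (n <= 0) { // EOF (-1) or a zero-length read: stop instead of looping
        break;
      }
      total += n;
    }
    return total;
  }

  public static void main(String[] args) throws IOException {
    byte[] src = {1, 2, 3, 4, 5};
    byte[] dst = new byte[8];
    // Ask for 8 bytes from a 5-byte stream: the loop must terminate.
    int got = readFully(new ByteArrayInputStream(src), dst, 8);
    System.out.println(got); // 5, not 8: the stream ended early
  }
}
```

A caller such as getNext can then treat a short (or zero) return as "no more data" rather than relying solely on a -1 sentinel.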
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84990242 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java ---
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84953931 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- + @Override protected void loadDictionaryIfExists(final ColumnReader parentStatus, --- End diff -- formatting issue. Please fix here and other places.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85146088 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- +dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); --- End diff -- I see that input argument f is not used. Should we be using f instead of dataReader here?
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85150281 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java --- @@ -234,4 +286,48 @@ public static int readIntLittleEndian(DrillBuf in, int offset) { return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); } + private class ColumnReaderProcessPagesTask implements Callable { + +private final ColumnReader parent = ColumnReader.this; +private final long recordsToReadInThisPass; + +public ColumnReaderProcessPagesTask(long recordsToReadInThisPass){ + this.recordsToReadInThisPass = recordsToReadInThisPass; +} + +@Override public Long call() throws IOException{ --- End diff -- Space between IOException and {
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84987279 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java --- @@ -45,12 +46,15 @@ private final BufferAllocator allocator; private final ScanResult classpathScan; private final ExecutorService executor; + private final ExecutorService scanExecutor; + private final ExecutorService scanDecodeExecutor; public BootStrapContext(DrillConfig config, ScanResult classpathScan) { this.config = config; this.classpathScan = classpathScan; this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-"); -this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-"); +this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), +"BitClient-"); --- End diff -- formatting change - not needed ?
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85152665 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * BufferedDirectBufInputStream reads from the + * underlying InputStream in blocks of data, into an + * internal buffer. The internal buffer is a direct memory backed + * buffer. The implementation is similar to the BufferedInputStream + * class except that the internal buffer is a DrillBuf and + * not a byte array. The mark and reset methods of the underlying + * InputStream are not supported. 
+ */ + protected volatile DrillBuf internalBuffer; // the internal buffer + + /** + * The number of valid bytes in internalBuffer. + * count is always in the range [0,internalBuffer.capacity] + * internalBuffer[count-1] is the last valid byte in the buffer. + */ + protected int count; + + /** + * The current read position in the buffer; the index of the next + * character to be read from the internalBuffer array. + * + * This value is always in the range [0,count]. + * If curPosInBuffer is equal to count then we have read + * all the buffered data and the next read (or skip) will require more data to be read + * from the underlying input stream. + */ + protected int curPosInBuffer; + + protected long curPosInStream; // current offset in the input stream + + private int bufSize; + + private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len) + + private DrillBuf getBuf() throws IOException { +checkInputStreamState(); +if (internalBuffer == null) { + throw new IOException("Input stream is closed."); +} +return this.internalBuffer; + } + + /** + * Creates a BufferedDirectBufInputStream + * with the default (8 MiB) buffer size. + */ + public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id, + long startOffset, long totalByteSize, boolean enableHints) { +this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints); + }
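The invariants quoted above (valid data in internalBuffer[0..count-1], curPosInBuffer always in [0, count], a refill once curPosInBuffer reaches count) can be sketched with a plain byte[] standing in for the DrillBuf-backed buffer. The field names mirror the quoted code, but the implementation is an illustrative assumption, not Drill's:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class BufferedReadSketch {
  private final InputStream in;
  private final byte[] internalBuffer;   // stand-in for the DrillBuf
  private int count = 0;                 // number of valid bytes in internalBuffer
  private int curPosInBuffer = 0;        // next byte to hand out; always <= count

  BufferedReadSketch(InputStream in, int bufSize) {
    this.in = in;
    this.internalBuffer = new byte[bufSize];
  }

  /** Refill the internal buffer from the underlying stream; count == 0 means EOF. */
  private void fill() throws IOException {
    curPosInBuffer = 0;
    count = Math.max(in.read(internalBuffer, 0, internalBuffer.length), 0);
  }

  int read() throws IOException {
    if (curPosInBuffer >= count) { // buffered data exhausted; need more from the stream
      fill();
      if (count == 0) {
        return -1;                 // underlying stream hit EOF
      }
    }
    return internalBuffer[curPosInBuffer++] & 0xFF;
  }

  public static void main(String[] args) throws IOException {
    BufferedReadSketch s =
        new BufferedReadSketch(new ByteArrayInputStream(new byte[]{10, 20, 30}), 2);
    System.out.println(s.read()); // 10
    System.out.println(s.read()); // 20 (served from the same internal buffer)
    System.out.println(s.read()); // 30 (after a refill)
    System.out.println(s.read()); // -1
  }
}
```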
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r84982984 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java --- @@ -43,10 +45,18 @@ public abstract OperatorStats getStats(); + public abstract ExecutorService getExecutor(); + + public abstract ExecutorService getScanExecutor(); --- End diff -- Is it possible to use enum and pass that to getExecutor instead of adding specific functions like getScanExecutor, getScanDecodeExecutor etc. ?
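The reviewer's suggestion — a single getExecutor keyed by an enum rather than one accessor per pool — might look like the following sketch. The enum constants, pool types, and sizes here are illustrative assumptions, not Drill's actual configuration:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorRegistry {
  // Hypothetical pool kinds mirroring the accessors in the diff above.
  enum ExecutorKind { GENERAL, SCAN, SCAN_DECODE }

  private final Map<ExecutorKind, ExecutorService> pools =
      new EnumMap<>(ExecutorKind.class);

  ExecutorRegistry() {
    // Pool types and sizes are placeholders, not Drill's tuning.
    pools.put(ExecutorKind.GENERAL, Executors.newCachedThreadPool());
    pools.put(ExecutorKind.SCAN, Executors.newFixedThreadPool(4));
    pools.put(ExecutorKind.SCAN_DECODE, Executors.newFixedThreadPool(4));
  }

  /** One accessor replaces getExecutor/getScanExecutor/getScanDecodeExecutor. */
  ExecutorService getExecutor(ExecutorKind kind) {
    return pools.get(kind);
  }

  void shutdownAll() {
    pools.values().forEach(ExecutorService::shutdown);
  }

  public static void main(String[] args) throws Exception {
    ExecutorRegistry registry = new ExecutorRegistry();
    Future<Integer> result = registry.getExecutor(ExecutorKind.SCAN).submit(() -> 41 + 1);
    System.out.println(result.get()); // 42
    registry.shutdownAll();
  }
}
```

The trade-off is that callers must name the pool they want at each call site, but adding a new pool no longer changes the OperatorContext interface.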
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85153076 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java --- +} catch (IOException e){ --- End diff -- space ) {
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r85153504 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java --- @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +public class DirectBufInputStream extends FilterInputStream { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(DirectBufInputStream.class); + + protected boolean enableHints = true; + protected String streamId; // a name for logging purposes only + protected BufferAllocator allocator; + /** + * The length of the data we expect to read. 
 * The caller may, in fact, ask for more or less bytes. However this is useful
 * for providing hints where the underlying InputStream supports hints (e.g. fadvise)
 */
protected final long totalByteSize;

/**
 * The offset in the underlying stream to start reading from
 */
protected final long startOffset;

public DirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset,
    long totalByteSize, boolean enableHints) {
  super(in);
  Preconditions.checkArgument(startOffset >= 0);
  Preconditions.checkArgument(totalByteSize >= 0);
  this.streamId = id;
  this.allocator = allocator;
  this.startOffset = startOffset;
  this.totalByteSize = totalByteSize;
  this.enableHints = enableHints;
}

public void init() throws IOException, UnsupportedOperationException {
  checkStreamSupportsByteBuffer();
  if (enableHints) {
    fadviseIfAvailable(getInputStream(), this.startOffset, this.totalByteSize);
  }
  getInputStream().seek(this.startOffset);
}

public int read() throws IOException {
  return getInputStream().read();
}

public synchronized int read(DrillBuf buf, int off, int len) throws IOException {
  buf.clear();
  ByteBuffer directBuffer = buf.nioBuffer(0, len);
  int lengthLeftToRead = len;
  while (lengthLeftToRead > 0) {
    lengthLeftToRead -= CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
  }
  buf.writerIndex(len);
  return len;
}

public synchronized DrillBuf getNext(int bytes) throws IOException {
  DrillBuf b = allocator.buffer(bytes);
  int bytesRead = -1;
  try {
    bytesRead = read(b, 0, bytes);
  } catch (IOException e) {
    b.release();
    throw e;
  }
  if (bytesRead <= -1) {
    b.release();
    return null;
  }
  return b;
}

public long getPos() throws IOException {
  return getInputStream().getPos();
}

public boolean hasRemainder() throws IOException {
  // We use the following instead of "getInputStream().available() > 0" because
  // available() on HDFS seems to have issues with file sizes
  // that are greater than Integer.MAX_VALUE
  return (this.getPos() < (this.startOffset + this.totalByteSize));
}

protected FSDataInputStream getInputStream() throws IOException {
  // Make sure stream is open
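The read(DrillBuf, ...) loop above exists because a single read call may return fewer bytes than requested, so the stream must be drained until the buffer is full. A minimal plain-Java sketch of the same idea over an ordinary InputStream (ReadFullySketch is a hypothetical name, not Drill code):

```java
import java.io.IOException;
import java.io.InputStream;

public class ReadFullySketch {
    // InputStream.read may return fewer bytes than requested, so keep
    // reading until exactly len bytes have been copied (or EOF is hit).
    static int readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
        int remaining = len;
        while (remaining > 0) {
            int n = in.read(buf, off + (len - remaining), remaining);
            if (n < 0) {
                throw new IOException("EOF after " + (len - remaining) + " of " + len + " bytes");
            }
            remaining -= n;
        }
        return len;
    }
}
```

The Drill version does the same thing against a direct ByteBuffer via CompatibilityUtil.getBuf and then sets the writer index once the full length has arrived.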
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/639 Merged with the latest code. All review comments taken care of. All tests pass with the option `store.parquet.use_local_affinity` set to both true and false.
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/639 Parallelization logic is affected for the following reasons: Depending upon how many rowGroups there are to scan on a node (based on locality information), i.e. how much work the node has to do, we want to adjust the number of fragments on the node (constrained by the usual global and per-node limits). We do not want to schedule fragments on a node which has no data. Because we want pure locality, we may have fewer fragments doing more work.
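The idea of sizing per-node fragment counts in proportion to local work can be sketched in a few lines. This is an illustrative approximation with hypothetical names, not Drill's LocalAffinityFragmentParallelizer; the real code additionally enforces global and per-node width limits and distributes rounding remainders:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalityWidthSketch {
    // Allocate totalWidth fragments across nodes in proportion to each
    // node's local work units; nodes with no local data get no fragments.
    static Map<String, Integer> allocate(Map<String, Integer> localWorkUnits, int totalWidth) {
        int totalWork = localWorkUnits.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Integer> assigned = new LinkedHashMap<>();
        int remaining = totalWidth;
        for (Map.Entry<String, Integer> e : localWorkUnits.entrySet()) {
            // Target is proportional to this node's share of the total work.
            int target = (int) Math.ceil((double) totalWidth * e.getValue() / totalWork);
            int n = Math.min(target, remaining);
            assigned.put(e.getKey(), n);
            remaining -= n;
        }
        return assigned;
    }
}
```

With 4 fragments and work units {node1: 6, node2: 2, node3: 0}, node3 gets nothing and node1 gets most of the width, which matches the "fewer fragments doing more work" trade-off described above.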
[GitHub] drill issue #597: DRILL-4905: Push the LIMIT down to the parquet reader.
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/597 Updated the diffs with review comments taken care of.
[GitHub] drill pull request #597: DRILL-4905: Push the LIMIT down to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r83965044 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetGroupScan.java --- @@ -117,4 +119,18 @@ public void testSelectEmptyNoCache() throws Exception { uex.getMessage()), uex.getMessage().contains(expectedMsg)); } } + + @Test + public void testLimit() throws Exception { +List results = testSqlWithResults(String.format("select * from cp.`parquet/limitTest.parquet` limit 1")); --- End diff -- done
[GitHub] drill pull request #597: DRILL-4905: Push the LIMIT down to the parquet read...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/597#discussion_r83965017 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java --- @@ -139,6 +155,11 @@ public ParquetRecordReader( this.batchSize = batchSize; this.footer = footer; this.fragmentContext = fragmentContext; +if (numRecordsToRead == DEFAULT_RECORDS_TO_READ_NOT_SPECIFIED) { + this.numRecordsToRead = footer.getBlocks().get(rowGroupIndex).getRowCount(); +} else { + this.numRecordsToRead = numRecordsToRead; --- End diff -- The current code handles the case where numRecordsToRead is out of range, so I am not adding additional checks here.
[GitHub] drill pull request #652: DRILL-4990:Use new HDFS API access instead of listS...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/652 DRILL-4990: Use new HDFS API access instead of listStatus to check if users have permissions to access workspace. Manually tested the fix with impersonation enabled. All unit and regression tests pass. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill DRILL-4990 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/652.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #652
[GitHub] drill issue #652: DRILL-4990:Use new HDFS API access instead of listStatus t...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/652 I did not add new unit tests because existing tests already provide enough coverage and they run on local file system.
[GitHub] drill issue #653: DRILL-4831: Running refresh table metadata concurrently ra...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/653 Thanks Gautam for the review. Updated with all review comments addressed.
[GitHub] drill pull request #653: DRILL-4831: Running refresh table metadata concurre...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/653 DRILL-4831: Running refresh table metadata concurrently randomly fails with JsonParseException To prevent metadata cache files from getting corrupted, write to a temporary file and do a rename at the end. All unit and regression tests pass. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill DRILL-4831 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/653.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #653 commit fec9b7ee468cda73dec27f475069898d763fa1c7 Author: Padma Penumarthy <ppenuma...@yahoo.com> Date: 2016-10-20T12:03:13Z DRILL-4831: Running refresh table metadata concurrently randomly fails with JsonParseException
[GitHub] drill issue #653: DRILL-4831: Running refresh table metadata concurrently ra...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/653 Updated diffs with all review comments taken care of.
[GitHub] drill pull request #653: DRILL-4831: Running refresh table metadata concurre...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/653#discussion_r88770685 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java --- @@ -495,31 +499,75 @@ private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3 * @param p * @throws IOException */ - private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p) throws IOException { + private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, String path) throws IOException { JsonFactory jsonFactory = new JsonFactory(); jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false); jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); SimpleModule module = new SimpleModule(); module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer()); mapper.registerModule(module); -FSDataOutputStream os = fs.create(p); + +// If multiple clients are updating metadata cache file concurrently, the cache file +// can get corrupted. To prevent this, write to a unique temporary file and then do +// atomic rename. +UUID randomUUID = UUID.randomUUID(); --- End diff -- Yes, I wanted to use the queryId as well, but it is not easily accessible, as you mentioned. I made the change to use the same UUID for METADATA_FILENAME and METADATA_DIRECTORIES_FILENAME.
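The write-to-temp-then-rename pattern discussed in this review can be sketched with plain java.nio. The actual fix uses Hadoop's FileSystem API and Drill's metadata cache file names; AtomicWriteSketch and its file names are hypothetical stand-ins:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class AtomicWriteSketch {
    // Write the full content to a uniquely named temp file, then atomically
    // rename it over the target. Concurrent writers each use their own temp
    // file, so readers see either the old file or the new one, never a
    // partially written file.
    static void writeAtomically(Path target, String content) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + "." + UUID.randomUUID() + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

The unique temp name (here a random UUID, as in the fix under review) is what keeps concurrent refreshes from clobbering each other's in-progress writes.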
[GitHub] drill pull request #652: DRILL-4990:Use new HDFS API access instead of listS...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/652#discussion_r88097089 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java --- @@ -151,15 +152,11 @@ public WorkspaceSchemaFactory( public boolean accessible(final String userName) throws IOException { final FileSystem fs = ImpersonationUtil.createFileSystem(userName, fsConf); try { - // We have to rely on the listStatus as a FileSystem can have complicated controls such as regular unix style - // permissions, Access Control Lists (ACLs) or Access Control Expressions (ACE). Hadoop 2.7 version of FileSystem - // has a limited private API (FileSystem.access) to check the permissions directly - // (see https://issues.apache.org/jira/browse/HDFS-6570). Drill currently relies on Hadoop 2.5.0 version of - // FileClient. TODO: Update this when DRILL-3749 is fixed. - fs.listStatus(wsPath); + fs.access(wsPath, FsAction.READ); } catch (final UnsupportedOperationException e) { - logger.trace("The filesystem for this workspace does not support this operation.", e); + logger.debug("The filesystem for this workspace does not support this operation.", e); --- End diff -- I am not returning false to be consistent with the existing behavior. If the user does not have permission, we fail when we try to read the file during execution.
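The control flow being discussed (try a direct permission check; stay permissive when the filesystem cannot answer, and fail later at read time) can be illustrated generically. The AccessCheck interface below is a hypothetical stand-in for Hadoop's FileSystem.access(path, FsAction.READ); this sketches the decision logic only, not the actual WorkspaceSchemaFactory code:

```java
public class AccessibleSketch {
    interface AccessCheck {
        // Throws IOException if access is denied, UnsupportedOperationException
        // if the filesystem cannot answer the question at all.
        void check() throws java.io.IOException, UnsupportedOperationException;
    }

    static boolean accessible(AccessCheck check) {
        try {
            check.check();
        } catch (UnsupportedOperationException e) {
            // Filesystem cannot answer; keep the old permissive behavior and
            // let the later read fail instead of hiding the workspace.
            return true;
        } catch (java.io.IOException e) {
            // The check ran and access was denied.
            return false;
        }
        return true;
    }
}
```

The key point from the review thread is the UnsupportedOperationException branch: returning true there preserves the pre-fix behavior for filesystems that do not implement the access API.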
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/639 Updated the JIRA with details on how the current algorithm works, why remote reads were happening, and the new algorithm. https://issues.apache.org/jira/browse/DRILL-4706
[GitHub] drill pull request #639: New fragment placement algorithm based on locality ...
GitHub user ppadma opened a pull request: https://github.com/apache/drill/pull/639 New fragment placement algorithm based on locality of data. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppadma/drill DRILL-4706 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/639.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #639 commit 5a7fd87d7cfc26b03389c593bd20349e82dd484c Author: Padma Penumarthy <ppenuma...@yahoo.com> Date: 2016-10-07T14:38:09Z New fragment placement algorithm based on locality of data.
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/639 Re: "Some initial comments. The issue is regarding assigning fragments based on strict locality. So why is the parallelization logic affected, and not exclusively locality?" Parallelization logic is affected because it decides how many fragments to run on each node, and that depends on locality.
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/639 Updated with all review comments taken care of. Added TestLocalAffinityFragmentParallelizer.java, which has a bunch of test cases with examples.
[GitHub] drill pull request #639: DRILL-4706: Fragment planning causes Drillbits to r...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/639#discussion_r86597707 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/LocalAffinityFragmentParallelizer.java ---

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner.fragment;

import com.google.common.collect.Lists;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

import java.util.Map;
import java.util.List;
import java.util.Collection;
import java.util.HashMap;

/**
 * Implementation of {@link FragmentParallelizer} where fragment has zero or more endpoints.
 * This is for Parquet Scan Fragments only. Fragment placement is done preferring strict
 * data locality.
 */
public class LocalAffinityFragmentParallelizer implements FragmentParallelizer {
  public static final LocalAffinityFragmentParallelizer INSTANCE = new LocalAffinityFragmentParallelizer();

  @Override
  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {

    // Find the parallelization width of fragment
    final Stats stats = fragmentWrapper.getStats();
    final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();

    // 1. Find the parallelization based on cost. Use max cost of all operators in this fragment;
    //    this is consistent with the calculation that ExcessiveExchangeRemover uses.
    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());

    // 2. Cap the parallelization width by fragment level width limit and system level per query width limit
    width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));

    // 3. Cap the parallelization width by system level per node width limit
    width = Math.min(width, parameters.getMaxWidthPerNode() * activeEndpoints.size());

    // 4. Make sure width is at least the min width enforced by operators
    width = Math.max(parallelizationInfo.getMinWidth(), width);

    // 5. Make sure width is at most the max width enforced by operators
    width = Math.min(parallelizationInfo.getMaxWidth(), width);

    // 6. Finally make sure the width is at least one
    width = Math.max(1, width);

    List<DrillbitEndpoint> endpointPool = Lists.newArrayList();
    List<DrillbitEndpoint> assignedEndPoints = Lists.newArrayList();

    Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap =
        fragmentWrapper.getStats().getParallelizationInfo().getEndpointAffinityMap();

    int totalAssigned = 0;
    int totalWorkUnits = 0;

    // Get the total number of work units and list of endPoints to schedule fragments on
    for (Map.Entry<DrillbitEndpoint, EndpointAffinity> epAff : endpointAffinityMap.entrySet()) {
      if (epAff.getValue().getNumLocalWorkUnits() > 0) {
        totalWorkUnits += epAff.getValue().getNumLocalWorkUnits();
        endpointPool.add(epAff.getKey());
      }
    }

    // Keep track of number of fragments allocated to each endpoint.
    Map<DrillbitEndpoint, Integer> endpointAssignments = new HashMap<>();

    // Keep track of how many more to assign to each endpoint.
    Map<DrillbitEndpoint, Integer> remainingEndpointAssignments = new HashMap<>();

    // Calculate the target allocation for each endPoint based on work it has to do
[GitHub] drill pull request #592: DRILL-4826: Query against INFORMATION_SCHEMA.TABLES...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/592#discussion_r83324461 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java --- @@ -290,28 +290,30 @@ public Tables(OptionManager optionManager) { return new PojoRecordReader<>(Records.Table.class, records.iterator()); } -@Override -public void visitTables(String schemaPath, SchemaPlus schema) { +@Override public void visitTables(String schemaPath, SchemaPlus schema) { --- End diff -- Why this change?
[GitHub] drill issue #597: DRILL-4905: Push down the LIMIT to the parquet reader scan...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/597 Updated with new diffs. Please review.
[GitHub] drill issue #692: DRILL-5123: Write query profile after sending final respon...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/692 +1. LGTM
[GitHub] drill issue #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/676 +1. Thanks for fixing this and for some more cleanup.
[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/676#discussion_r91422697 --- Diff: exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2769UnsupportedReportsUseSqlExceptionTest.java --- @@ -368,10 +381,19 @@ public void testConnectionMethodsThrowRight() { this.factoryConnection = factoryConnection; } +@Override protected Statement getJdbcObject() throws SQLException { return factoryConnection.createStatement(); } +@Override +protected boolean isOkaySpecialCaseException(Method method, + Throwable cause) { + // New Java 8 method not supported by Avatica + --- End diff -- remove the extra line
[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/676#discussion_r91422620 --- Diff: exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java --- @@ -587,6 +618,12 @@ else if (RuntimeException.class == cause.getClass() // Special good-enough case--we had to use RuntimeException for now. result = true; } + else if (SQLFeatureNotSupportedException.class == cause.getClass() + && ( method.getName().equals("updateObject") --- End diff -- Remove the extra space after "(". Move && to the line above.
[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/676#discussion_r91422118 --- Diff: exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java --- @@ -477,18 +477,32 @@ public void testClosedConnectionMethodsThrowRight() { } @Override +protected boolean isOkayNonthrowingMethod(Method method) { + // Java 8 method + if ( "getLargeUpdateCount".equals(method.getName())) { +return true; } + return super.isOkayNonthrowingMethod(method); +} + +@Override protected boolean isOkaySpecialCaseException(Method method, Throwable cause) { final boolean result; if (super.isOkaySpecialCaseException(method, cause)) { result = true; } + else if ( method.getName().equals("executeLargeBatch") --- End diff -- remove extra space after if.
[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/676#discussion_r91422511 --- Diff: exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java --- @@ -543,6 +566,14 @@ else if (RuntimeException.class == cause.getClass() // Special good-enough case--we had to use RuntimeException for now. result = true; } + else if ( method.getName().equals("setObject") --- End diff -- remove extra space after if
[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/676#discussion_r91421638 --- Diff: exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java --- @@ -66,6 +66,11 @@ public static void setFactory(ConnectionFactory factory) { public static Properties getDefaultProperties() { final Properties properties = new Properties(); properties.setProperty("drillJDBCUnitTests", "true"); + +// Must set this to false to ensure that the tests ignore any existing +// plugin configurations stored in /tmp/drill. + --- End diff -- remove the extra line
[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/676#discussion_r91423240 --- Diff: exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2769UnsupportedReportsUseSqlExceptionTest.java --- @@ -280,6 +280,10 @@ else if (NullPointerException.class == cause.getClass() // code implements them. successLinesBuf.append(resultLine); } +else if (isOkaySpecialCaseException(method, cause)) { + successLinesBuf.append(resultLine); +} + --- End diff -- remove extra line
[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/676#discussion_r91421707 --- Diff: exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2769UnsupportedReportsUseSqlExceptionTest.java --- @@ -397,10 +419,19 @@ public void testPlainStatementMethodsThrowRight() { this.factoryConnection = factoryConnection; } +@Override protected PreparedStatement getJdbcObject() throws SQLException { return factoryConnection.prepareStatement("VALUES 1"); } +@Override +protected boolean isOkaySpecialCaseException(Method method, + Throwable cause) { + // New Java 8 method not supported by Avatica + --- End diff -- remove the extra line
[GitHub] drill pull request #676: DRILL-5091: JDBC unit test fail on Java 8
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/676#discussion_r91421952 --- Diff: exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2489CallsAfterCloseThrowExceptionsTest.java --- @@ -477,18 +477,32 @@ public void testClosedConnectionMethodsThrowRight() { } @Override +protected boolean isOkayNonthrowingMethod(Method method) { + // Java 8 method + if ( "getLargeUpdateCount".equals(method.getName())) { +return true; } --- End diff -- Remove the extra space after "if (". Put } on a separate line.
[GitHub] drill pull request #704: DRILL-5125: Provide option to use generic code for ...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/704#discussion_r107219761

--- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java ---
@@ -34,4 +38,33 @@ public void testSVRWithNoFilter() throws Exception {
     int numOutputRecords = testPhysical(getFile("remover/sv_with_no_filter.json"));
     assertEquals(100, numOutputRecords);
   }
+
+  /**
+   * Test the generic version of the selection vector remover copier
+   * class. The code uses the traditional generated version by default.
+   * This test sets the option to use the generic version, then runs
+   * a query that exercises that version.
+   *
+   * Note that the tests here exercise only the SV2 version of the
+   * selection remover; no tests exist for the SV4 version.
+   */
+
+  // TODO: Add an SV4 test once the improved mock data generator
+  // is available.
+
+  @Test
+  public void testGenericCopier() throws Exception {
+    // TODO: replace this with new setup once revised test framework
+    // is available.
+    Properties config = new Properties();
+    config.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
+    config.put(ExecConstants.HTTP_ENABLE, "false");
+    config.put(ExecConstants.REMOVER_ENABLE_GENERIC_COPIER, "true");
+    updateTestCluster(1, DrillConfig.create(config));
+
+    int numOutputRecords = testPhysical(getFile("remover/test1.json"));
+    assertEquals(50, numOutputRecords);
+    numOutputRecords = testPhysical(getFile("remover/sv_with_no_filter.json"));
+    assertEquals(100, numOutputRecords);
+  }
 }
--- End diff --

It would be good to have tests that cover all the vector types. But since this is off by default and you are exercising the code through other unit tests, this is fine. +1
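The option being tested gates which copier implementation the operator uses. A minimal sketch of that option-gated selection pattern, with invented class names (not Drill's actual API), might look like:

```java
// Hypothetical sketch of option-gated implementation selection, the
// pattern REMOVER_ENABLE_GENERIC_COPIER exercises. All names here are
// invented for illustration; they are not Drill's actual classes.
interface Copier {
    String name();
}

class GeneratedCopier implements Copier {
    public String name() { return "generated"; }
}

class GenericCopier implements Copier {
    public String name() { return "generic"; }
}

public class CopierSelection {

    // Generated code stays the default; the generic copier is opt-in.
    static Copier newCopier(boolean enableGenericCopier) {
        return enableGenericCopier ? new GenericCopier() : new GeneratedCopier();
    }

    public static void main(String[] args) {
        System.out.println(newCopier(true).name());  // prints "generic"
        System.out.println(newCopier(false).name()); // prints "generated"
    }
}
```

Keeping the generic path behind an off-by-default option is what makes the reviewer comfortable with lighter test coverage here.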
[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/789#discussion_r108693574

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Base strategy for reading a batch of Parquet records.
+ */
+public abstract class BatchReader {
+
+  protected final ReadState readState;
+
+  public BatchReader(ReadState readState) {
+    this.readState = readState;
+  }
+
+  public int readBatch() throws Exception {
+    ColumnReader firstColumnStatus = readState.getFirstColumnStatus();
+    long recordsToRead = Math.min(getReadCount(firstColumnStatus), readState.getRecordsToRead());
+    int readCount = readRecords(firstColumnStatus, recordsToRead);
+    readState.fillNullVectors(readCount);
+    return readCount;
+  }
+
+  protected abstract long getReadCount(ColumnReader firstColumnStatus);
+
+  protected abstract int readRecords(ColumnReader firstColumnStatus, long recordsToRead) throws Exception;
+
+  protected void readAllFixedFields(long recordsToRead) throws Exception {
+    Stopwatch timer = Stopwatch.createStarted();
+    if (readState.useAsyncColReader()) {
+      readAllFixedFieldsParallel(recordsToRead);
+    } else {
+      readAllFixedFieldsSerial(recordsToRead);
+    }
+    readState.parquetReaderStats.timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+  }
+
+  protected void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
+    for (ColumnReader crs : readState.getReaders()) {
+      crs.processPages(recordsToRead);
+    }
+  }
+
+  protected void readAllFixedFieldsParallel(long recordsToRead) throws Exception {
+    ArrayList<Future> futures = Lists.newArrayList();
+    for (ColumnReader crs : readState.getReaders()) {
+      Future f = crs.processPagesAsync(recordsToRead);
+      futures.add(f);
+    }
+    Exception exception = null;
+    for (Future f : futures) {
+      if (exception != null) {
+        f.cancel(true);
+      } else {
+        try {
+          f.get();
+        } catch (Exception e) {
+          f.cancel(true);
+          exception = e;
+        }
+      }
+    }
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  /**
+   * Strategy for reading mock records. (What are these?)
+   */
+  public static class MockBatchReader extends BatchReader {
+
+    public MockBatchReader(ReadState readState) {
+      super(readState);
+    }
+
+    @Override
+    protected long getReadCount(ColumnReader firstColumnStatus) {
+      if (readState.mockRecordsRead == readState.schema().getGroupRecordCount()) {
+        return 0;
--- End diff --

How about moving mockRecordsRead to this class instead of keeping it in readState?
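The readAllFixedFieldsParallel method in the diff above uses a cancel-on-first-failure pattern: wait on each future in turn, and once one fails, cancel all remaining futures and rethrow. A self-contained sketch of that pattern (simplified and outside Drill; the class and the simulated tasks are for illustration only) could be:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified, standalone version of the wait-and-cancel logic in
// readAllFixedFieldsParallel: iterate the futures in submission order;
// after the first failure, cancel every remaining future and rethrow.
public class ParallelReadSketch {

    static void awaitAllOrCancel(List<Future<?>> futures) throws Exception {
        Exception exception = null;
        for (Future<?> f : futures) {
            if (exception != null) {
                f.cancel(true);      // a prior task failed; stop the rest
            } else {
                try {
                    f.get();         // block until this task completes
                } catch (Exception e) {
                    f.cancel(true);
                    exception = e;   // remember the first failure
                }
            }
        }
        if (exception != null) {
            throw exception;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            futures.add(pool.submit(() -> { /* simulated page read */ }));
        }
        awaitAllOrCancel(futures);   // all tasks succeed here
        pool.shutdown();
        System.out.println("all reads completed"); // prints "all reads completed"
    }
}
```

One consequence of this design: because futures are awaited in submission order, a failure in a later task is only observed after all earlier tasks finish, which matches the behavior of the code under review.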
[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/789#discussion_r108557760

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---
@@ -307,164 +231,49 @@ public FragmentContext getFragmentContext() {
     return fragmentContext;
   }

-  /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
-   */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {
-    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-      if (column.getMaxRepetitionLevel() > 0) {
-        return -1;
-      }
-      if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-        return se.getType_length() * 8;
-      } else {
-        return getTypeLengthInBits(column.getType());
-      }
-    } else {
-      return -1;
-    }
-  }
-
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = operatorContext;
-    if (!isStarQuery()) {
-      columnsFound = new boolean[getColumns().size()];
-      nullFilledVectors = new ArrayList<>();
+    if (isStarQuery()) {
+      schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex);
+    } else {
+      schema = new ParquetSchema(fragmentContext.getOptions(), getColumns());
     }
-    columnStatuses = new ArrayList<>();
-    List columns = footer.getFileMetaData().getSchema().getColumns();
-    allFieldsFixedLength = true;
-    ColumnDescriptor column;
-    ColumnChunkMetaData columnChunkMetaData;
-    int columnsToScan = 0;
-    mockRecordsRead = 0;
-    MaterializedField field;
+    //ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+    //FileMetaData fileMetaData;
--- End diff --

Instead of commenting these lines out, remove them if they are not needed.
[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/789#discussion_r108667588

--- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    FixtureBuilder builder = ClusterFixture.builder()
+      // Set options, etc.
+      ;
+    startCluster(builder);
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+    String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity\n" +
+        "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+//    client.queryBuilder().sql(sql).printCsv();
+
+    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+    typeMap.put(TestBuilder.parsePath("l_orderkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_partkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_suppkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_linenumber"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_quantity"), Types.required(TypeProtos.MinorType.FLOAT8));
+    client.testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .csvBaselineFile("parquet/expected/fixedWidth.csv")
+      .baselineColumns("l_orderkey", "l_partkey", "l_suppkey", "l_linenumber", "l_quantity")
+      .baselineTypes(typeMap)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testVariableWidth() throws Exception {
+    String sql = "SELECT s_name, s_address, s_phone, s_comment\n" +
+        "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+    client.queryBuilder().sql(sql).printCsv();
+
+    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+    typeMap.put(TestBuilder.parsePath("s_name"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_address"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_phone"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_comment"), Types.required(TypeProtos.MinorType.VARCHAR));
+    client.testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .csvBaselineFile("parquet/expected/variableWidth.csv")
+      .baselineColumns("s_name", "s_address", "s_phone", "s_comment")
+      .baselineTypes(typeMap)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testMixedWidth() throws Exception {
+    String sql = "SELECT s_suppkey, s_name, s_address, s_phone, s_acctbal\n" +
+        "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+//    client.queryBuilder().sql(sql).printCsv();
+
+    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+    typ