Re: Time for a 1.9 Release?
I guess it's DRILL-4730 and not DRILL-4370.

On Fri, Nov 4, 2016 at 6:23 PM, Sudheesh Katkam wrote:
> Out of the 17 requested tickets, we resolved 13 over the week, and 4 have
> been deferred (DRILL-4280, DRILL-4858, DRILL-4370, DRILL-4706). Thank you
> everyone!
>
> I will get the RC0 out on Monday.
>
> - Sudheesh
>
> On Fri, Nov 4, 2016 at 11:49 AM, Jinfeng Ni wrote:
> > Agreed with Parth that we probably should start a separate thread to
> > discuss the release version number after 1.9.0.
> >
> > I'll start a new thread to discuss that, and leave this thread for
> > Drill 1.9.0 release matters.
> >
> > On Thu, Nov 3, 2016 at 3:53 PM, Sudheesh Katkam wrote:
> > > Gentle reminder that all check-ins should be done by tomorrow. Please see
> > > the latest statuses of commits that we are targeting:
> > >
> > > https://docs.google.com/spreadsheets/d/1UJSXLrfUNZwUnx_JzkwAcXSxmcbG7meBDad6ZTxlSmw
> > >
> > > Thank you,
> > > Sudheesh
> > >
> > > On Tue, Nov 1, 2016 at 11:19 AM, Sudheesh Katkam wrote:
> > > > The current list of candidate commits for the release is here:
> > > >
> > > > https://docs.google.com/spreadsheets/d/1UJSXLrfUNZwUnx_JzkwAcXSxmcbG7meBDad6ZTxlSmw
> > > >
> > > > On Mon, Oct 31, 2016 at 8:53 AM, Subbu Srinivasan <ssriniva...@zscaler.com> wrote:
> > > > > +1.
> > > > >
> > > > > On Sun, Oct 30, 2016 at 10:23 PM, Paul Rogers wrote:
> > > > > > For release numbers, 1.10 (then 1.11, 1.12, …) seems like a good idea.
> > > > > >
> > > > > > At first it may seem odd to go to 1.10 from 1.9. Might people get confused
> > > > > > between 1.10 and 1.1.0? But there is precedent. Tomcat's latest 7-series
> > > > > > release is 7.0.72. Java is on 8u112. And so on.
> > > > > >
> > > > > > I like the idea of moving to 2.0 later when the team introduces a major
> > > > > > change, rather than by default just because the numbers roll around. For
> > > > > > example, Hadoop went to 2.x when YARN was introduced. Impala appears to
> > > > > > have moved to 2.0 when they added spill to disk for some (all?) operators.
> > > > > >
> > > > > > - Paul
> > > > > >
> > > > > > > On Oct 28, 2016, at 10:34 AM, Sudheesh Katkam <sudhe...@apache.org> wrote:
> > > > > > >
> > > > > > > Hi Drillers,
> > > > > > >
> > > > > > > We have a reasonable number of fixes and features since the last release
> > > > > > > [1]. Releasing itself takes a while, so I propose we start the 1.9
> > > > > > > release process.
> > > > > > >
> > > > > > > I volunteer as the release manager, unless there are objections.
> > > > > > >
> > > > > > > We should also discuss what the release version number should be after 1.9.
> > > > > > >
> > > > > > > Thank you,
> > > > > > > Sudheesh
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/DRILL/fixforversion/12337861
[GitHub] drill issue #613: DRILL-4730: Update JDBC DatabaseMetaData implementation to...
Github user laurentgo commented on the issue:

    https://github.com/apache/drill/pull/613

    I added the necessary fallbacks for prepared statement and metadata, so a
    new client should now be able to talk with an old server without adverse
    effects. I would prefer that this change be in this release, to match the
    state of the C++ client, but I realize this is getting late.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
Re: Time for a 1.9 Release?
Out of the 17 requested tickets, we resolved 13 over the week, and 4 have
been deferred (DRILL-4280, DRILL-4858, DRILL-4370, DRILL-4706). Thank you
everyone!

I will get the RC0 out on Monday.

- Sudheesh

On Fri, Nov 4, 2016 at 11:49 AM, Jinfeng Ni wrote:
> Agreed with Parth that we probably should start a separate thread to
> discuss the release version number after 1.9.0.
>
> I'll start a new thread to discuss that, and leave this thread for
> Drill 1.9.0 release matters.
>
> On Thu, Nov 3, 2016 at 3:53 PM, Sudheesh Katkam wrote:
> > Gentle reminder that all check-ins should be done by tomorrow. Please see
> > the latest statuses of commits that we are targeting:
> >
> > https://docs.google.com/spreadsheets/d/1UJSXLrfUNZwUnx_JzkwAcXSxmcbG7meBDad6ZTxlSmw
> >
> > Thank you,
> > Sudheesh
> >
> > On Tue, Nov 1, 2016 at 11:19 AM, Sudheesh Katkam wrote:
> > > The current list of candidate commits for the release is here:
> > >
> > > https://docs.google.com/spreadsheets/d/1UJSXLrfUNZwUnx_JzkwAcXSxmcbG7meBDad6ZTxlSmw
> > >
> > > On Mon, Oct 31, 2016 at 8:53 AM, Subbu Srinivasan <ssriniva...@zscaler.com> wrote:
> > > > +1.
> > > >
> > > > On Sun, Oct 30, 2016 at 10:23 PM, Paul Rogers wrote:
> > > > > For release numbers, 1.10 (then 1.11, 1.12, …) seems like a good idea.
> > > > >
> > > > > At first it may seem odd to go to 1.10 from 1.9. Might people get confused
> > > > > between 1.10 and 1.1.0? But there is precedent. Tomcat's latest 7-series
> > > > > release is 7.0.72. Java is on 8u112. And so on.
> > > > >
> > > > > I like the idea of moving to 2.0 later when the team introduces a major
> > > > > change, rather than by default just because the numbers roll around. For
> > > > > example, Hadoop went to 2.x when YARN was introduced. Impala appears to
> > > > > have moved to 2.0 when they added spill to disk for some (all?) operators.
> > > > >
> > > > > - Paul
> > > > >
> > > > > > On Oct 28, 2016, at 10:34 AM, Sudheesh Katkam <sudhe...@apache.org> wrote:
> > > > > >
> > > > > > Hi Drillers,
> > > > > >
> > > > > > We have a reasonable number of fixes and features since the last release
> > > > > > [1]. Releasing itself takes a while, so I propose we start the 1.9
> > > > > > release process.
> > > > > >
> > > > > > I volunteer as the release manager, unless there are objections.
> > > > > >
> > > > > > We should also discuss what the release version number should be after 1.9.
> > > > > >
> > > > > > Thank you,
> > > > > > Sudheesh
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/DRILL/fixforversion/12337861
[jira] [Resolved] (DRILL-4969) Basic implementation for displaySize
     [ https://issues.apache.org/jira/browse/DRILL-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sudheesh Katkam resolved DRILL-4969.
------------------------------------
    Resolution: Fixed
      Reviewer: Sudheesh Katkam

Fixed in [3fd6938|https://github.com/apache/drill/commit/3fd69387591ebd56286b39749adc1dd721b3f14d]

> Basic implementation for displaySize
> ------------------------------------
>
>                 Key: DRILL-4969
>                 URL: https://issues.apache.org/jira/browse/DRILL-4969
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Metadata
>            Reporter: Laurent Goujon
>            Assignee: Laurent Goujon
>             Fix For: 1.9.0
>
> display size is fixed to 10, but for most types display size is well defined
> as shown in the ODBC table:
> https://msdn.microsoft.com/en-us/library/ms713974(v=vs.85).aspx

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
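For context, the fix replaces the hard-coded display size of 10 with per-type values from the ODBC table linked in the issue description. The sketch below is illustrative only: the class and method names are assumptions, not Drill's actual metadata code; the values come from the ODBC display-size table.

```java
// Sketch only: maps a few SQL types to the display sizes given in the ODBC
// documentation referenced above, replacing a hard-coded 10. Not Drill's
// actual metadata implementation.
public class DisplaySizeSketch {
    static int displaySize(String sqlType) {
        switch (sqlType) {
            case "BOOLEAN":  return 1;   // SQL_BIT: "0" or "1"
            case "SMALLINT": return 6;   // sign + 5 digits: -32768
            case "INTEGER":  return 11;  // sign + 10 digits: -2147483648
            case "BIGINT":   return 20;  // sign + 19 digits
            case "REAL":     return 14;  // sign, 7 digits, point, E, sign, 2 exponent digits
            case "DOUBLE":   return 24;  // sign, 15 digits, point, E, sign, 3 exponent digits
            case "DATE":     return 10;  // yyyy-mm-dd
            default:         return 10;  // previous hard-coded fallback
        }
    }

    public static void main(String[] args) {
        System.out.println(displaySize("INTEGER")); // 11
    }
}
```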
[jira] [Resolved] (DRILL-4945) Missing subtype information in metadata returned by prepared statement
     [ https://issues.apache.org/jira/browse/DRILL-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sudheesh Katkam resolved DRILL-4945.
------------------------------------
       Resolution: Fixed
         Assignee: Laurent Goujon
         Reviewer: Sudheesh Katkam
    Fix Version/s: 1.9.0

Fixed in [2365ac0|https://github.com/apache/drill/commit/2365ac05524e21b2d084e5138e70a24092fe1bd8]

> Missing subtype information in metadata returned by prepared statement
> ----------------------------------------------------------------------
>
>                 Key: DRILL-4945
>                 URL: https://issues.apache.org/jira/browse/DRILL-4945
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Laurent Goujon
>            Assignee: Laurent Goujon
>            Priority: Minor
>             Fix For: 1.9.0
>
> Column metadata returned by a prepared statement contains only partial type
> information, especially for interval types.
> Currently it returns just "INTERVAL" instead of a more precise type such as
> "INTERVAL MONTH".
> There is also no minor type, so the client cannot adjust the type itself.
[GitHub] drill pull request #618: DRILL-4945: Report INTERVAL exact type as column ty...
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/618
[GitHub] drill pull request #610: DRILL-4674: Allow casting to boolean the same liter...
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/610
[GitHub] drill pull request #632: DRILL-4969: Basic implementation of display size fo...
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/632
[GitHub] drill pull request #560: DRILL-4823: Fix OOM while trying to prune partition...
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/560
[jira] [Resolved] (DRILL-4800) Improve parquet reader performance
     [ https://issues.apache.org/jira/browse/DRILL-4800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Chandra resolved DRILL-4800.
----------------------------------
    Resolution: Fixed

Fixed in fe2334e, f9a443d, 7f5acf8, ee3489c

> Improve parquet reader performance
> ----------------------------------
>
>                 Key: DRILL-4800
>                 URL: https://issues.apache.org/jira/browse/DRILL-4800
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>              Labels: doc-impacting
>
> Reported by a user in the field:
> We're generally getting read speeds of about 100-150 MB/s/node on the PARQUET
> scan operator. This seems a little low given the number of drives on the node
> (24). We're looking for options to improve the performance of this operator,
> as most of our queries are I/O bound.
[jira] [Resolved] (DRILL-5004) Parquet date correction gives null pointer exception if there is no createdBy entry in the metadata
     [ https://issues.apache.org/jira/browse/DRILL-5004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Chandra resolved DRILL-5004.
----------------------------------
    Resolution: Fixed

Fixed in a459e4d

> Parquet date correction gives null pointer exception if there is no createdBy
> entry in the metadata
> -----------------------------------------------------------------------------
>
>                 Key: DRILL-5004
>                 URL: https://issues.apache.org/jira/browse/DRILL-5004
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Parth Chandra
>            Assignee: Vitalii Diravka
>         Attachments: DRILL-5004.parquet
>
> If the Parquet metadata does not contain a createdBy entry, the date
> corruption detection code gives an NPE.
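The failure mode described above is simple to state: date-corruption detection parses the footer's createdBy string, so a footer without one must short-circuit to an "unclear" status instead of dereferencing null. A minimal sketch of that guard (all names here are illustrative stand-ins, not Drill's actual detection code, and the parquet-mr heuristic is deliberately oversimplified):

```java
// Hedged sketch: guard against a missing createdBy entry before any string
// inspection. Enum values and the writer heuristic are illustrative only.
public class CreatedByGuardSketch {
    enum DateCorruptionStatus { META_SHOWS_CORRUPTION, META_SHOWS_NO_CORRUPTION, META_UNCLEAR }

    static DateCorruptionStatus checkForCorruptDates(String createdBy) {
        if (createdBy == null) {
            // No createdBy entry: the writer cannot be identified, so fall back
            // to inspecting the date values themselves rather than throwing an NPE.
            return DateCorruptionStatus.META_UNCLEAR;
        }
        // Toy heuristic: pretend only old parquet-mr writers produced corrupt dates.
        return createdBy.contains("parquet-mr")
                ? DateCorruptionStatus.META_SHOWS_CORRUPTION
                : DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
    }

    public static void main(String[] args) {
        System.out.println(checkForCorruptDates(null)); // META_UNCLEAR
    }
}
```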
[GitHub] drill issue #613: DRILL-4730: Update JDBC DatabaseMetaData implementation to...
Github user sudheeshkatkam commented on the issue:

    https://github.com/apache/drill/pull/613

    I just realized you mentioned this on the mailing list. In that case,
    let's defer committing this pull request to the next release.

    > That said, the metadata change is not merged yet (and I will add the
    > extra logic), which is not the case regarding prepared statement support.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/611
[GitHub] drill pull request #646: DRILL-5004: Fix NPE in corrupt date detection
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/646
[GitHub] drill pull request #637: Drill 1950 : Parquet row group filter pushdown.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/637
[GitHub] drill issue #613: DRILL-4730: Update JDBC DatabaseMetaData implementation to...
Github user sudheeshkatkam commented on the issue:

    https://github.com/apache/drill/pull/613

    @laurentgo thanks for splitting the commit! Looks like queries are
    replaced by API calls in the core changes, so can the new driver talk to
    old servers?
[GitHub] drill pull request #639: DRILL-4706: Fragment planning causes Drillbits to r...
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/639#discussion_r86646115

    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestLocalAffinityFragmentParallelizer.java ---
    @@ -0,0 +1,476 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableMap;
    +import mockit.Mocked;
    +import mockit.NonStrictExpectations;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Collections;
    +
    +import static java.lang.Integer.MAX_VALUE;
    +import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
    +import static org.apache.drill.exec.planner.fragment.LocalAffinityFragmentParallelizer.INSTANCE;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +
    +
    +public class TestLocalAffinityFragmentParallelizer {
    +
    +  // Create a set of test endpoints
    +  private static final DrillbitEndpoint DEP1 = newDrillbitEndpoint("node1", 30010);
    +  private static final DrillbitEndpoint DEP2 = newDrillbitEndpoint("node2", 30010);
    +  private static final DrillbitEndpoint DEP3 = newDrillbitEndpoint("node3", 30010);
    +  private static final DrillbitEndpoint DEP4 = newDrillbitEndpoint("node4", 30010);
    +  private static final DrillbitEndpoint DEP5 = newDrillbitEndpoint("node5", 30010);
    +
    +  @Mocked private Fragment fragment;
    +  @Mocked private PhysicalOperator root;
    +
    +  private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
    +    return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
    +  }
    +
    +  private static final ParallelizationParameters newParameters(final long threshold, final int maxWidthPerNode,
    +      final int maxGlobalWidth) {
    +    return new ParallelizationParameters() {
    +      @Override
    +      public long getSliceTarget() {
    +        return threshold;
    +      }
    +
    +      @Override
    +      public int getMaxWidthPerNode() {
    +        return maxWidthPerNode;
    +      }
    +
    +      @Override
    +      public int getMaxGlobalWidth() {
    +        return maxGlobalWidth;
    +      }
    +
    +      /**
    +       * {@link LocalAffinityFragmentParallelizer} doesn't use affinity factor.
    +       */
    +      @Override
    +      public double getAffinityFactor() {
    +        return 0.0f;
    +      }
    +    };
    +  }
    +
    +  private final Wrapper newWrapper(double cost, int minWidth, int maxWidth,
    +      List<EndpointAffinity> endpointAffinities) {
    +    new NonStrictExpectations() {
    +      {
    +        fragment.getRoot(); result = root;
    +      }
    +    };
    +
    +    final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
    +    final Stats stats = fragmentWrapper.getStats();
    +    stats.setDistributionAffinity(DistributionAffinity.LOCAL);
    +    stats.addCost(cost);
    +    stats.addMinWidth(minWidth);
    +    stats.addMaxWidth(maxWidth);
    +    stats.addEndpointAffinities(endpointAffinities);
    +    return fragmentWrapper;
    +  }
    +
    +  private void checkEndpointAssignments(List<DrillbitEndpoint> assignedEndpoints,
    +      Map<DrillbitEndpoint, Integer> expectedAssignments) throws Exception {
    +    Map<DrillbitEndpoint, Integer> endpointAssignments = new
[GitHub] drill pull request #639: DRILL-4706: Fragment planning causes Drillbits to r...
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/639#discussion_r86602221

    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestLocalAffinityFragmentParallelizer.java ---
    @@ -0,0 +1,476 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableMap;
    +import mockit.Mocked;
    +import mockit.NonStrictExpectations;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Collections;
    +
    +import static java.lang.Integer.MAX_VALUE;
    +import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
    +import static org.apache.drill.exec.planner.fragment.LocalAffinityFragmentParallelizer.INSTANCE;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +
    +
    +public class TestLocalAffinityFragmentParallelizer {
    +
    +  // Create a set of test endpoints
    +  private static final DrillbitEndpoint DEP1 = newDrillbitEndpoint("node1", 30010);
    +  private static final DrillbitEndpoint DEP2 = newDrillbitEndpoint("node2", 30010);
    +  private static final DrillbitEndpoint DEP3 = newDrillbitEndpoint("node3", 30010);
    +  private static final DrillbitEndpoint DEP4 = newDrillbitEndpoint("node4", 30010);
    +  private static final DrillbitEndpoint DEP5 = newDrillbitEndpoint("node5", 30010);
    +
    +  @Mocked private Fragment fragment;
    +  @Mocked private PhysicalOperator root;
    +
    +  private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
    +    return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
    +  }
    +
    +  private static final ParallelizationParameters newParameters(final long threshold, final int maxWidthPerNode,
    +      final int maxGlobalWidth) {
    +    return new ParallelizationParameters() {
    +      @Override
    +      public long getSliceTarget() {
    +        return threshold;
    +      }
    +
    +      @Override
    +      public int getMaxWidthPerNode() {
    +        return maxWidthPerNode;
    +      }
    +
    +      @Override
    +      public int getMaxGlobalWidth() {
    +        return maxGlobalWidth;
    +      }
    +
    +      /**
    +       * {@link LocalAffinityFragmentParallelizer} doesn't use affinity factor.
    +       */
    +      @Override
    +      public double getAffinityFactor() {
    +        return 0.0f;
    +      }
    +    };
    +  }
    +
    +  private final Wrapper newWrapper(double cost, int minWidth, int maxWidth,
    +      List<EndpointAffinity> endpointAffinities) {
    +    new NonStrictExpectations() {
    +      {
    +        fragment.getRoot(); result = root;
    +      }
    +    };
    +
    +    final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
    +    final Stats stats = fragmentWrapper.getStats();
    +    stats.setDistributionAffinity(DistributionAffinity.LOCAL);
    +    stats.addCost(cost);
    +    stats.addMinWidth(minWidth);
    +    stats.addMaxWidth(maxWidth);
    +    stats.addEndpointAffinities(endpointAffinities);
    +    return fragmentWrapper;
    +  }
    +
    +  private void checkEndpointAssignments(List<DrillbitEndpoint> assignedEndpoints,
    +      Map<DrillbitEndpoint, Integer> expectedAssignments) throws Exception {
    +    Map<DrillbitEndpoint, Integer> endpointAssignments = new
[GitHub] drill pull request #639: DRILL-4706: Fragment planning causes Drillbits to r...
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/639#discussion_r86605131

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/LocalAffinityFragmentParallelizer.java ---
    @@ -0,0 +1,165 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Ordering;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +
    +import java.util.Map;
    +import java.util.List;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Collections;
    +
    +/**
    + * Implementation of {@link FragmentParallelizer} where fragment has zero or more endpoints.
    + * This is for Parquet Scan Fragments only. Fragment placement is done preferring
    + * data locality.
    + */
    +public class LocalAffinityFragmentParallelizer implements FragmentParallelizer {
    +  public static final LocalAffinityFragmentParallelizer INSTANCE = new LocalAffinityFragmentParallelizer();
    +
    +  // Sort a list of map entries by values.
    +  Ordering<Map.Entry<DrillbitEndpoint, Integer>> sortByValues = new Ordering<Map.Entry<DrillbitEndpoint, Integer>>() {
    +    @Override
    +    public int compare(Map.Entry<DrillbitEndpoint, Integer> left, Map.Entry<DrillbitEndpoint, Integer> right) {
    +      return right.getValue().compareTo(left.getValue());
    +    }
    +  };
    +
    +  @Override
    +  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
    +      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
    +    final Stats stats = fragmentWrapper.getStats();
    +    final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();
    +    final Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap =
    +        fragmentWrapper.getStats().getParallelizationInfo().getEndpointAffinityMap();
    +    int totalWorkUnits = 0;
    +    Map<DrillbitEndpoint, Integer> endpointPool = new HashMap<>();
    +
    +    // Get the total number of work units and list of endPoints to schedule fragments on
    +    for (Map.Entry<DrillbitEndpoint, EndpointAffinity> epAff : endpointAffinityMap.entrySet()) {
    +      if (epAff.getValue().getNumLocalWorkUnits() > 0) {
    +        totalWorkUnits += epAff.getValue().getNumLocalWorkUnits();
    +        endpointPool.put(epAff.getKey(), epAff.getValue().getNumLocalWorkUnits());
    +      }
    +    }
    +
    +    // Find the parallelization width of fragment
    +    // 1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is
    +    //    consistent with the calculation that ExcessiveExchangeRemover uses.
    +    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
    +
    +    // 2. Cap the parallelization width by fragment level width limit and system level per query width limit
    +    width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));
    +
    +    // 3. Cap the parallelization width by system level per node width limit
    +    width = Math.min(width, parameters.getMaxWidthPerNode() * endpointPool.size());
    +
    +    // 4. Make sure width is at least the min width enforced by operators
    +    width = Math.max(parallelizationInfo.getMinWidth(), width);
    +
    +    // 5. Make sure width is at most the max width enforced by operators
    +    width = Math.min(parallelizationInfo.getMaxWidth(), width);
    +
    +    // 6. Finally make sure the width is at least one
    +    width = Math.max(1, width);
    +
    +    List
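The six width-clamping steps in the hunk above can be read in isolation. A minimal standalone sketch (the plain-argument form is an assumption for illustration; the actual Drill class works on Wrapper, Stats, and ParallelizationParameters objects):

```java
// Sketch (not the actual Drill source) of the clamp order used by
// LocalAffinityFragmentParallelizer when choosing a fragment width.
public class WidthClampSketch {
    static int computeWidth(double maxCost, long sliceTarget,
                            int maxFragmentWidth, int maxGlobalWidth,
                            int maxWidthPerNode, int endpointCount,
                            int minFragmentWidth) {
        // 1. Cost-based parallelization width.
        int width = (int) Math.ceil(maxCost / sliceTarget);
        // 2. Cap by the fragment-level and per-query global limits.
        width = Math.min(width, Math.min(maxFragmentWidth, maxGlobalWidth));
        // 3. Cap by the system-wide per-node limit across available endpoints.
        width = Math.min(width, maxWidthPerNode * endpointCount);
        // 4. Honor the minimum width required by operators.
        width = Math.max(minFragmentWidth, width);
        // 5. Honor the maximum width enforced by operators.
        width = Math.min(maxFragmentWidth, width);
        // 6. Always run at least one fragment.
        return Math.max(1, width);
    }

    public static void main(String[] args) {
        // Cost 400000 with slice target 100000 gives a cost-based width of 4,
        // which none of the caps in this example reduce.
        System.out.println(computeWidth(400000, 100000, 10, 100, 2, 3, 1)); // 4
    }
}
```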
[GitHub] drill issue #645: DRILL-4995: Allow lazy init when dynamic UDF support is di...
Github user sudheeshkatkam commented on the issue:

    https://github.com/apache/drill/pull/645

    I take back my +1; this fails unit tests.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/611#discussion_r86627650

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ---
    @@ -43,43 +51,153 @@ public VarLenBinaryReader(ParquetRecordReader parentReader, List
         firstColumnStatus) throws IOException {
         long recordsReadInCurrentPass = 0;
    -    int lengthVarFieldsInCurrentRecord;
    -    long totalVariableLengthData = 0;
    -    boolean exitLengthDeterminingLoop = false;
    +    // write the first 0 offset
         for (VarLengthColumn columnReader : columns) {
           columnReader.reset();
         }
    +    //if(useAsyncTasks){
    +    //  recordsReadInCurrentPass = determineSizesParallel(recordsToReadInThisPass);
    +    //} else {
    +      recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
    +    //}
    +    if (useAsyncTasks) {
    +      readRecordsParallel(recordsReadInCurrentPass);
    +    } else {
    +      readRecordsSerial(recordsReadInCurrentPass);
    +    }
    +    return recordsReadInCurrentPass;
    +  }
    +
    +  private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
    +    int lengthVarFieldsInCurrentRecord = 0;
    +    boolean exitLengthDeterminingLoop = false;
    +    long totalVariableLengthData = 0;
    +    long recordsReadInCurrentPass = 0;
         do {
    -      lengthVarFieldsInCurrentRecord = 0;
           for (VarLengthColumn columnReader : columns) {
    -        if ( !exitLengthDeterminingLoop ) {
    -          exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
    +        if (!exitLengthDeterminingLoop) {
    +          exitLengthDeterminingLoop =
    +              columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
             } else {
               break;
             }
           }
           // check that the next record will fit in the batch
    -      if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + totalVariableLengthData
    -          + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
    +      if (exitLengthDeterminingLoop ||
    +          (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() +
    +          totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
    +        break;
    +      }
    +      for (VarLengthColumn columnReader : columns) {
    +        columnReader.updateReadyToReadPosition();
    +        columnReader.currDefLevel = -1;
    +      }
    +      recordsReadInCurrentPass++;
    +      totalVariableLengthData += lengthVarFieldsInCurrentRecord;
    +    } while (recordsReadInCurrentPass < recordsToReadInThisPass);
    +
    +    return recordsReadInCurrentPass;
    +  }
    +
    +  public long determineSizesParallel(long recordsToReadInThisPass) throws IOException {
    +    boolean doneReading = false;
    +    int lengthVarFieldsInCurrentRecord = 0;
    +    boolean exitLengthDeterminingLoop = false;
    +    long totalVariableLengthData = 0;
    +    long recordsReadInCurrentPass = 0;
    +
    +    do {
    +      doneReading = readPagesParallel();
    +
    +      if (!doneReading) {
    +        lengthVarFieldsInCurrentRecord = 0;
    +        for (VarLengthColumn columnReader : columns) {
    +          doneReading = columnReader.processPageData((int) recordsReadInCurrentPass);
    +          if (doneReading) {
    +            break;
    +          }
    +          lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
    +          doneReading = columnReader.checkVectorCapacityReached();
    +          if (doneReading) {
    +            break;
    +          }
    +        }
    +      }
    +
    +      exitLengthDeterminingLoop = doneReading;
    +      // check that the next record will fit in the batch
    +      if (exitLengthDeterminingLoop ||
    +          (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() +
    +          totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
             break;
           }
    -      for (VarLengthColumn columnReader : columns ) {
    +      for (VarLengthColumn columnReader : columns) {
             columnReader.updateReadyToReadPosition();
             columnReader.currDefLevel = -1;
           }
           recordsReadInCurrentPass++;
           totalVariableLengthData += lengthVarFieldsInCurrentRecord;
         } while (recordsReadInCurrentPass < recordsToReadInThisPass);
    +    return recordsReadInCurrentPass;
    +  }
    +
    +  public boolean readPagesParallel() {
    +
    +    boolean isDone = false;
    +    ArrayList futures = Lists.newArrayList();
    +    for
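Both determineSizesSerial and determineSizesParallel in the hunk above gate each record on the same admission test: the fixed-width footprint of one more record, plus all variable-length data accumulated so far, plus this record's variable-length portion, must stay within the batch budget. A standalone sketch of that test (plain arguments stand in for the reader's fields; illustrative only):

```java
// Sketch of the fits-in-batch check shared by the serial and parallel
// size-determination loops; parameter names are illustrative.
public class BatchFitSketch {
    static boolean nextRecordFits(long recordsReadSoFar, long fixedWidthPerRecord,
                                  long totalVariableLengthData, long varLenThisRecord,
                                  long batchSize) {
        // One more record's worth of fixed-width data plus all variable-length
        // data (including this record's) must not exceed the batch budget.
        return (recordsReadSoFar + 1) * fixedWidthPerRecord
                + totalVariableLengthData + varLenThisRecord <= batchSize;
    }

    public static void main(String[] args) {
        // 9 records read, fixed-width footprint 32 per record, 400 variable-length
        // units so far, 50 for this record, budget 1024: 10*32 + 450 = 770 <= 1024.
        System.out.println(nextRecordFits(9, 32, 400, 50, 1024)); // true
    }
}
```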
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/611#discussion_r86607745

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java ---
    @@ -234,4 +286,48 @@ public static int readIntLittleEndian(DrillBuf in, int offset) {
         return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
       }

    +  private class ColumnReaderProcessPagesTask implements Callable<Long> {
    +
    +    private final ColumnReader parent = ColumnReader.this;
    +    private final long recordsToReadInThisPass;
    +
    +    public ColumnReaderProcessPagesTask(long recordsToReadInThisPass) {
    +      this.recordsToReadInThisPass = recordsToReadInThisPass;
    +    }
    +
    +    @Override
    +    public Long call() throws IOException {
    +
    +      String oldname = Thread.currentThread().getName();
    --- End diff --

    Done
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86627781 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java --- @@ -43,43 +51,153 @@ public VarLenBinaryReader(ParquetRecordReader parentReader, List firstColumnStatus) throws IOException { long recordsReadInCurrentPass = 0; -int lengthVarFieldsInCurrentRecord; -long totalVariableLengthData = 0; -boolean exitLengthDeterminingLoop = false; + // write the first 0 offset for (VarLengthColumn columnReader : columns) { columnReader.reset(); } +//if(useAsyncTasks){ +// recordsReadInCurrentPass = determineSizesParallel(recordsToReadInThisPass); +//} else { + recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass); +//} +if(useAsyncTasks){ + readRecordsParallel(recordsReadInCurrentPass); +}else{ + readRecordsSerial(recordsReadInCurrentPass); +} +return recordsReadInCurrentPass; + } + + + private long determineSizesSerial(long recordsToReadInThisPass) throws IOException { +int lengthVarFieldsInCurrentRecord = 0; +boolean exitLengthDeterminingLoop = false; +long totalVariableLengthData = 0; +long recordsReadInCurrentPass = 0; do { - lengthVarFieldsInCurrentRecord = 0; for (VarLengthColumn columnReader : columns) { -if ( !exitLengthDeterminingLoop ) { - exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord); +if (!exitLengthDeterminingLoop) { + exitLengthDeterminingLoop = + columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord); } else { break; } } // check that the next record will fit in the batch - if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + totalVariableLengthData - + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) { + if (exitLengthDeterminingLoop || + (recordsReadInCurrentPass + 1) * 
parentReader.getBitWidthAllFixedFields() + + totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) { +break; + } + for (VarLengthColumn columnReader : columns) { +columnReader.updateReadyToReadPosition(); +columnReader.currDefLevel = -1; + } + recordsReadInCurrentPass++; + totalVariableLengthData += lengthVarFieldsInCurrentRecord; +} while (recordsReadInCurrentPass < recordsToReadInThisPass); + +return recordsReadInCurrentPass; + } + + + public long determineSizesParallel(long recordsToReadInThisPass ) throws IOException { +boolean doneReading = false; +int lengthVarFieldsInCurrentRecord = 0; +boolean exitLengthDeterminingLoop = false; +long totalVariableLengthData = 0; +long recordsReadInCurrentPass = 0; + +do { +doneReading = readPagesParallel(); + +if (!doneReading) { + lengthVarFieldsInCurrentRecord = 0; + for (VarLengthColumn columnReader : columns) { +doneReading = columnReader.processPageData((int) recordsReadInCurrentPass); +if(doneReading) { + break; +} +lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits; +doneReading = columnReader.checkVectorCapacityReached(); +if(doneReading) { + break; +} + } +} + +exitLengthDeterminingLoop = doneReading; + + // check that the next record will fit in the batch + if (exitLengthDeterminingLoop || + (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + + totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) { break; } - for (VarLengthColumn columnReader : columns ) { + for (VarLengthColumn columnReader : columns) { columnReader.updateReadyToReadPosition(); columnReader.currDefLevel = -1; } recordsReadInCurrentPass++; totalVariableLengthData += lengthVarFieldsInCurrentRecord; } while (recordsReadInCurrentPass < recordsToReadInThisPass); +return recordsReadInCurrentPass; + } + + public boolean readPagesParallel() { + +boolean isDone = false; +ArrayListfutures = Lists.newArrayList(); +for
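Both sizing loops above gate on the same arithmetic: the next record fits only if its fixed-width bytes plus all variable-length bytes accumulated so far stay within the batch budget. A standalone sketch of that check (method and parameter names are illustrative, mirroring the diff's accessors):

```java
// Standalone sketch of the batch-fit check used in determineSizesSerial/Parallel.
// Names mirror the diff (getBitWidthAllFixedFields, getBatchSize); values are illustrative.
public class BatchFitSketch {
    // Returns true if reading one more record would overflow the batch budget.
    static boolean wouldOverflow(long recordsRead, long bitWidthAllFixedFields,
                                 long totalVariableLengthData, long nextRecordVarLength,
                                 long batchSize) {
        return (recordsRead + 1) * bitWidthAllFixedFields
                + totalVariableLengthData + nextRecordVarLength > batchSize;
    }
}
```

When the check trips, the loop breaks without committing the current record, so `recordsReadInCurrentPass` counts only records known to fit.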
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86621622

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---
@@ -417,12 +420,37 @@ private void resetBatch() { }

   public void readAllFixedFields(long recordsToRead) throws IOException {
-    for (ColumnReader crs : columnStatuses) {
-      crs.processPages(recordsToRead);
+    boolean useAsyncColReader =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
+    if (useAsyncColReader) {
+      readAllFixedFieldsParallel(recordsToRead);
+    } else {
+      readAllFixedFieldsSerial(recordsToRead);
     }
   }

+  public void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
+    for (ColumnReader crs : columnStatuses) {
+      crs.processPages(recordsToRead);
+    }
+  }
+
+  public void readAllFixedFieldsParallel(long recordsToRead) throws IOException {
+    ArrayList futures = Lists.newArrayList();
+    for (ColumnReader crs : columnStatuses) {
+      Future f = crs.processPagesAsync(recordsToRead);
+      futures.add(f);
+    }
+    for (Future f : futures) {
+      try {
+        f.get();
+      } catch (Exception e) {
+        f.cancel(true);
+        handleAndRaise(null, e);

--- End diff --

Done
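The parallel path in this diff fans each column reader out as an async task and then joins the futures, cancelling a task and raising its error if it failed. A simplified, self-contained stand-in using a plain ExecutorService (`Callable<Long>` here is a hypothetical placeholder for `ColumnReader.processPagesAsync`, which is not reproduced):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified stand-in for the readAllFixedFieldsParallel pattern: submit one task per
// column reader, then join all futures, cancelling and propagating on failure.
public class ParallelColumnsSketch {
    static long readColumnsParallel(List<Callable<Long>> columnWork) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (Callable<Long> w : columnWork) {
                futures.add(pool.submit(w));   // kick off every column read first...
            }
            long total = 0;
            for (Future<Long> f : futures) {   // ...then wait for all of them
                try {
                    total += f.get();
                } catch (InterruptedException | ExecutionException e) {
                    f.cancel(true);            // mirrors the diff's cancel-then-raise handling
                    throw new RuntimeException(e);
                }
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

Submitting everything before joining is what buys the overlap: all columns read their pages concurrently rather than one after another.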
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/639

Parallelization logic is affected for the following reasons: depending upon how many rowGroups a node has to scan (based on locality information), i.e. how much work the node has to do, we want to adjust the number of fragments on that node (constrained by the usual global and per-node limits). We do not want to schedule fragment(s) on a node which does not have data. Because we want pure locality, we may have fewer fragments doing more work.
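The two constraints described here — no fragments on data-less nodes, width scaled to local work — can be caricatured in a few lines. This is an illustrative sketch, not Drill's actual parallelizer code (`fragmentsForNode` and its cap are invented names):

```java
// Illustrative: width per node scales with how many row groups are local to it,
// capped by a per-node limit; a node holding no data gets no fragments at all.
public class LocalityWidthSketch {
    static int fragmentsForNode(int localRowGroups, int maxPerNode) {
        if (localRowGroups == 0) {
            return 0;                                 // never schedule on a data-less node
        }
        return Math.min(localRowGroups, maxPerNode);  // one fragment per row group, capped
    }
}
```

The cap is why "fewer fragments doing more work" can happen: a node with many local row groups still gets at most `maxPerNode` fragments.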
[GitHub] drill issue #645: DRILL-4995: Allow lazy init when dynamic UDF support is di...
Github user sudheeshkatkam commented on the issue: https://github.com/apache/drill/pull/645 +1
[GitHub] drill issue #646: DRILL-5004: Fix NPE in corrupt date detection
Github user jinfengni commented on the issue: https://github.com/apache/drill/pull/646 +1
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user sudheeshkatkam commented on the issue: https://github.com/apache/drill/pull/639

Hmm, the answer seems like a rephrasing of the question. Sorry, I misspoke. Better asked: the issue is about assigning **_work to_** fragments based on strict locality (**_deciding which fragment does what_**). So why is the parallelization (**_deciding how many fragments_**) logic affected?
[GitHub] drill issue #638: DRILL-4982: Separate Hive reader classes for different dat...
Github user chunhui-shi commented on the issue: https://github.com/apache/drill/pull/638

The performance degradation is worse with larger tables: it is >300% on a 15 GB table. After this fix, the degradation disappears. There are other visible performance gains from this fix even when there is no degradation: for a simple query on an ORC/Parquet table through HiveReaders, I observed improvements of about 10%-25%, with an average of 17.6%. For TPCH tests on a 10-node cluster with Parquet tables + HiveReaders, the average improvement (over two runs) is 5.1%.
Re: Time for a 1.9 Release?
Agreed with Parth that we probably should start a separate thread to discuss release version number after 1.9.0. I'll start a new thread to discuss that, and leave this thread for drill 1.9.0 release matters. On Thu, Nov 3, 2016 at 3:53 PM, Sudheesh Katkam wrote: > Gentle reminder that all check-ins should be done by tomorrow. Please see > the latest statuses of commits that we are targeting: > > https://docs.google.com/spreadsheets/d/1UJSXLrfUNZwUnx_ > JzkwAcXSxmcbG7meBDad6ZTxlSmw > > Thank you, > Sudheesh > > > On Tue, Nov 1, 2016 at 11:19 AM, Sudheesh Katkam > wrote: > >> The current list of candidate commits for the release is here: >> >> https://docs.google.com/spreadsheets/d/1UJSXLrfUNZwUnx_ >> JzkwAcXSxmcbG7meBDad6ZTxlSmw >> >> >> On Mon, Oct 31, 2016 at 8:53 AM, Subbu Srinivasan > > wrote: >> >>> +1. >>> >>> On Sun, Oct 30, 2016 at 10:23 PM, Paul Rogers >>> wrote: >>> >>> > For release numbers, 1.10 (then 1.11, 1.12, …) seems like a good idea. >>> > >>> > At first it may seem odd to go to 1.10 from 1.9. Might people get >>> confused >>> > between 1.10 and 1.1.0? But, there is precedent. Tomcat's latest >>> 7-series >>> > release is 7.0.72. Java is on 8u112. And so on. >>> > >>> > I like the idea of moving to 2.0 later when the team introduces a major >>> > change, rather than by default just because the numbers roll around. For >>> > example, Hadoop went to 2.x when YARN was introduced. Impala appears to >>> > have moved to 2.0 when they added Spill to disk for some (all?) >>> operators. >>> > >>> > - Paul >>> > >>> > > On Oct 28, 2016, at 10:34 AM, Sudheesh Katkam >>> > wrote: >>> > > >>> > > Hi Drillers, >>> > > >>> > > We have a reasonable number of fixes and features since the last >>> release >>> > > [1]. Releasing itself takes a while; so I propose we start the 1.9 >>> > release >>> > > process. >>> > > >>> > > I volunteer as the release manager, unless there are objections. >>> > > >>> > > We should also discuss what the release version number should be after >>> > 1.9. >>> > > >>> > > Thank you, >>> > > Sudheesh >>> > > >>> > > [1] https://issues.apache.org/jira/browse/DRILL/fixforversion/ >>> 12337861 >>> > >>> > >>> >> >>
Re: JDBC Plugin and Date type
Hi Rahul,

Looks like sqlline cuts the values, because from Drill unit tests the same query returns the full result:

| EXPR$0 |
| 10297-04-27T22:50:00.000Z |

Kind regards
Vitalii

2016-11-04 18:34 GMT+00:00 rahul challapalli : > Folks, > > I have a couple of questions. > > 1. After the fix for DRILL-4203, I tried querying parquet files by > disabling the auto-correction. Below is what I got from JDBC. However > sqlline gets rid of the first character and displays the proper result > > Query : select l_shipdate from table(cp.`tpch/lineitem.parquet` (type => > 'parquet', autoCorrectCorruptDates => false)) order by l_shipdate limit 10; > > > ^@356-03-19 > ^@356-03-21 > ^@356-03-21 > ^@356-03-23 > ^@356-03-24 > ^@356-03-24 > ^@356-03-26 > ^@356-03-26 > ^@356-03-26 > ^@356-03-26 > 2. From sqlline, I can't get date values greater than th year. The > below query should have returned '10297-04-27'. Am I missing anything? > > 0: jdbc:drill:zk=local> select TO_DATE(26278490460) from (VALUES(1));
> +-+
> | EXPR$0 |
> +-+
> | 297-04-27 |
> +-+

> - Rahul >
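On the second question, plain java.time arithmetic (outside Drill) confirms that sufficiently large epoch-millisecond values do land in five-digit years, so a four-digit display is silently truncating them:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Illustrative check, not Drill code: map epoch milliseconds to a UTC calendar date,
// showing that large inputs produce years with more than four digits.
public class EpochMillisSketch {
    static LocalDate fromMillis(long millis) {
        return Instant.ofEpochMilli(millis).atOffset(ZoneOffset.UTC).toLocalDate();
    }
}
```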
[jira] [Created] (DRILL-5006) Drill web UI hangs if execute a query with a large result set
Paul Rogers created DRILL-5006: -- Summary: Drill web UI hangs if execute a query with a large result set Key: DRILL-5006 URL: https://issues.apache.org/jira/browse/DRILL-5006 Project: Apache Drill Issue Type: Bug Affects Versions: 1.8.0 Reporter: Paul Rogers Priority: Minor

Using a pipe-separated file 388 MB in size, do a SELECT * on the file (also with an ORDER BY added). This produces a huge result set. Do this query in the web UI. A number of things happen:

* Using top, we see that Drill grinds on the query for a while, as expected.
* Top then shows that Drill stops consuming CPU. The web UI does nothing during this time, which lasts several minutes.
* Drill eventually shows the first 30 rows of the query.
* The browser "spins" forever, waiting for the server to finish sending data.
* The Safari browser enters the "spinning beachball" state on that page because of memory use, waiting for data, etc.

Fetching of data seems to never complete, even after many minutes. It is known that the web UI is limited. But it should not be possible to overwhelm it with an errant query. Possible solutions:

1. Spill large results to disk, then page them in the display. Discard them after the web session ends, after some amount of time, etc. (This is the long-term, but hard, solution.)
2. Read the first 30 (or however many) results and display them. Read and discard all other results to keep the browser (or web server, or whatever is hanging) from becoming overloaded.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
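The second proposed solution can be sketched generically: keep the head of the result iterator and drain the tail so the stream still terminates. This is illustrative plain Java, not Drill's web server code:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of "display the first N rows, discard the rest": keep only maxRows
// of a result iterator and drain the remainder so the producer can finish.
public class TruncateResultsSketch {
    static <T> List<T> firstRows(Iterator<T> results, int maxRows) {
        List<T> kept = new ArrayList<>();
        while (results.hasNext()) {
            T row = results.next();
            if (kept.size() < maxRows) {
                kept.add(row);   // keep the head...
            }
            // ...and silently discard the tail instead of buffering it in the browser
        }
        return kept;
    }
}
```

Draining (rather than abandoning) the iterator matters: it lets the server-side query complete and release its resources instead of hanging.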
[GitHub] drill pull request #637: Drill 1950 : Parquet row group filter pushdown.
Github user amansinha100 commented on a diff in the pull request: https://github.com/apache/drill/pull/637#discussion_r86606811

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---
@@ -99,6 +99,13 @@
   public static final LongValidator IN_SUBQUERY_THRESHOLD =
       new PositiveLongValidator("planner.in_subquery_threshold", Integer.MAX_VALUE, 20); /* Same as Calcite's default IN List subquery size */

+  public static final String PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY = "planner.store.parquet.rowgroup.filter.pushdown";
+  public static final BooleanValidator PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING = new BooleanValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY, true);
+  public static final String PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY = "planner.store.parquet.rowgroup.filter.pushdown.threshold";
+  public static final PositiveLongValidator PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD = new PositiveLongValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY,

--- End diff --

I understand some follow-up work would be needed to calibrate this threshold but it's good to have this knob.
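For context, the PositiveLongValidator pattern in the diff boils down to a range check keyed by the option name: the key string identifies the option, and the validator rejects out-of-range values when the option is set. A minimal stand-in (not Drill's actual validator classes):

```java
// Minimal stand-in for the option-validator pattern: an option has a string key,
// a maximum, and a rule that the value must be a positive long within the maximum.
public class OptionSketch {
    static long validatePositiveLong(String key, long max, long value) {
        if (value <= 0 || value > max) {
            throw new IllegalArgumentException(
                "Option " + key + " must be in (0, " + max + "], got " + value);
        }
        return value;
    }
}
```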
[jira] [Created] (DRILL-5005) Potential issues with external sort info in query profile
Paul Rogers created DRILL-5005: -- Summary: Potential issues with external sort info in query profile Key: DRILL-5005 URL: https://issues.apache.org/jira/browse/DRILL-5005 Project: Apache Drill Issue Type: Bug Affects Versions: 1.9.0 Reporter: Paul Rogers Priority: Minor Run a query that will include an external sort. Look at the visualized plan. The external sort is shown in the tree as just "Sort". Suggestion: say "External Sort". In the operator profiles section, memory use for EXTERNAL_SORT is listed as 26 MB. Yet the file being sorted is 388 MB. Even allowing for projection of a subset of columns, 26 MB seems awfully small to hold the data set.
[jira] [Created] (DRILL-5004) Parquet date correction gives null pointer exception if there is no createdBy entry in the metadata
Parth Chandra created DRILL-5004: Summary: Parquet date correction gives null pointer exception if there is no createdBy entry in the metadata Key: DRILL-5004 URL: https://issues.apache.org/jira/browse/DRILL-5004 Project: Apache Drill Issue Type: Bug Reporter: Parth Chandra If the Parquet metadata does not contain a createdBy entry, the date corruption detection code gives an NPE.
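A minimal null-guard for this failure mode might look like the following. The writer-string heuristic here is purely hypothetical, standing in for the real version check; the point is only that a missing createdBy becomes an "unknown writer" result instead of a dereference of null:

```java
// Illustrative guard, not Drill's actual fix: classify date-corruption status from the
// Parquet footer's createdBy string, treating a missing entry as UNKNOWN.
public class CreatedByGuardSketch {
    enum Corruption { UNKNOWN, POSSIBLY_CORRUPT, NOT_CORRUPT }

    static Corruption detect(String createdBy) {
        if (createdBy == null || createdBy.isEmpty()) {
            return Corruption.UNKNOWN;   // this is the path that previously threw an NPE
        }
        // Hypothetical heuristic standing in for the real writer-version check.
        return createdBy.startsWith("parquet-mr") ? Corruption.POSSIBLY_CORRUPT
                                                  : Corruption.NOT_CORRUPT;
    }
}
```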
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86452041 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * BufferedDirectBufInputStream reads from the + * underlying InputStream in blocks of data, into an + * internal buffer. The internal buffer is a direct memory backed + * buffer. The implementation is similar to the BufferedInputStream + * class except that the internal buffer is a Drillbuf and + * not a byte array. The mark and reset methods of the underlying + * InputStreamare not supported. + */ +public class BufferedDirectBufInputStream extends DirectBufInputStream implements Closeable { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class); + + private static int defaultBufferSize = 8192 * 1024; // 8 MiB + private static int defaultTempBufferSize = 8192; // 8 KiB + + /** + * The internal buffer to keep data read from the underlying inputStream. + * internalBuffer[0] through internalBuffer[count-1] + * contains data read from the underlying input stream. 
+ */ + protected volatile DrillBuf internalBuffer; // the internal buffer + + /** + * The number of valid bytes in internalBuffer. + * count is always in the range [0,internalBuffer.capacity] + * internalBuffer[count-1] is the last valid byte in the buffer. + */ + protected int count; + + /** + * The current read position in the buffer; the index of the next + * character to be read from the internalBuffer array. + * + * This value is always in the range [0,count]. + * If curPosInBuffer is equal to count> then we have read + * all the buffered data and the next read (or skip) will require more data to be read + * from the underlying input stream. + */ + protected int curPosInBuffer; + + protected long curPosInStream; // current offset in the input stream + + private final int bufSize; + + private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len) + + + private DrillBuf getBuf() throws IOException { +checkInputStreamState(); +if (internalBuffer == null) { + throw new IOException("Input stream is closed."); +} +return this.internalBuffer; + } + + /** + * Creates a BufferedDirectBufInputStream + * with the default (8 MiB) buffer size. + */ + public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id, + long startOffset, long totalByteSize, boolean enableHints) { +this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints); + } + + /** + * Creates a
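The javadoc's buffering invariant — valid data in internalBuffer[0..count-1], read position always in [0, count], refill when the position reaches count — can be shown with a tiny byte[]-backed sketch. The real class refills a DrillBuf from an FSDataInputStream instead; this stand-in uses a plain byte array as the "underlying stream":

```java
// Minimal byte[]-backed sketch of the buffering invariant described in the javadoc:
// valid data lives in buf[0..count-1]; curPos is always in [0, count]; when curPos
// reaches count, the next read() refills the buffer from the underlying source.
public class BufferedReadSketch {
    private final byte[] source;   // stands in for the underlying input stream
    private int srcPos = 0;
    private final byte[] buf;      // the "internal buffer"
    private int count = 0;         // number of valid bytes in buf
    private int curPos = 0;        // next byte to hand out; invariant: 0 <= curPos <= count

    BufferedReadSketch(byte[] source, int bufSize) {
        this.source = source;
        this.buf = new byte[bufSize];
    }

    int read() {
        if (curPos >= count) {                        // buffer exhausted: refill
            int n = Math.min(buf.length, source.length - srcPos);
            if (n <= 0) {
                return -1;                            // underlying source drained
            }
            System.arraycopy(source, srcPos, buf, 0, n);
            srcPos += n;
            count = n;
            curPos = 0;
        }
        return buf[curPos++] & 0xFF;
    }
}
```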
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86451464 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- (same diff hunk as quoted above; the review comment itself was not preserved)
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86452391 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- (same diff hunk as quoted above; the review comment itself was not preserved)
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86462356 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java --- @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +public class DirectBufInputStream extends FilterInputStream { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(DirectBufInputStream.class); + + protected boolean enableHints = true; + protected String streamId; // a name for logging purposes only + protected BufferAllocator allocator; + /** + * The length of the data we expect to read. 
The caller may, in fact, + * ask for more or less bytes. However this is useful for providing hints where + * the underlying InputStream supports hints (e.g. fadvise) + */ + protected final long totalByteSize; + + /** + * The offset in the underlying stream to start reading from + */ + protected final long startOffset; + + public DirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset, + long totalByteSize, boolean enableHints) { +super(in); +Preconditions.checkArgument(startOffset >= 0); +Preconditions.checkArgument(totalByteSize >= 0); +this.streamId = id; +this.allocator = allocator; +this.startOffset = startOffset; +this.totalByteSize = totalByteSize; +this.enableHints = enableHints; + } + + public void init() throws IOException, UnsupportedOperationException { +checkStreamSupportsByteBuffer(); +if (enableHints) { + fadviseIfAvailable(getInputStream(), this.startOffset, this.totalByteSize); +} +getInputStream().seek(this.startOffset); +return; + } + + public int read() throws IOException { +return getInputStream().read(); + } + + public synchronized int read(DrillBuf buf, int off, int len) throws IOException { +buf.clear(); +ByteBuffer directBuffer = buf.nioBuffer(0, len); +int lengthLeftToRead = len; +while (lengthLeftToRead > 0) { + lengthLeftToRead -= CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead); +} +buf.writerIndex(len); +return len; + } + + public synchronized DrillBuf getNext(int bytes) throws IOException { +DrillBuf b = allocator.buffer(bytes); +int bytesRead = read(b, 0, bytes); +if (bytesRead <= -1) { + b.release(); + return null; +} +return b; + } + + public long getPos() throws IOException { +return getInputStream().getPos(); + } + + public boolean hasRemainder() throws IOException { +return getInputStream().available() > 0; + } + + protected FSDataInputStream getInputStream() throws IOException { +// Make sure stream is open +checkInputStreamState(); +return (FSDataInputStream) in; + 
} + + protected void checkInputStreamState() throws IOException { +if (in == null) { + throw new IOException("Input stream is closed."); +} + } + + protected void checkStreamSupportsByteBuffer() throws UnsupportedOperationException { +// Check input stream supports ByteBuffer
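The quoted method above is cut off at the ByteBuffer capability check. As a minimal, hypothetical sketch of what such a check typically looks like (a nested marker interface stands in for Hadoop's `org.apache.hadoop.fs.ByteBufferReadable`; this is not Drill's actual code):

```java
import java.io.InputStream;

public class CapabilityCheck {
    // Stand-in for Hadoop's ByteBufferReadable marker interface.
    interface ByteBufferReadable { /* marker for the ByteBuffer fast path */ }

    // Throw if the wrapped stream cannot read directly into ByteBuffers.
    static void checkStreamSupportsByteBuffer(InputStream in) {
        if (!(in instanceof ByteBufferReadable)) {
            throw new UnsupportedOperationException(
                "The file system does not support reading into ByteBuffers: "
                + in.getClass().getName());
        }
    }

    // A stream that advertises the fast path via the marker interface.
    static class FastStream extends InputStream implements ByteBufferReadable {
        @Override public int read() { return -1; }
    }

    public static void main(String[] args) {
        checkStreamSupportsByteBuffer(new FastStream()); // passes silently
        try {
            checkStreamSupportsByteBuffer(
                new java.io.ByteArrayInputStream(new byte[0]));
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported");
        }
    }
}
```

The instanceof test is cheap and is done once in init() rather than on every read.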
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86446183 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * BufferedDirectBufInputStream reads from the + * underlying InputStream in blocks of data, into an + * internal buffer. The internal buffer is a direct memory backed + * buffer. The implementation is similar to the BufferedInputStream + * class except that the internal buffer is a Drillbuf and + * not a byte array. The mark and reset methods of the underlying + * InputStreamare not supported. --- End diff -- That's a question that's been asked of Drill before. The Parquet reader at one point did read from the file system using byte arrays, but the overhead of copying from byte arrays to direct memory was (it turned out) expensive. It also increased the heap usage. Given that, we switched to reading directly into DirectByteBuffers which did improve performance somewhat, and definitely reduced the heap memory requirements. I couldn't think of any other way to read from the FS into direct memory without writing this class. Using BufferedStream would have taken us back to where we were before. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
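The trade-off described above (heap byte[] reads forcing an extra copy into direct memory, versus reading straight into a direct buffer) can be sketched with plain NIO; this is an illustrative example using `FileChannel`, not Drill's DrillBuf-based implementation:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DirectReadSketch {
    // Read up to len bytes at the given offset into a direct buffer,
    // avoiding the intermediate heap byte[] that InputStream.read(byte[])
    // would require before the data could reach direct memory.
    static ByteBuffer readDirect(FileChannel ch, long offset, int len) throws IOException {
        ByteBuffer buf = ByteBuffer.allocateDirect(len);
        int total = 0;
        while (total < len) {
            int n = ch.read(buf, offset + total);
            if (n < 0) { break; } // EOF before len bytes were available
            total += n;
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("direct", ".bin");
        Files.write(p, new byte[]{1, 2, 3, 4, 5});
        try (FileChannel ch = FileChannel.open(p, StandardOpenOption.READ)) {
            ByteBuffer b = readDirect(ch, 1, 3);
            System.out.println(b.remaining()); // 3
            System.out.println(b.get(0));      // 2
        } finally {
            Files.deleteIfExists(p);
        }
    }
}
```

Because the bytes never pass through a heap array, the JVM's heap footprint stays flat and the copy cost disappears, which is the effect the comment reports.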
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86456058 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * BufferedDirectBufInputStream reads from the + * underlying InputStream in blocks of data, into an + * internal buffer. The internal buffer is a direct memory backed + * buffer. The implementation is similar to the BufferedInputStream + * class except that the internal buffer is a Drillbuf and + * not a byte array. The mark and reset methods of the underlying + * InputStreamare not supported. + */ +public class BufferedDirectBufInputStream extends DirectBufInputStream implements Closeable { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class); + + private static int defaultBufferSize = 8192 * 1024; // 8 MiB + private static int defaultTempBufferSize = 8192; // 8 KiB + + /** + * The internal buffer to keep data read from the underlying inputStream. + * internalBuffer[0] through internalBuffer[count-1] + * contains data read from the underlying input stream. 
+ */ + protected volatile DrillBuf internalBuffer; // the internal buffer + + /** + * The number of valid bytes in internalBuffer. + * count is always in the range [0,internalBuffer.capacity] + * internalBuffer[count-1] is the last valid byte in the buffer. + */ + protected int count; + + /** + * The current read position in the buffer; the index of the next + * character to be read from the internalBuffer array. + * + * This value is always in the range [0,count]. + * If curPosInBuffer is equal to count> then we have read + * all the buffered data and the next read (or skip) will require more data to be read + * from the underlying input stream. + */ + protected int curPosInBuffer; + + protected long curPosInStream; // current offset in the input stream + + private final int bufSize; + + private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len) + + + private DrillBuf getBuf() throws IOException { +checkInputStreamState(); +if (internalBuffer == null) { + throw new IOException("Input stream is closed."); +} +return this.internalBuffer; + } + + /** + * Creates a BufferedDirectBufInputStream + * with the default (8 MiB) buffer size. + */ + public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id, + long startOffset, long totalByteSize, boolean enableHints) { +this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints); + } + + /** + * Creates a
[GitHub] drill pull request #637: DRILL-1950: Parquet row group filter pushdown.

Github user amansinha100 commented on a diff in the pull request: https://github.com/apache/drill/pull/637#discussion_r86599725 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java --- @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.expr.stat; + +import com.google.common.base.Preconditions; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.FunctionHolderExpression; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.expression.fn.CastFunctions; +import org.apache.drill.common.expression.fn.FuncHolder; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder; +import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator; +import org.apache.drill.exec.expr.holders.BigIntHolder; +import org.apache.drill.exec.expr.holders.Float4Holder; +import org.apache.drill.exec.expr.holders.Float8Holder; +import org.apache.drill.exec.expr.holders.IntHolder; +import org.apache.drill.exec.expr.holders.ValueHolder; +import org.apache.drill.exec.store.parquet.stat.ColumnStatistics; +import org.apache.drill.exec.vector.ValueHolderHelper; +import org.apache.parquet.column.statistics.DoubleStatistics; +import org.apache.parquet.column.statistics.FloatStatistics; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.LongStatistics; +import org.apache.parquet.column.statistics.Statistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class RangeExprEvaluator extends AbstractExprVisitor{ + static final Logger logger = LoggerFactory.getLogger(RangeExprEvaluator.class); + + private final Map columnStatMap; + private final long rowCount; + + public RangeExprEvaluator(final 
Map columnStatMap, long rowCount) { +this.columnStatMap = columnStatMap; +this.rowCount = rowCount; + } + + public long getRowCount() { +return this.rowCount; + } + + @Override + public Statistics visitUnknown(LogicalExpression e, Void value) throws RuntimeException { +if (e instanceof TypedFieldExpr) { + TypedFieldExpr fieldExpr = (TypedFieldExpr) e; + final ColumnStatistics columnStatistics = columnStatMap.get(fieldExpr.getPath()); + if (columnStatistics != null) { +return columnStatistics.getStatistics(); + } else { +// field does not exist. + Preconditions.checkArgument(fieldExpr.getMajorType().equals(Types.OPTIONAL_INT)); +IntStatistics intStatistics = new IntStatistics(); +intStatistics.setNumNulls(rowCount); // all values are nulls +return intStatistics; + } +} +return null; + } + + @Override + public Statistics visitIntConstant(ValueExpressions.IntExpression expr, Void value) throws RuntimeException { +return getStatistics(expr.getInt()); + } + + @Override + public Statistics visitLongConstant(ValueExpressions.LongExpression expr, Void value) throws RuntimeException { +return getStatistics(expr.getLong()); + } + + @Override + public Statistics visitFloatConstant(ValueExpressions.FloatExpression expr, Void value) throws RuntimeException { +return
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86461349 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * BufferedDirectBufInputStream reads from the + * underlying InputStream in blocks of data, into an + * internal buffer. The internal buffer is a direct memory backed + * buffer. The implementation is similar to the BufferedInputStream + * class except that the internal buffer is a Drillbuf and + * not a byte array. The mark and reset methods of the underlying + * InputStreamare not supported. + */ +public class BufferedDirectBufInputStream extends DirectBufInputStream implements Closeable { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class); + + private static int defaultBufferSize = 8192 * 1024; // 8 MiB + private static int defaultTempBufferSize = 8192; // 8 KiB + + /** + * The internal buffer to keep data read from the underlying inputStream. + * internalBuffer[0] through internalBuffer[count-1] + * contains data read from the underlying input stream. 
+ */ + protected volatile DrillBuf internalBuffer; // the internal buffer + + /** + * The number of valid bytes in internalBuffer. + * count is always in the range [0,internalBuffer.capacity] + * internalBuffer[count-1] is the last valid byte in the buffer. + */ + protected int count; + + /** + * The current read position in the buffer; the index of the next + * character to be read from the internalBuffer array. + * + * This value is always in the range [0,count]. + * If curPosInBuffer is equal to count> then we have read + * all the buffered data and the next read (or skip) will require more data to be read + * from the underlying input stream. + */ + protected int curPosInBuffer; + + protected long curPosInStream; // current offset in the input stream + + private final int bufSize; + + private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len) + + + private DrillBuf getBuf() throws IOException { +checkInputStreamState(); +if (internalBuffer == null) { + throw new IOException("Input stream is closed."); +} +return this.internalBuffer; + } + + /** + * Creates a BufferedDirectBufInputStream + * with the default (8 MiB) buffer size. + */ + public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id, + long startOffset, long totalByteSize, boolean enableHints) { +this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints); + } + + /** + * Creates a
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86444957 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java --- @@ -207,6 +207,10 @@ public OperatorContext getOperatorContext() { return operatorContext; } + public FragmentContext getFragmentContext() { --- End diff -- The Parquet record reader already had the fragment context; I merely made it accessible to other classes in the Parquet reader. But the point is taken. Ideally, there should be no need for this at all.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86446316 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * BufferedDirectBufInputStream reads from the + * underlying InputStream in blocks of data, into an + * internal buffer. The internal buffer is a direct memory backed + * buffer. The implementation is similar to the BufferedInputStream + * class except that the internal buffer is a Drillbuf and + * not a byte array. The mark and reset methods of the underlying + * InputStreamare not supported. + */ +public class BufferedDirectBufInputStream extends DirectBufInputStream implements Closeable { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class); + + private static int defaultBufferSize = 8192 * 1024; // 8 MiB --- End diff -- Yes it is a constant. As are the others on the next couple of lines. Will update this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
---
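The fix the comment promises ("Yes it is a constant... Will update this") amounts to declaring the sizes as immutable class constants; a minimal sketch with assumed constant names (following the usual Java convention), not the actual committed code:

```java
public class BufferSizes {
    // static final (instead of mutable static) signals these never change
    // and lets the compiler inline them.
    static final int DEFAULT_BUFFER_SIZE = 8192 * 1024; // 8 MiB
    static final int DEFAULT_TEMP_BUFFER_SIZE = 8192;   // 8 KiB

    public static void main(String[] args) {
        System.out.println(DEFAULT_BUFFER_SIZE);      // 8388608
        System.out.println(DEFAULT_TEMP_BUFFER_SIZE); // 8192
    }
}
```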
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86460478 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java --- @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +public class DirectBufInputStream extends FilterInputStream { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(DirectBufInputStream.class); + + protected boolean enableHints = true; + protected String streamId; // a name for logging purposes only + protected BufferAllocator allocator; + /** + * The length of the data we expect to read. 
The caller may, in fact, + * ask for more or less bytes. However this is useful for providing hints where + * the underlying InputStream supports hints (e.g. fadvise) + */ + protected final long totalByteSize; + + /** + * The offset in the underlying stream to start reading from + */ + protected final long startOffset; + + public DirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset, + long totalByteSize, boolean enableHints) { +super(in); +Preconditions.checkArgument(startOffset >= 0); +Preconditions.checkArgument(totalByteSize >= 0); +this.streamId = id; +this.allocator = allocator; +this.startOffset = startOffset; +this.totalByteSize = totalByteSize; +this.enableHints = enableHints; + } + + public void init() throws IOException, UnsupportedOperationException { +checkStreamSupportsByteBuffer(); +if (enableHints) { + fadviseIfAvailable(getInputStream(), this.startOffset, this.totalByteSize); +} +getInputStream().seek(this.startOffset); +return; + } + + public int read() throws IOException { +return getInputStream().read(); + } + + public synchronized int read(DrillBuf buf, int off, int len) throws IOException { +buf.clear(); +ByteBuffer directBuffer = buf.nioBuffer(0, len); --- End diff -- No way that I know of. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86460309 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java --- @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util.filereader; + +import com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.CompatibilityUtil; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * BufferedDirectBufInputStream reads from the + * underlying InputStream in blocks of data, into an + * internal buffer. The internal buffer is a direct memory backed + * buffer. The implementation is similar to the BufferedInputStream + * class except that the internal buffer is a Drillbuf and + * not a byte array. The mark and reset methods of the underlying + * InputStreamare not supported. + */ +public class BufferedDirectBufInputStream extends DirectBufInputStream implements Closeable { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class); + + private static int defaultBufferSize = 8192 * 1024; // 8 MiB + private static int defaultTempBufferSize = 8192; // 8 KiB + + /** + * The internal buffer to keep data read from the underlying inputStream. + * internalBuffer[0] through internalBuffer[count-1] + * contains data read from the underlying input stream. 
+ */ + protected volatile DrillBuf internalBuffer; // the internal buffer + + /** + * The number of valid bytes in internalBuffer. + * count is always in the range [0,internalBuffer.capacity] + * internalBuffer[count-1] is the last valid byte in the buffer. + */ + protected int count; + + /** + * The current read position in the buffer; the index of the next + * character to be read from the internalBuffer array. + * + * This value is always in the range [0,count]. + * If curPosInBuffer is equal to count> then we have read + * all the buffered data and the next read (or skip) will require more data to be read + * from the underlying input stream. + */ + protected int curPosInBuffer; + + protected long curPosInStream; // current offset in the input stream + + private final int bufSize; + + private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len) + + + private DrillBuf getBuf() throws IOException { +checkInputStreamState(); +if (internalBuffer == null) { + throw new IOException("Input stream is closed."); +} +return this.internalBuffer; + } + + /** + * Creates a BufferedDirectBufInputStream + * with the default (8 MiB) buffer size. + */ + public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id, + long startOffset, long totalByteSize, boolean enableHints) { +this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints); + } + + /** + * Creates a
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/639 Some initial comments. The issue is about assigning fragments based on strict locality. So why is the parallelization logic affected, and not exclusively the locality logic? Parallelization is affected because it decides how many fragments to run on each node, and that decision depends on locality.
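The point that per-node fragment counts depend on locality can be illustrated with a toy assignment that splits a total width across nodes in proportion to how much of the scanned data is local to each; this is a hypothetical sketch, not Drill's actual algorithm:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalityAssignment {
    // Assign each node a share of totalWidth proportional to its local bytes.
    static Map<String, Integer> assign(Map<String, Long> localBytes, int totalWidth) {
        long total = localBytes.values().stream().mapToLong(Long::longValue).sum();
        Map<String, Integer> widths = new LinkedHashMap<>();
        int assigned = 0;
        for (Map.Entry<String, Long> e : localBytes.entrySet()) {
            int w = (int) Math.round((double) totalWidth * e.getValue() / total);
            widths.put(e.getKey(), w);
            assigned += w;
        }
        // Give any rounding remainder to the node holding the most local data.
        String biggest = Collections.max(localBytes.entrySet(),
                Map.Entry.comparingByValue()).getKey();
        widths.merge(biggest, totalWidth - assigned, Integer::sum);
        return widths;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new LinkedHashMap<>();
        local.put("nodeA", 600L);
        local.put("nodeB", 300L);
        local.put("nodeC", 100L);
        System.out.println(assign(local, 10)); // {nodeA=6, nodeB=3, nodeC=1}
    }
}
```

A node with no local data gets zero fragments, which is exactly why remote reads disappear under strict locality placement.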
[GitHub] drill pull request #639: DRILL-4706: Fragment planning causes Drillbits to r...
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/639#discussion_r86597707 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/LocalAffinityFragmentParallelizer.java ---
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Map;
+import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+
+/**
+ * Implementation of {@link FragmentParallelizer} where fragment has zero or more endpoints.
+ * This is for Parquet Scan Fragments only. Fragment placement is done preferring strict
+ * data locality.
+ */
+public class LocalAffinityFragmentParallelizer implements FragmentParallelizer {
+  public static final LocalAffinityFragmentParallelizer INSTANCE = new LocalAffinityFragmentParallelizer();
+
+  @Override
+  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
+      final Collection activeEndpoints) throws PhysicalOperatorSetupException {
+
+    // Find the parallelization width of the fragment.
+    final Stats stats = fragmentWrapper.getStats();
+    final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();
+
+    // 1. Find the parallelization based on cost. Use the max cost of all operators in this fragment;
+    //    this is consistent with the calculation that ExcessiveExchangeRemover uses.
+    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
+
+    // 2. Cap the parallelization width by the fragment-level width limit and the
+    //    system-level per-query width limit.
+    width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));
+
+    // 3. Cap the parallelization width by the system-level per-node width limit.
+    width = Math.min(width, parameters.getMaxWidthPerNode() * activeEndpoints.size());
+
+    // 4. Make sure the width is at least the min width enforced by operators.
+    width = Math.max(parallelizationInfo.getMinWidth(), width);
+
+    // 5. Make sure the width is at most the max width enforced by operators.
+    width = Math.min(parallelizationInfo.getMaxWidth(), width);
+
+    // 6. Finally, make sure the width is at least one.
+    width = Math.max(1, width);
+
+    List endpointPool = Lists.newArrayList();
+    List assignedEndPoints = Lists.newArrayList();
+
+    Map endpointAffinityMap =
+        fragmentWrapper.getStats().getParallelizationInfo().getEndpointAffinityMap();
+
+    int totalAssigned = 0;
+    int totalWorkUnits = 0;
+
+    // Get the total number of work units and the list of endpoints to schedule fragments on.
+    for (Map.Entry epAff : endpointAffinityMap.entrySet()) {
+      if (epAff.getValue().getNumLocalWorkUnits() > 0) {
+        totalWorkUnits += epAff.getValue().getNumLocalWorkUnits();
+        endpointPool.add(epAff.getKey());
+      }
+    }
+
+    // Keep track of the number of fragments allocated to each endpoint.
+    Map endpointAssignments = new HashMap<>();
+
+    // Keep track of how many more to assign to each endpoint.
+    Map remainingEndpointAssignments = new HashMap<>();
+
+    // Calculate the target allocation for each endpoint based on the work it has to do.
+    // Assign one fragment (minimum) to all the endpoints.
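The six width-capping steps in the diff above can be distilled into a standalone sketch. Parameter names mirror the diff, but this is a hypothetical helper, not the Drill API:

```java
// Sketch of the width computation described in steps 1-6 of the reviewed diff.
public class WidthSketch {
  static int computeWidth(double maxCost, long sliceTarget,
                          int minOpWidth, int maxOpWidth,
                          int maxGlobalWidth, int maxWidthPerNode, int numEndpoints) {
    int width = (int) Math.ceil(maxCost / sliceTarget);            // 1. cost-based width
    width = Math.min(width, Math.min(maxOpWidth, maxGlobalWidth)); // 2. fragment + per-query caps
    width = Math.min(width, maxWidthPerNode * numEndpoints);       // 3. per-node cap
    width = Math.max(minOpWidth, width);                           // 4. operator-enforced minimum
    width = Math.min(maxOpWidth, width);                           // 5. operator-enforced maximum
    width = Math.max(1, width);                                    // 6. at least one fragment
    return width;
  }

  public static void main(String[] args) {
    // cost 1000 / sliceTarget 100 -> 10, then capped by 2 per node * 4 nodes = 8
    System.out.println(computeWidth(1000, 100, 1, 100, 1000, 2, 4)); // 8
  }
}
```

Note that step 4 can raise the width above the per-node cap from step 3, which is why step 5 re-applies only the operator maximum, not the node-based limit.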
[GitHub] drill issue #639: DRILL-4706: Fragment planning causes Drillbits to read rem...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/639 Updated with all review comments taken care of. Added TestLocalAffinityFragmentParallelizer.java, which has a bunch of test cases with examples.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86588911 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ---
@@ -43,43 +51,153 @@ public VarLenBinaryReader(ParquetRecordReader parentReader, List firstColumnStatus) throws IOException {
     long recordsReadInCurrentPass = 0;
-    int lengthVarFieldsInCurrentRecord;
-    long totalVariableLengthData = 0;
-    boolean exitLengthDeterminingLoop = false;
+    // write the first 0 offset
     for (VarLengthColumn columnReader : columns) {
       columnReader.reset();
     }
+    //if(useAsyncTasks){
+    //  recordsReadInCurrentPass = determineSizesParallel(recordsToReadInThisPass);
+    //} else {
+    recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
+    //}
+    if (useAsyncTasks) {
+      readRecordsParallel(recordsReadInCurrentPass);
+    } else {
+      readRecordsSerial(recordsReadInCurrentPass);
+    }
+    return recordsReadInCurrentPass;
+  }
+
+  private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
+    int lengthVarFieldsInCurrentRecord = 0;
+    boolean exitLengthDeterminingLoop = false;
+    long totalVariableLengthData = 0;
+    long recordsReadInCurrentPass = 0;
     do {
-      lengthVarFieldsInCurrentRecord = 0;
       for (VarLengthColumn columnReader : columns) {
-        if ( !exitLengthDeterminingLoop ) {
-          exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
+        if (!exitLengthDeterminingLoop) {
+          exitLengthDeterminingLoop =
+              columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
         } else {
           break;
        }
      }
      // check that the next record will fit in the batch
-      if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + totalVariableLengthData
-          + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
+      if (exitLengthDeterminingLoop ||
+          (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() +
+          totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
+        break;
+      }
+      for (VarLengthColumn columnReader : columns) {
+        columnReader.updateReadyToReadPosition();
+        columnReader.currDefLevel = -1;
+      }
+      recordsReadInCurrentPass++;
+      totalVariableLengthData += lengthVarFieldsInCurrentRecord;
+    } while (recordsReadInCurrentPass < recordsToReadInThisPass);
+
+    return recordsReadInCurrentPass;
+  }
+
+  public long determineSizesParallel(long recordsToReadInThisPass) throws IOException {
+    boolean doneReading = false;
+    int lengthVarFieldsInCurrentRecord = 0;
+    boolean exitLengthDeterminingLoop = false;
+    long totalVariableLengthData = 0;
+    long recordsReadInCurrentPass = 0;
+
+    do {
+      doneReading = readPagesParallel();
+
+      if (!doneReading) {
+        lengthVarFieldsInCurrentRecord = 0;
+        for (VarLengthColumn columnReader : columns) {
+          doneReading = columnReader.processPageData((int) recordsReadInCurrentPass);
+          if (doneReading) {
+            break;
+          }
+          lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
+          doneReading = columnReader.checkVectorCapacityReached();
+          if (doneReading) {
+            break;
+          }
+        }
+      }
+
+      exitLengthDeterminingLoop = doneReading;
+
+      // check that the next record will fit in the batch
+      if (exitLengthDeterminingLoop ||
+          (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() +
+          totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
         break;
       }
-      for (VarLengthColumn columnReader : columns ) {
+      for (VarLengthColumn columnReader : columns) {
        columnReader.updateReadyToReadPosition();
        columnReader.currDefLevel = -1;
      }
      recordsReadInCurrentPass++;
      totalVariableLengthData += lengthVarFieldsInCurrentRecord;
    } while (recordsReadInCurrentPass < recordsToReadInThisPass);
+    return recordsReadInCurrentPass;
+  }
+
+  public boolean readPagesParallel() {
+
+    boolean isDone = false;
+    ArrayList futures = Lists.newArrayList();
+    for
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86588762 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ---
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86586621 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---
@@ -417,12 +420,37 @@ private void resetBatch() {
   }
   public void readAllFixedFields(long recordsToRead) throws IOException {
-    for (ColumnReader crs : columnStatuses) {
-      crs.processPages(recordsToRead);
+    boolean useAsyncColReader =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
+    if (useAsyncColReader) {
+      readAllFixedFieldsParallel(recordsToRead);
+    } else {
+      readAllFixedFieldsiSerial(recordsToRead); ;
--- End diff --
...FieldsiSerial --> ..FieldsSerial (remove extra "i")
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86588306 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ---
+          doneReading = columnReader.checkVectorCapacityReached();
--- End diff --
Same questions as above.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86586360 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java ---
@@ -258,4 +295,74 @@ public static int readIntLittleEndian(DrillBuf in, int offset) {
     return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
   }
+  private class ColumnReaderProcessPagesTask implements Callable {
+
+    private final ColumnReader parent = ColumnReader.this;
+    private final long recordsToReadInThisPass;
+
+    public ColumnReaderProcessPagesTask(long recordsToReadInThisPass) {
+      this.recordsToReadInThisPass = recordsToReadInThisPass;
+    }
+
+    @Override public Long call() throws IOException {
+
+      String oldname = Thread.currentThread().getName();
--- End diff --
Surround with try/finally so that the original thread name is restored even if an exception occurs?
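The try/finally pattern the reviewer asks for can be sketched like this. The helper is hypothetical, not the Drill code; it only shows that the original thread name survives an exception in the body:

```java
// Sketch: decorate the current thread's name for the duration of a task,
// restoring it in a finally block so an exception cannot leak the decorated name.
public class ThreadNameDemo {
  static void withDecoratedName(String suffix, Runnable body) {
    final String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(oldName + suffix);
    try {
      body.run();
    } finally {
      Thread.currentThread().setName(oldName); // runs even if body throws
    }
  }

  public static void main(String[] args) {
    String before = Thread.currentThread().getName();
    try {
      withDecoratedName("Decode-col", () -> { throw new RuntimeException("boom"); });
    } catch (RuntimeException ignored) {
      // the exception propagates, but the name was already restored
    }
    System.out.println(Thread.currentThread().getName().equals(before)); // true
  }
}
```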
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86586473 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java ---
@@ -258,4 +295,74 @@ public static int readIntLittleEndian(DrillBuf in, int offset) {
     return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
   }
+  /*
+  private class ColumnReaderDetermineSizeTask implements Callable {
+
+    private final ColumnReader parent = ColumnReader.this;
+    private final long recordsReadInCurrentPass;
+    private final Integer lengthVarFieldsInCurrentRecord;
+
+    public ColumnReaderDetermineSizeTask(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) {
+      this.recordsReadInCurrentPass = recordsReadInCurrentPass;
+      this.lengthVarFieldsInCurrentRecord = lengthVarFieldsInCurrentRecord;
+    }
+
+    @Override public Boolean call() throws IOException {
+
+      String oldname = Thread.currentThread().getName();
+      Thread.currentThread().setName(oldname + "Decode-" + this.parent.columnChunkMetaData.toString());
+
+      boolean b = this.parent.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
+
+      Thread.currentThread().setName(oldname);
+      return b;
+    }
+  }
+  */
+
+  private class ColumnReaderReadRecordsTask implements Callable {
+
+    private final ColumnReader parent = ColumnReader.this;
+    private final int recordsToRead;
+
+    public ColumnReaderReadRecordsTask(int recordsToRead) {
+      this.recordsToRead = recordsToRead;
+    }
+
+    @Override public Integer call() throws IOException {
+
+      String oldname = Thread.currentThread().getName();
--- End diff --
Same comment as above about exception-safety on the thread name.
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86587743 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ---
+    do {
+      doneReading = readPagesParallel();
+
+      if (!doneReading) {
--- End diff --
Can't this be made simpler?
```
if ( ! readPagesParallel( ) ) {
  break;
}
```
As far as I can tell, if doneReading is false, we'll skip the body of the loop (via another assignment later). A bit hard to follow...
[GitHub] drill pull request #611: Drill-4800: Improve parquet reader performance
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/611#discussion_r86588221 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ---
+          doneReading = columnReader.processPageData((int) recordsReadInCurrentPass);
--- End diff --
Can this just be:
```
if ( ! columnReader.processPageData((int) recordsReadInCurrentPass) ) {
  break;
}
```
? Some explanation of the algorithm would be greatly appreciated by the readers of this code...
[GitHub] drill issue #637: Drill 1950 : Parquet row group filter pushdown.
Github user jinfengni commented on the issue: https://github.com/apache/drill/pull/637 @amansinha100, @parthchandra, I pushed three commits to address the review comments and add one option. Please kindly take another look at the commits. Thanks!
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86575055 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---
@@ -218,13 +218,18 @@ private void javaPropertiesToSystemOptions() {
   }
   /**
-   * If cluster version is the same as default, update option to current drillbit version.
+   * If cluster version is the same as default, updates option to current drillbit version.
+   * Since getOption and setOption are separate calls, raise condition might occur
+   * when several drillbits are registering at the same time.
+   * It is assumed that the first drillbit in cluster sets cluster version
+   * but when the raise condition occurs, it might be the second or the third etc.
+   * This behaviour does not impose significant impact, since the main goal is to detect mismatching versions.
   */
-  private void checkAndUpdateClusterVersionOption(OptionManager optionManager, DrillbitEndpoint drillbitEndpoint) {
-    OptionValue versionOption = optionManager.getOption(ExecConstants.CLUSTER_VERSION);
+  private void checkAndUpdateClusterVersionOption(OptionManager systemOptions, DrillbitEndpoint drillbitEndpoint) {
--- End diff --
How can this work? Consider this:
1. Start a new cluster.
2. A Drillbit registers. It sets the cluster version.
3. A second Drillbit registers. Does it reset the version? Or, does it accept that the first has already set the version?
4. Shut down the cluster.
5. Upgrade Drill to a new version.
6. Start the first Drillbit.
7. Does this Drillbit overwrite the cluster version? Or, accept the existing (old) version?
Note that steps 3 and 7 are identical from ZK's perspective.
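The get-then-set race described in this comment, and the shape of an atomic fix, can be illustrated with a compare-and-set. This uses an in-process AtomicReference as a stand-in; it is not Drill's ZooKeeper-backed OptionManager, where the equivalent would be a versioned conditional update:

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustration only: a separate getOption/setOption pair leaves a window where
// two registering drillbits both see the default; compareAndSet closes that
// window because exactly one caller can replace the default value.
public class ClusterVersionDemo {
  static final String DEFAULT = "unknown";
  static final AtomicReference<String> clusterVersion = new AtomicReference<>(DEFAULT);

  static boolean registerVersion(String myVersion) {
    // Atomically: "if the stored value is still the default, install mine."
    return clusterVersion.compareAndSet(DEFAULT, myVersion);
  }

  public static void main(String[] args) {
    System.out.println(registerVersion("1.9.0")); // first registrant wins: true
    System.out.println(registerVersion("1.8.0")); // later registrant cannot overwrite: false
    System.out.println(clusterVersion.get());     // 1.9.0
  }
}
```

Note this only removes the race between concurrent registrations; it does not by itself answer the reviewer's upgrade question (step 7), where the stored value is an old version rather than the default.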
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86574518 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---
+   * This behaviour does not impose significant impact, since the main goal is to detect mismatching versions.
--- End diff --
mismatching --> mismatched
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r8657 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java --- @@ -218,13 +218,18 @@ private void javaPropertiesToSystemOptions() { } /** - * If cluster version is the same as default, update option to current drillbit version. + * If cluster version is the same as default, updates option to current drillbit version. + * Since getOption and setOption are separate calls, raise condition might occur + * when several drillbits are registering at the same time. + * It is assumed that the first drillbit in cluster sets cluster version + * but when the raise condition occurs, it might be the second or the third etc. --- End diff -- raise --> race
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86574396 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java --- @@ -218,13 +218,18 @@ private void javaPropertiesToSystemOptions() { } /** - * If cluster version is the same as default, update option to current drillbit version. + * If cluster version is the same as default, updates option to current drillbit version. + * Since getOption and setOption are separate calls, raise condition might occur --- End diff -- raise --> race
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86568915 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java --- @@ -55,44 +59,89 @@ public Viewable getStats() { @GET @Path("/stats.json") @Produces(MediaType.APPLICATION_JSON) - public List getStatsJSON() { -List stats = Lists.newLinkedList(); -stats.add(new Stat("Number of Drill Bits", work.getContext().getBits().size())); -int number = 0; -for (CoordinationProtos.DrillbitEndpoint bit : work.getContext().getBits()) { - String initialized = bit.isInitialized() ? " initialized" : " not initialized"; - stats.add(new Stat("Bit #" + number, bit.getAddress() + initialized)); - ++number; + public Stats getStatsJSON() { +final String version = work.getContext().getOptionManager().getOption(ExecConstants.CLUSTER_VERSION).string_val; + +final Mapprops = Maps.newLinkedHashMap(); +props.put("Cluster Version", version); +props.put("Number of Drillbits", work.getContext().getBits().size()); +CoordinationProtos.DrillbitEndpoint currentEndpoint = work.getContext().getEndpoint(); +final String address = currentEndpoint.getAddress(); +props.put("Data Port Address", address + ":" + currentEndpoint.getDataPort()); +props.put("User Port Address", address + ":" + currentEndpoint.getUserPort()); +props.put("Control Port Address", address + ":" + currentEndpoint.getControlPort()); +props.put("Maximum Direct Memory", DrillConfig.getMaxDirectMemory()); + +return new Stats(props, collectDrillbits(version)); + } + + private Collection collectDrillbits(String version) { +Set drillbits = Sets.newTreeSet(); +for (CoordinationProtos.DrillbitEndpoint endpoint : work.getContext().getBits()) { + boolean versionMatch = version.equals(endpoint.getVersion()); + DrillbitInfo drillbit = new DrillbitInfo(endpoint.getAddress(), endpoint.isInitialized(), endpoint.getVersion(), versionMatch); + drillbits.add(drillbit); } -stats.add(new Stat("Data Port 
Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getDataPort())); -stats.add(new Stat("User Port Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getUserPort())); -stats.add(new Stat("Control Port Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getControlPort())); -stats.add(new Stat("Maximum Direct Memory", DrillConfig.getMaxDirectMemory())); - -return stats; +return drillbits; } @XmlRootElement - public class Stat { -private String name; -private Object value; + public static class Stats { --- End diff -- Would actually be helpful to include Javadoc to explain the use of this stuff. Is this used as the data model for the Freemarker-based web UI?
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86572497 --- Diff: exec/java-exec/src/main/resources/rest/index.ftl --- @@ -17,17 +17,56 @@ back - - - -<#list model as stat> - -${stat.getName()} -${stat.getValue()} - - - - + + <#list model.getDrillbits() as drillbit> +<#if !drillbit.isVersionMatch()> + + +Drillbits in the cluster have different versions. + + <#break> + + + + + + General Info + + + +<#assign props = model.getProps()> +<#list props?keys as key> + +${key} --- End diff -- I see what we're doing and it is clever. However, it forces us to put the user-visible labels as keys. Since hash maps are unordered, it means that we can't control the order of display of the properties. Two solutions. 1. Design the layout: ``` Version ... ${props.version} ... ``` Or provide two levels of dynamics: ``` class DisplayPair { String key; String label; ... } List list = ...; list.add( "version", "Version" }; ... <#list pairs as pair> ${pair.label} ... <#list pairs as pair> ${model[pair.key]} ``` The first is simpler and is fine: it lets the UI decide which attributes to display, in what form.
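The second option in the comment above ("two levels of dynamics") can be fleshed out a little. This is an illustrative sketch, not Drill code — the class and field names are hypothetical. An ordered list of key/label pairs fixes the display order and carries the user-visible labels, while a map keyed by internal identifiers carries the values:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model backing the Freemarker template: the template would
// iterate over `pairs` for labels and order, and look each value up in
// `props` via the pair's internal key.
public class DisplayModel {
    public static class DisplayPair {
        public final String key;    // internal identifier, e.g. "version"
        public final String label;  // user-visible label, e.g. "Version"
        public DisplayPair(String key, String label) {
            this.key = key;
            this.label = label;
        }
    }

    public final Map<String, Object> props = new LinkedHashMap<>();
    public final List<DisplayPair> pairs = new ArrayList<>();

    public void put(String key, String label, Object value) {
        props.put(key, value);
        pairs.add(new DisplayPair(key, label));  // display order = insertion order
    }
}
```

This keeps user-visible strings out of the property keys, which also addresses the naming concern raised elsewhere in this review.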
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86565189 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java --- @@ -361,4 +361,11 @@ String DYNAMIC_UDF_SUPPORT_ENABLED = "exec.udf.enable_dynamic_support"; BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED, true, true); + + /** + * Option representing Drill cluster version + */ + String CLUSTER_VERSION = "drill.exec.cluster.version"; --- End diff -- What is a cluster version? Is this a well-defined concept? If there is such a thing, isn't it the version of all Drillbits in the cluster? It would not be a config parameter, would it?
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86568636 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java --- @@ -55,44 +59,89 @@ public Viewable getStats() { @GET @Path("/stats.json") @Produces(MediaType.APPLICATION_JSON) - public List getStatsJSON() { -List stats = Lists.newLinkedList(); -stats.add(new Stat("Number of Drill Bits", work.getContext().getBits().size())); -int number = 0; -for (CoordinationProtos.DrillbitEndpoint bit : work.getContext().getBits()) { - String initialized = bit.isInitialized() ? " initialized" : " not initialized"; - stats.add(new Stat("Bit #" + number, bit.getAddress() + initialized)); - ++number; + public Stats getStatsJSON() { +final String version = work.getContext().getOptionManager().getOption(ExecConstants.CLUSTER_VERSION).string_val; + +final Mapprops = Maps.newLinkedHashMap(); +props.put("Cluster Version", version); +props.put("Number of Drillbits", work.getContext().getBits().size()); +CoordinationProtos.DrillbitEndpoint currentEndpoint = work.getContext().getEndpoint(); +final String address = currentEndpoint.getAddress(); +props.put("Data Port Address", address + ":" + currentEndpoint.getDataPort()); +props.put("User Port Address", address + ":" + currentEndpoint.getUserPort()); +props.put("Control Port Address", address + ":" + currentEndpoint.getControlPort()); +props.put("Maximum Direct Memory", DrillConfig.getMaxDirectMemory()); + +return new Stats(props, collectDrillbits(version)); + } + + private Collection collectDrillbits(String version) { --- End diff -- Rename or document what "version" is. Presumably, this is the cluster version?
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86573607 --- Diff: exec/java-exec/src/main/resources/rest/index.ftl --- @@ -17,17 +17,56 @@ back - - - -<#list model as stat> - -${stat.getName()} -${stat.getValue()} - - - - + + <#list model.getDrillbits() as drillbit> +<#if !drillbit.isVersionMatch()> + + +Drillbits in the cluster have different versions. + + <#break> + + + + + + General Info + + + +<#assign props = model.getProps()> +<#list props?keys as key> + +${key} +${props[key]} + + + + + + + +List of Drillbits + + + + <#assign i = 1> + <#list model.getDrillbits() as drillbit> + + Drillbit # ${i} + ${drillbit.getAddress()} ${drillbit.isInitialized()} + + + <#if (drillbit.getVersion())?has_content>${drillbit.getVersion()}<#else>Undefined --- End diff -- Nice trick: ${drillbit.version()!"Undefined"} The "!" says "if the preceding is defined and not null, use it, else use the thing that comes next."
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86570460 --- Diff: exec/java-exec/src/main/resources/rest/index.ftl --- @@ -17,17 +17,56 @@ back - - - -<#list model as stat> - -${stat.getName()} -${stat.getValue()} - - - - + + <#list model.getDrillbits() as drillbit> +<#if !drillbit.isVersionMatch()> + + +Drillbits in the cluster have different versions. --- End diff -- Can we include more info? Something like: Cluster version is x.y.z, but Drillbits in the cluster have other versions: a.b.c, d.e.f and i.j.k. Drill does not support clusters containing a mix of Drillbit versions.
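The richer warning text proposed in the comment above could be assembled in the model rather than the template. An illustrative sketch, not existing Drill code — the class name and method are hypothetical:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Composes the suggested warning from the cluster version and the versions
// reported by the individual Drillbit endpoints. Returns null when every
// Drillbit matches, so the template can simply skip rendering the banner.
public class VersionWarning {
    static String build(String clusterVersion, List<String> drillbitVersions) {
        Set<String> others = new LinkedHashSet<>();  // de-duplicate, keep first-seen order
        for (String v : drillbitVersions) {
            if (!clusterVersion.equals(v)) {
                others.add(v);
            }
        }
        if (others.isEmpty()) {
            return null;
        }
        return "Cluster version is " + clusterVersion
            + ", but Drillbits in the cluster have other versions: "
            + String.join(", ", others)
            + ". Drill does not support clusters containing a mix of Drillbit versions.";
    }
}
```

Building the message once in the model also avoids the per-Drillbit repetition of the warning that is flagged in the next comment.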
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86570176 --- Diff: exec/java-exec/src/main/resources/rest/index.ftl --- @@ -17,17 +17,56 @@ back - - - -<#list model as stat> - -${stat.getName()} -${stat.getValue()} - - - - + + <#list model.getDrillbits() as drillbit> +<#if !drillbit.isVersionMatch()> + --- End diff -- This seems to emit a warning about version mismatch for every mismatched Drillbit. Seems it would be better to list the Drillbits, and look for mismatches. Then, if found, perhaps: Highlight the mismatched version (in red or whatever) of each Drillbit, then, after the list of bits, display the message that "One or more drill bits, highlighted above, have versions different from the cluster version of x.y.z."
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86568133 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java --- @@ -55,44 +59,89 @@ public Viewable getStats() { @GET @Path("/stats.json") @Produces(MediaType.APPLICATION_JSON) - public List getStatsJSON() { -List stats = Lists.newLinkedList(); -stats.add(new Stat("Number of Drill Bits", work.getContext().getBits().size())); -int number = 0; -for (CoordinationProtos.DrillbitEndpoint bit : work.getContext().getBits()) { - String initialized = bit.isInitialized() ? " initialized" : " not initialized"; - stats.add(new Stat("Bit #" + number, bit.getAddress() + initialized)); - ++number; + public Stats getStatsJSON() { --- End diff -- This message gets statistics, which is great. But, it seems to combine per-cluster stats with per-drillbit info. Perhaps either have two messages (one for cluster, the other for Drillbit) or use JSON structures: ``` clusterStats: { drillbitCount: .., version: ... }, drillbitStats: { userPort: ..., ... } ```
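The two-level JSON shape proposed above can be mocked up with nested maps. This is a sketch only: the field names follow the structure proposed in the comment, and it assumes a JSON mapper such as Jackson would serialize nested Maps as nested objects — it is not the PR's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Groups per-cluster values under "clusterStats" and per-Drillbit values
// under "drillbitStats", as suggested in the review comment above.
public class StatsLayout {
    static Map<String, Object> build(String clusterVersion, int drillbitCount,
                                     int userPort, int controlPort, int dataPort) {
        Map<String, Object> clusterStats = new LinkedHashMap<>();
        clusterStats.put("version", clusterVersion);
        clusterStats.put("drillbitCount", drillbitCount);

        Map<String, Object> drillbitStats = new LinkedHashMap<>();
        drillbitStats.put("userPort", userPort);
        drillbitStats.put("controlPort", controlPort);
        drillbitStats.put("dataPort", dataPort);

        Map<String, Object> stats = new LinkedHashMap<>();
        stats.put("clusterStats", clusterStats);
        stats.put("drillbitStats", drillbitStats);
        return stats;  // a JSON mapper would render this as two nested objects
    }
}
```

Separating the two levels keeps the REST response stable even as more per-Drillbit fields (status, HTTP port, etc.) are added later.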
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86566500 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java --- @@ -207,6 +210,21 @@ private void javaPropertiesToSystemOptions() { } /** + * If cluster version is the same as default, update option to current drillbit version. + */ + private void checkAndUpdateClusterVersionOption(OptionManager optionManager, DrillbitEndpoint drillbitEndpoint) { +OptionValue versionOption = optionManager.getOption(ExecConstants.CLUSTER_VERSION); +OptionValidator validator = SystemOptionManager.getValidator(ExecConstants.CLUSTER_VERSION); +if (versionOption.equals(validator.getDefault())) { + optionManager.setOption(OptionValue.createOption( --- End diff -- I'm not convinced that having the admin specify the cluster option is the right approach. The proper approach is for each Drillbit to detect that its own version conflicts with one or more of the others. But, for this to work, Drillbits have to advertise their version. Do we have that ability?
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86573840 --- Diff: exec/java-exec/src/main/resources/rest/index.ftl --- @@ -17,17 +17,56 @@ back - - - -<#list model as stat> - -${stat.getName()} -${stat.getValue()} - - - - + + <#list model.getDrillbits() as drillbit> +<#if !drillbit.isVersionMatch()> + + +Drillbits in the cluster have different versions. + + <#break> + + + + + + General Info + + + +<#assign props = model.getProps()> +<#list props?keys as key> + +${key} +${props[key]} + + + + + + + +List of Drillbits + + --- End diff -- Better table would be to have columns for each field rather than putting all Drillbit data into a single column. (See note elsewhere.)
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86569289 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java --- @@ -55,44 +59,89 @@ public Viewable getStats() { @GET @Path("/stats.json") @Produces(MediaType.APPLICATION_JSON) - public List getStatsJSON() { -List stats = Lists.newLinkedList(); -stats.add(new Stat("Number of Drill Bits", work.getContext().getBits().size())); -int number = 0; -for (CoordinationProtos.DrillbitEndpoint bit : work.getContext().getBits()) { - String initialized = bit.isInitialized() ? " initialized" : " not initialized"; - stats.add(new Stat("Bit #" + number, bit.getAddress() + initialized)); - ++number; + public Stats getStatsJSON() { +final String version = work.getContext().getOptionManager().getOption(ExecConstants.CLUSTER_VERSION).string_val; + +final Mapprops = Maps.newLinkedHashMap(); +props.put("Cluster Version", version); +props.put("Number of Drillbits", work.getContext().getBits().size()); +CoordinationProtos.DrillbitEndpoint currentEndpoint = work.getContext().getEndpoint(); +final String address = currentEndpoint.getAddress(); +props.put("Data Port Address", address + ":" + currentEndpoint.getDataPort()); +props.put("User Port Address", address + ":" + currentEndpoint.getUserPort()); +props.put("Control Port Address", address + ":" + currentEndpoint.getControlPort()); +props.put("Maximum Direct Memory", DrillConfig.getMaxDirectMemory()); + +return new Stats(props, collectDrillbits(version)); + } + + private Collection collectDrillbits(String version) { +Set drillbits = Sets.newTreeSet(); +for (CoordinationProtos.DrillbitEndpoint endpoint : work.getContext().getBits()) { + boolean versionMatch = version.equals(endpoint.getVersion()); + DrillbitInfo drillbit = new DrillbitInfo(endpoint.getAddress(), endpoint.isInitialized(), endpoint.getVersion(), versionMatch); + drillbits.add(drillbit); } -stats.add(new Stat("Data Port 
Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getDataPort())); -stats.add(new Stat("User Port Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getUserPort())); -stats.add(new Stat("Control Port Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getControlPort())); -stats.add(new Stat("Maximum Direct Memory", DrillConfig.getMaxDirectMemory())); - -return stats; +return drillbits; } @XmlRootElement - public class Stat { -private String name; -private Object value; + public static class Stats { +private final Map props; +private final Collection drillbits; @JsonCreator -public Stat(String name, Object value) { - this.name = name; - this.value = value; +public Stats(Map props, Collection drillbits) { + this.props = props; + this.drillbits = drillbits; +} + +public Map getProps() { + return props; } -public String getName() { - return name; +public Collection getDrillbits() { + return drillbits; } + } + + public static class DrillbitInfo implements Comparable { +private final String address; +private final boolean initialized; +private final String version; +private final boolean versionMatch; -public Object getValue() { - return value; +@JsonCreator +public DrillbitInfo(String address, boolean initialized, String version, boolean versionMatch) { + this.address = address; + this.initialized = initialized; + this.version = version; + this.versionMatch = versionMatch; } +public String getAddress() { + return address; +} + +public String isInitialized() { + return initialized ? "initialized" : "not initialized"; --- End diff -- boolean-to-string conversions are best handled in the web UI template so that they can possibly be localized later. And, all the user-visible text is in one place.
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86569572 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java --- @@ -55,44 +59,89 @@ public Viewable getStats() { @GET @Path("/stats.json") @Produces(MediaType.APPLICATION_JSON) - public List getStatsJSON() { -List stats = Lists.newLinkedList(); -stats.add(new Stat("Number of Drill Bits", work.getContext().getBits().size())); -int number = 0; -for (CoordinationProtos.DrillbitEndpoint bit : work.getContext().getBits()) { - String initialized = bit.isInitialized() ? " initialized" : " not initialized"; - stats.add(new Stat("Bit #" + number, bit.getAddress() + initialized)); - ++number; + public Stats getStatsJSON() { +final String version = work.getContext().getOptionManager().getOption(ExecConstants.CLUSTER_VERSION).string_val; + +final Mapprops = Maps.newLinkedHashMap(); +props.put("Cluster Version", version); +props.put("Number of Drillbits", work.getContext().getBits().size()); +CoordinationProtos.DrillbitEndpoint currentEndpoint = work.getContext().getEndpoint(); +final String address = currentEndpoint.getAddress(); +props.put("Data Port Address", address + ":" + currentEndpoint.getDataPort()); +props.put("User Port Address", address + ":" + currentEndpoint.getUserPort()); +props.put("Control Port Address", address + ":" + currentEndpoint.getControlPort()); +props.put("Maximum Direct Memory", DrillConfig.getMaxDirectMemory()); + +return new Stats(props, collectDrillbits(version)); + } + + private Collection collectDrillbits(String version) { +Set drillbits = Sets.newTreeSet(); +for (CoordinationProtos.DrillbitEndpoint endpoint : work.getContext().getBits()) { + boolean versionMatch = version.equals(endpoint.getVersion()); + DrillbitInfo drillbit = new DrillbitInfo(endpoint.getAddress(), endpoint.isInitialized(), endpoint.getVersion(), versionMatch); + drillbits.add(drillbit); } -stats.add(new Stat("Data Port 
Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getDataPort())); -stats.add(new Stat("User Port Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getUserPort())); -stats.add(new Stat("Control Port Address", work.getContext().getEndpoint().getAddress() + - ":" + work.getContext().getEndpoint().getControlPort())); -stats.add(new Stat("Maximum Direct Memory", DrillConfig.getMaxDirectMemory())); - -return stats; +return drillbits; } @XmlRootElement - public class Stat { -private String name; -private Object value; + public static class Stats { +private final Map props; +private final Collection drillbits; @JsonCreator -public Stat(String name, Object value) { - this.name = name; - this.value = value; +public Stats(Map props, Collection drillbits) { + this.props = props; + this.drillbits = drillbits; +} + +public Map getProps() { + return props; } -public String getName() { - return name; +public Collection getDrillbits() { + return drillbits; } + } + + public static class DrillbitInfo implements Comparable { +private final String address; +private final boolean initialized; +private final String version; +private final boolean versionMatch; -public Object getValue() { - return value; +@JsonCreator +public DrillbitInfo(String address, boolean initialized, String version, boolean versionMatch) { + this.address = address; + this.initialized = initialized; + this.version = version; + this.versionMatch = versionMatch; } +public String getAddress() { + return address; +} + +public String isInitialized() { + return initialized ? "initialized" : "not initialized"; +} + +public String getVersion() { + return version; +} + +public boolean isVersionMatch() { + return versionMatch; +} + +@Override +public int compareTo(DrillbitInfo o) { --- End diff -- We store a version match, and yet we also provide a comparison. What version are we comparing here? 
What does it mean to have a Drillbit A that conflicts with the cluster but does or does not conflict with
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86567189 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java --- @@ -55,44 +59,89 @@ public Viewable getStats() { @GET @Path("/stats.json") @Produces(MediaType.APPLICATION_JSON) - public List getStatsJSON() { -List stats = Lists.newLinkedList(); -stats.add(new Stat("Number of Drill Bits", work.getContext().getBits().size())); -int number = 0; -for (CoordinationProtos.DrillbitEndpoint bit : work.getContext().getBits()) { - String initialized = bit.isInitialized() ? " initialized" : " not initialized"; - stats.add(new Stat("Bit #" + number, bit.getAddress() + initialized)); - ++number; + public Stats getStatsJSON() { +final String version = work.getContext().getOptionManager().getOption(ExecConstants.CLUSTER_VERSION).string_val; + +final Mapprops = Maps.newLinkedHashMap(); +props.put("Cluster Version", version); +props.put("Number of Drillbits", work.getContext().getBits().size()); --- End diff -- "Number of Drillbits" looks like a user-visible string. Not clear how these properties are used. In general, property keys should be internal. Then, if this is for the UI, the UI template should contain the user-visible label. We generally do not want user-visible strings to appear in the body of Java code. Seems this is for the REST API. So the properties appear as JSON properties? Best to use names that are valid identifiers: "clusterVersion", "drillbitCount", etc.
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86565724 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java --- @@ -252,6 +252,8 @@ public DrillbitEndpoint apply(ServiceInstance input) { builder.append(bit.getControlPort()); builder.append(':'); builder.append(bit.getDataPort()); + builder.append(". Version - "); + builder.append(bit.getVersion()); --- End diff -- Now that we have many fields, would this be better displayed as a table with columns for each field? Host | User Port | Control Port | Data Port | Version | ... In this form we could also show the HTTP port and the port (forgot the name) whose value is computed rather than configured. Eventually, we can also show Drillbit status, etc.
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86573323 --- Diff: exec/java-exec/src/main/resources/rest/index.ftl --- @@ -17,17 +17,56 @@ back - - - -<#list model as stat> - -${stat.getName()} -${stat.getValue()} - - - - + + <#list model.getDrillbits() as drillbit> +<#if !drillbit.isVersionMatch()> + + +Drillbits in the cluster have different versions. + + <#break> + + + + + + General Info + + + +<#assign props = model.getProps()> +<#list props?keys as key> + +${key} +${props[key]} + + + + + + + +List of Drillbits + + + + <#assign i = 1> + <#list model.getDrillbits() as drillbit> + + Drillbit # ${i} + ${drillbit.getAddress()} ${drillbit.isInitialized()} --- End diff -- See note above about avoiding inline styles.
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86566329 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java --- @@ -252,6 +252,8 @@ public DrillbitEndpoint apply(ServiceInstance input) { builder.append(bit.getControlPort()); builder.append(':'); builder.append(bit.getDataPort()); + builder.append(". Version - "); + builder.append(bit.getVersion()); --- End diff -- Actually, how is this even possible? Drill bits do not advertise their version in ZK, do they? How will we get the version of a remote Drillbit?
[GitHub] drill pull request #482: DRILL-4604: Generate warning on Web UI if drillbits...
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/482#discussion_r86573108

--- Diff: exec/java-exec/src/main/resources/rest/index.ftl ---
@@ -17,17 +17,56 @@ back
-
-
-
-<#list model as stat>
-
-${stat.getName()}
-${stat.getValue()}
-
-
-
-
+
+<#list model.getDrillbits() as drillbit>
+<#if !drillbit.isVersionMatch()>
+
+
+Drillbits in the cluster have different versions.
+
+<#break>
+
+
+
+
+
+General Info
+
+
+
+<#assign props = model.getProps()>
+<#list props?keys as key>
+
+${key}
+${props[key]}
--- End diff --

Styles should be pulled out into the style sheet; inline styles are very hard to maintain. And why do we need to use a different font? This is a web UI, so there is no guarantee that every browser has Courier. Standard practice is to list multiple fonts, with a fallback to, say, monospace. See [this page](http://www.w3schools.com/cssref/css_websafe_fonts.asp).
[GitHub] drill pull request #637: Drill 1950 : Parquet row group filter pushdown.
Github user amansinha100 commented on a diff in the pull request: https://github.com/apache/drill/pull/637#discussion_r86572804

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java ---
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.stat;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.parquet.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.FloatStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.joda.time.DateTimeConstants;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ParquetMetaStatCollector implements ColumnStatCollector {
+  private final Metadata.ParquetTableMetadataBase parquetTableMetadata;
+  private final List columnMetadataList;
+  final Map implicitColValues;
+
+  public ParquetMetaStatCollector(Metadata.ParquetTableMetadataBase parquetTableMetadata, List columnMetadataList, Map implicitColValues) {
+    this.parquetTableMetadata = parquetTableMetadata;
+    this.columnMetadataList = columnMetadataList;
+    this.implicitColValues = implicitColValues;
+  }
+
+  @Override
+  public Map collectColStat(Set fields) {
+    // map from column to ColumnMetadata
+    final Map columnMetadataMap = new HashMap<>();
+
+    // map from column name to column statistics.
+    final Map statMap = new HashMap<>();
+
+    for (final Metadata.ColumnMetadata columnMetadata : columnMetadataList) {
+      SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.getName());
+      columnMetadataMap.put(schemaPath, columnMetadata);
+    }
+
+    for (final SchemaPath schemaPath : fields) {
+      final PrimitiveType.PrimitiveTypeName primitiveType;
+      final OriginalType originalType;
+
+      final Metadata.ColumnMetadata columnMetadata = columnMetadataMap.get(schemaPath);
+
+      if (columnMetadata != null) {
+        final Object min = columnMetadata.getMinValue();
+        final Object max = columnMetadata.getMaxValue();
+        final Long numNull = columnMetadata.getNulls();
+
+        primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
+        originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
+        final Integer repetitionLevel = this.parquetTableMetadata.getRepetitionLevel(columnMetadata.getName());
+
+        statMap.put(schemaPath, getStat(min, max, numNull, primitiveType, originalType, repetitionLevel));
+      } else {
+        final String columnName = schemaPath.getRootSegment().getPath();
+        if (implicitColValues.containsKey(columnName)) {
--- End diff --

Ok, I was thinking that you may not even pass the implicit columns to the expression materializer and let the PruneScanRule handle the dirN column filters by pushing down into the scan. But since your new rule is applied later (during the physical planning phase), it implies that those were not removed. So if we want to leverage them for row-group pruning, it makes sense to create the materialized expression with them.
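The exchange above concerns letting the row-group pruning rule see implicit columns (dir0, dir1, ...), whose values are constant per file. A hedged sketch, not Drill's implementation, assuming only a simple name-to-value map like the patch's implicitColValues, of how an equality filter on such a column can prune a row group without reading any data:

```java
import java.util.HashMap;
import java.util.Map;

public class ImplicitColumnPruning {
    // Decide whether a row group can be skipped for a filter of the form
    // <implicit column> = <literal>. Implicit columns (dir0, dir1, ...) have
    // a single constant value per file, so the check needs no data read.
    static boolean canSkip(Map<String, String> implicitColValues,
                           String column, String literal) {
        String actual = implicitColValues.get(column);
        // Only skip when the value is positively known and differs;
        // an unknown column must be kept (conservative default).
        return actual != null && !actual.equals(literal);
    }

    public static void main(String[] args) {
        Map<String, String> implicit = new HashMap<>();
        implicit.put("dir0", "1994");
        System.out.println(canSkip(implicit, "dir0", "1995")); // true: prune
        System.out.println(canSkip(implicit, "dir0", "1994")); // false: keep
    }
}
```

The conservative default (keep the row group when in doubt) matters: pruning is only an optimization, so a wrong "skip" would change query results while a wrong "keep" merely costs I/O.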
[GitHub] drill pull request #637: Drill 1950 : Parquet row group filter pushdown.
Github user amansinha100 commented on a diff in the pull request: https://github.com/apache/drill/pull/637#discussion_r86566263

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java ---
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.stat;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.FunctionHolderExpression;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.fn.CastFunctions;
+import org.apache.drill.common.expression.fn.FuncHolder;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder;
+import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.vector.ValueHolderHelper;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.FloatStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class RangeExprEvaluator extends AbstractExprVisitor {
+  static final Logger logger = LoggerFactory.getLogger(RangeExprEvaluator.class);
+
+  private final Map columnStatMap;
+  private final long rowCount;
+
+  public RangeExprEvaluator(final Map columnStatMap, long rowCount) {
+    this.columnStatMap = columnStatMap;
+    this.rowCount = rowCount;
+  }
+
+  public long getRowCount() {
+    return this.rowCount;
+  }
+
+  @Override
+  public Statistics visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+    if (e instanceof TypedFieldExpr) {
+      TypedFieldExpr fieldExpr = (TypedFieldExpr) e;
+      final ColumnStatistics columnStatistics = columnStatMap.get(fieldExpr.getPath());
+      if (columnStatistics != null) {
+        return columnStatistics.getStatistics();
+      } else {
+        // field does not exist.
+        Preconditions.checkArgument(fieldExpr.getMajorType().equals(Types.OPTIONAL_INT));
+        IntStatistics intStatistics = new IntStatistics();
+        intStatistics.setNumNulls(rowCount); // all values are nulls
+        return intStatistics;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Statistics visitIntConstant(ValueExpressions.IntExpression expr, Void value) throws RuntimeException {
+    return getStatistics(expr.getInt());
+  }
+
+  @Override
+  public Statistics visitLongConstant(ValueExpressions.LongExpression expr, Void value) throws RuntimeException {
+    return getStatistics(expr.getLong());
+  }
+
+  @Override
+  public Statistics visitFloatConstant(ValueExpressions.FloatExpression expr, Void value) throws RuntimeException {
+    return
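RangeExprEvaluator in the diff above turns per-column min/max statistics into values a filter can be evaluated against, one row group at a time. A simplified sketch of the core idea; the names here are illustrative, not Drill's API: a predicate like col < C is provably always-false for a row group when even the column's minimum is at or above C, so the row group can be skipped without scanning it.

```java
public class RangePruning {
    // Per-row-group statistics for one column, as read from Parquet metadata.
    static final class ColStats {
        final long min, max, numNulls;
        ColStats(long min, long max, long numNulls) {
            this.min = min; this.max = max; this.numNulls = numNulls;
        }
    }

    // True when "col < literal" cannot match any row in the row group:
    // even the smallest stored value is >= literal, and nulls never satisfy <.
    static boolean lessThanAlwaysFalse(ColStats stats, long literal) {
        return stats.min >= literal;
    }

    public static void main(String[] args) {
        ColStats s = new ColStats(100, 200, 0);
        System.out.println(lessThanAlwaysFalse(s, 50));  // true: skip row group
        System.out.println(lessThanAlwaysFalse(s, 150)); // false: must scan
    }
}
```

The real evaluator generalizes this to arbitrary expressions by visiting the expression tree and propagating [min, max] intervals (constants become degenerate intervals), which is why it is structured as an expression visitor.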
[jira] [Created] (DRILL-5003) need a better error message
Khurram Faraaz created DRILL-5003:
-------------------------------------

             Summary: need a better error message
                 Key: DRILL-5003
                 URL: https://issues.apache.org/jira/browse/DRILL-5003
             Project: Apache Drill
          Issue Type: Bug
          Components: Query Planning & Optimization
    Affects Versions: 1.9.0
            Reporter: Khurram Faraaz
            Priority: Minor

We need a better error message; Drill 1.9.0 returns an AssertionError.

{noformat}
0: jdbc:drill:schema=dfs.tmp> SELECT *
. . . . . . . . . . . . . . > FROM (
. . . . . . . . . . . . . . > select * from `t_alltype.parquet` t1, `t_alltype.parquet` t2
. . . . . . . . . . . . . . > WHERE EXISTS(t1.c4 = t2.c4) OR (t1.c4 IS NULL AND t2.c4 IS NULL)
. . . . . . . . . . . . . . > );
Error: SYSTEM ERROR: AssertionError: Internal error: not a query: `t1`.`c4` = `t2`.`c4`

[Error Id: 91e7af43-27a7-4577-802a-643705aeaf4f on centos-01.qa.lab:31010] (state=,code=0)
{noformat}

{noformat}
0: jdbc:drill:schema=dfs.tmp> SELECT *
. . . . . . . . . . . . . . > FROM (
. . . . . . . . . . . . . . > select * from `t_alltype.parquet` t1, `t_alltype.parquet` t2
. . . . . . . . . . . . . . > WHERE EXISTS(t1.c4 = t2.c4 OR (t1.c4 IS NULL AND t2.c4 IS NULL))
. . . . . . . . . . . . . . > );
Error: SYSTEM ERROR: AssertionError: Internal error: not a query: `t1`.`c4` = `t2`.`c4` OR `t1`.`c4` IS NULL AND `t2`.`c4` IS NULL
{noformat}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] drill issue #482: DRILL-4604: Generate warning on Web UI if drillbits versio...
Github user arina-ielchiieva commented on the issue: https://github.com/apache/drill/pull/482

@sudheeshkatkam Could you please review changes after CR?
[jira] [Created] (DRILL-5001) Join only supports implicit casts error even when I have explicit cast
Rahul Challapalli created DRILL-5001:
-------------------------------------

             Summary: Join only supports implicit casts error even when I have explicit cast
                 Key: DRILL-5001
                 URL: https://issues.apache.org/jira/browse/DRILL-5001
             Project: Apache Drill
          Issue Type: Bug
          Components: Query Planning & Optimization
            Reporter: Rahul Challapalli

git.commit.id.abbrev=190d5d4

The below query fails even though I have an explicit cast on the right hand side of the join condition. The data also contains a metadata cache.

{code}
select a.int_col, b.date_col
from dfs.`/drill/testdata/parquet_date/metadata_cache/mixed/fewtypes_null_large` a
inner join (
  select * from dfs.`/drill/testdata/parquet_date/metadata_cache/mixed/fewtypes_null_large`
  where dir0 = '1.2' and date_col > '1996-03-07'
) b on a.date_col = cast(date_add(b.date_col, 5) as date)
where a.int_col = 7 and a.dir0 = '1.9'
group by a.int_col, b.date_col;

Error: SYSTEM ERROR: DrillRuntimeException: Join only supports implicit casts between
1. Numeric data
2. Varchar, Varbinary data
3. Date, Timestamp data
Left type: DATE, Right type: VARCHAR. Add explicit casts to avoid this error

Fragment 2:0

[Error Id: a1b26420-af35-4892-9a87-d9b04e4423dc on qa-node190.qa.lab:31010] (state=,code=0)
{code}

I attached the data and the log file.