[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153407376 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java --- @@ -314,13 +315,49 @@ public void exit() { lease = null; } +@Override +public void cancel() { + if (queueAcquirerThread != null) { +queueAcquirerThread.interrupt(); + } + foreman.moveToState(QueryState.CANCELED, null); +} + @Override public boolean hasQueue() { return true; } @Override public String queueName() { return lease == null ? null : lease.queueName(); } + +/** + * Is used to start query enqueue process in separate thread. + * Changes query state depending on the result. + */ +private class QueueAcquirer implements Runnable { + + private final QueryQueue queryQueue; + private final Foreman foreman; + private final double queryCost; + + QueueAcquirer(QueryQueue queryQueue, Foreman foreman, double queryCost) { +this.queryQueue = queryQueue; +this.foreman = foreman; +this.queryCost = queryCost; + } + + @Override + public void run() { +try { + queryQueue.enqueue(foreman.getQueryId(), queryCost); + foreman.moveToState(QueryState.STARTING, null); +} catch (Exception e) { --- End diff -- This has FAR too much visibility into the Foreman. See above. ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153394006 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -426,48 +413,25 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep queryManager.setTotalCost(plan.totalCost()); work.applyPlan(drillbitContext.getPlanReader()); logWorkUnit(work); -admit(work); -queryManager.setQueueName(queryRM.queueName()); - -final List planFragments = work.getFragments(); -final PlanFragment rootPlanFragment = work.getRootFragment(); -assert queryId == rootPlanFragment.getHandle().getQueryId(); -drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); - drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); +fragmentsRunner.setPlanFragments(work.getFragments()); +fragmentsRunner.setRootPlanFragment(work.getRootFragment()); +fragmentsRunner.setRootOperator(work.getRootOperator()); --- End diff -- Actually, why not just accept the `work` object? It has all the items we need. ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153406144 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.concurrent.ExtendedLatch; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.UserClientConnection; +import org.apache.drill.exec.rpc.control.Controller; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.work.EndpointListener; +import org.apache.drill.exec.work.WorkManager.WorkerBee; +import org.apache.drill.exec.work.fragment.FragmentExecutor; +import org.apache.drill.exec.work.fragment.FragmentStatusReporter; +import org.apache.drill.exec.work.fragment.NonRootFragmentManager; +import org.apache.drill.exec.work.fragment.RootFragmentManager; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +/** + * Is responsible for submitting query fragments for running (locally and remotely). 
+ */ +public class FragmentsRunner { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class); + private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class); + + private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000; + + private final WorkerBee bee; + private final UserClientConnection initiatingClient; + private final DrillbitContext drillbitContext; + private final Foreman foreman; + + private List planFragments; + private PlanFragment rootPlanFragment; + private FragmentRoot rootOperator; + + public FragmentsRunner(WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) { +this.bee = bee; +this.initiatingClient = initiatingClient; +this.drillbitContext = drillbitContext; +this.foreman = foreman; + } + + public WorkerBee getBee() { +return bee; + } + + public void setPlanFragments(List planFragments) { +this.planFragments = planFragments; + } + + public void setRootPlanFragment(PlanFragment rootPlanFragment) { +this.rootPlanFragment = rootPlanFragment; + } + + public void setRootOperator(FragmentRoot rootOperator) { +this.rootOperator = rootOperator; + } + + /** + * Submits root and non-root fragments fragments for running. + * In case of success move query to the running state. + */ + public void submit() { +try { + assert planFragments != null; + assert rootPlanFragment != null; + assert rootOperator != null; + + QueryId
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153389793 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java --- @@ -112,4 +115,46 @@ public void applyPlan(PhysicalPlanReader reader) throws ForemanSetupException { fragments.add(defn.applyPlan(reader)); } } + + /** + * Converts list of stores fragments into json, + * in case of exception returns text indicating that json was malformed. + * Is used for debugging purposes. + * + * @return fragments json + */ + public String convertFragmentToJson() { +StringBuilder stringBuilder = new StringBuilder(); +final int fragmentCount = fragments.size(); +int fragmentIndex = 0; +for (final PlanFragment planFragment : fragments) { + final ExecProtos.FragmentHandle fragmentHandle = planFragment.getHandle(); + stringBuilder.append("PlanFragment("); --- End diff -- This isn't really JSON, is it? JSON syntax does not include parens. Do we want the output to be valid JSON? Or, a text wrapper around a JSON payload? ---
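To make the "valid JSON" option concrete, here is a minimal sketch of a debug dump that emits a JSON array instead of a `PlanFragment(...)` text wrapper. The `FragmentJson` and `Fragment` names and the field names are hypothetical stand-ins, not Drill classes, and the sketch assumes each fragment's plan payload is already a valid JSON document.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper; none of these names exist in Drill. */
final class FragmentJson {

  /** Hypothetical stand-in for the parts of a plan fragment used in the dump. */
  static final class Fragment {
    final int majorId;
    final int minorId;
    final String planJson; // assumed to already be valid JSON

    Fragment(int majorId, int minorId, String planJson) {
      this.majorId = majorId;
      this.minorId = minorId;
      this.planJson = planJson;
    }
  }

  /** Emits one JSON object per fragment, wrapped in a JSON array. */
  static String toJsonArray(List<Fragment> fragments) {
    StringJoiner array = new StringJoiner(",", "[", "]");
    for (Fragment f : fragments) {
      array.add("{\"majorFragmentId\":" + f.majorId
          + ",\"minorFragmentId\":" + f.minorId
          + ",\"plan\":" + f.planJson + "}");
    }
    return array.toString();
  }
}
```

The ids are numbers and the plan is embedded as-is, so no string escaping is needed under the stated assumption. If the payload could be malformed, the text-wrapper option may be the safer choice.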
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153403800 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153404245 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153406881 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java --- @@ -73,13 +75,20 @@ public void setCost(double cost) { @Override public void admit() { // No queueing by default + foreman.moveToState(UserBitShared.QueryResult.QueryState.STARTING, null); --- End diff -- Would REALLY prefer that the resource manager not depend on the Foreman; makes unit testing impossible. The Foreman should depend on this class, but not the other way around. ---
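One way to invert the dependency, sketched under assumptions: the resource manager reports the admission outcome through a narrow callback, and the Foreman (or a test double) implements it. `AdmissionListener`, `NoQueueResourceManager`, and `RecordingListener` are hypothetical names for illustration, not existing Drill types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback the Foreman would implement; not an existing Drill interface. */
interface AdmissionListener {
  void admitted();
  void admissionFailed(Exception cause);
}

/** Hypothetical stand-in for a no-queue resource manager. It knows only the listener. */
final class NoQueueResourceManager {
  private final AdmissionListener listener;

  NoQueueResourceManager(AdmissionListener listener) {
    this.listener = listener;
  }

  /** No queueing by default: the query is admitted immediately. */
  void admit() {
    listener.admitted();
  }
}

/** Test double that records callbacks, so a unit test needs no Foreman. */
final class RecordingListener implements AdmissionListener {
  final List<String> events = new ArrayList<>();

  @Override
  public void admitted() {
    events.add("admitted");
  }

  @Override
  public void admissionFailed(Exception cause) {
    events.add("failed: " + cause.getMessage());
  }
}
```

With this shape the Foreman decides which state to move to when `admitted()` fires, and the resource manager can be tested with `new NoQueueResourceManager(new RecordingListener())` alone.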
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153403438 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -873,105 +791,133 @@ public void close() throws Exception { } } - private void moveToState(final QueryState newState, final Exception exception) { -logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, - exception); + public synchronized void moveToState(final QueryState newState, final Exception exception) { +logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, exception); switch (state) { -case ENQUEUED: - switch (newState) { - case FAILED: -Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed"); -recordNewState(newState); -foremanResult.setFailed(exception); -foremanResult.close(); -return; + case PLANNING: +switch (newState) { + case ENQUEUED: +recordNewState(newState); +enqueuedQueries.inc(); +return; + case CANCELLATION_REQUESTED: +assert exception == null; +recordNewState(newState); +foremanResult.setCompleted(QueryState.CANCELED); +foremanResult.close(); +return; + case FAILED: +assert exception != null; +recordNewState(newState); +foremanResult.setFailed(exception); +foremanResult.close(); +return; +} +break; + case ENQUEUED: +enqueuedQueries.dec(); +queryManager.markQueueWaitEndTime(); +switch (newState) { --- End diff -- This is exactly the kind of double-switch statement that is hard to understand. Better, a method for each transition: `enqueue()`, `start()`, etc. Each transition is then easier to reason about. What are the valid from states? What are the accounting operations needed in each transition? 
Even better would be to combine the action that triggers the transition (adding the query to the queue, say) with the state transition logic itself, since the new state is often the result not simply of a transition, but of some condition (for example, when queueing, moving to ENQUEUED, RUNNING, or FAILED depending on the queue result). Otherwise the logic has to be essentially duplicated. ---
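A minimal sketch of the method-per-transition shape, assuming a simplified state set and a plain counter in place of the `enqueuedQueries` metric. `QueryLifecycle` and its methods are hypothetical and are not the Foreman code. Each method names its valid from-states and does its own accounting.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of one method per transition; not the actual Foreman. */
final class QueryLifecycle {
  enum State { PLANNING, ENQUEUED, STARTING, FAILED }

  private State state = State.PLANNING;
  private int enqueuedQueries; // stand-in for the enqueuedQueries metric
  private Exception failure;

  synchronized State state() { return state; }
  synchronized int enqueuedCount() { return enqueuedQueries; }
  synchronized Exception failure() { return failure; }

  /** PLANNING -> ENQUEUED. */
  synchronized void enqueue() {
    requireFrom(EnumSet.of(State.PLANNING), State.ENQUEUED);
    enqueuedQueries++;
    state = State.ENQUEUED;
  }

  /** PLANNING or ENQUEUED -> STARTING. */
  synchronized void start() {
    requireFrom(EnumSet.of(State.PLANNING, State.ENQUEUED), State.STARTING);
    leaveQueueIfNeeded();
    state = State.STARTING;
  }

  /** PLANNING or ENQUEUED -> FAILED; a cause is mandatory. */
  synchronized void fail(Exception cause) {
    if (cause == null) {
      throw new IllegalArgumentException("exception cannot be null when new state is failed");
    }
    requireFrom(EnumSet.of(State.PLANNING, State.ENQUEUED), State.FAILED);
    leaveQueueIfNeeded();
    failure = cause;
    state = State.FAILED;
  }

  // Accounting shared by every transition that leaves ENQUEUED.
  private void leaveQueueIfNeeded() {
    if (state == State.ENQUEUED) {
      enqueuedQueries--;
    }
  }

  // Rejects the transition unless the current state is one of the valid from-states.
  private void requireFrom(Set<State> validFrom, State target) {
    if (!validFrom.contains(state)) {
      throw new IllegalStateException("Invalid transition " + state + " --> " + target);
    }
  }
}
```

The valid from-states and the accounting now sit next to each other in one method per transition, which is the property the nested switch makes hard to see.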
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153405107 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153405899 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153405303 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153402805 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -426,48 +413,25 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep queryManager.setTotalCost(plan.totalCost()); work.applyPlan(drillbitContext.getPlanReader()); logWorkUnit(work); -admit(work); -queryManager.setQueueName(queryRM.queueName()); - -final List planFragments = work.getFragments(); -final PlanFragment rootPlanFragment = work.getRootFragment(); -assert queryId == rootPlanFragment.getHandle().getQueryId(); -drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); - drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); +fragmentsRunner.setPlanFragments(work.getFragments()); +fragmentsRunner.setRootPlanFragment(work.getRootFragment()); +fragmentsRunner.setRootOperator(work.getRootOperator()); -logger.debug("Submitting fragments to run."); - -// set up the root fragment first so we'll have incoming buffers available. -setupRootFragment(rootPlanFragment, work.getRootOperator()); - -setupNonRootFragments(planFragments); - -moveToState(QueryState.RUNNING, null); -logger.debug("Fragments running."); +admit(); } - private void admit(QueryWorkUnit work) throws ForemanSetupException { + private void admit() throws ForemanSetupException { queryManager.markPlanningEndTime(); -try { - queryRM.admit(); -} catch (QueueTimeoutException e) { - throw UserException - .resourceError() - .message(e.getMessage()) - .build(logger); -} catch (QueryQueueException e) { - throw new ForemanSetupException(e.getMessage(), e); -} finally { - queryManager.markQueueWaitEndTime(); -} -moveToState(QueryState.STARTING, null); +planningQueries.dec(); --- End diff -- Agree. 
A better solution is to have a separate function for each new state to avoid big hairy switch statements from current state to desired state. ---
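A minimal sketch of the "one function per new state" idea, assuming a simplified state enum. `SketchState`, `StateMachineSketch`, and the allowed source states listed in each method are hypothetical illustrations, not Drill's actual classes or transition rules. Each method names its target state and declares which states it may be entered from, so no nested switch is needed:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the QueryState enum.
enum SketchState {
  PREPARING, PLANNING, ENQUEUED, STARTING, RUNNING, COMPLETED, FAILED
}

// One method per target state. The allowed source states are assumptions
// made for this sketch only.
class StateMachineSketch {
  private SketchState state = SketchState.PREPARING;

  synchronized SketchState current() {
    return state;
  }

  synchronized void toPlanning() {
    transition(SketchState.PLANNING, EnumSet.of(SketchState.PREPARING));
  }

  synchronized void toEnqueued() {
    transition(SketchState.ENQUEUED, EnumSet.of(SketchState.PLANNING));
  }

  synchronized void toStarting() {
    transition(SketchState.STARTING, EnumSet.of(SketchState.ENQUEUED));
  }

  synchronized void toRunning() {
    transition(SketchState.RUNNING, EnumSet.of(SketchState.STARTING));
  }

  synchronized void toFailed() {
    // Failure is reachable from every non-terminal state in this sketch.
    transition(SketchState.FAILED, EnumSet.of(
        SketchState.PREPARING, SketchState.PLANNING, SketchState.ENQUEUED,
        SketchState.STARTING, SketchState.RUNNING));
  }

  // Single place that validates the source state and records the new one.
  private void transition(SketchState target, Set<SketchState> allowedFrom) {
    if (!allowedFrom.contains(state)) {
      throw new IllegalStateException("Illegal transition " + state + " --> " + target);
    }
    state = target;
  }
}
```

Per-state side effects (counters, timers, closing results) would live in the body of the matching method rather than in a case of a two-level switch.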
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153407221 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java --- @@ -38,22 +35,17 @@ * For some cases the foreman does not have a full plan, just a cost. In * this case, this object will not plan memory, but still needs the cost * to place the job into the correct queue. - * @param cost + * @param cost query cost */ void setCost(double cost); /** - * Admit the query into the cluster. Blocks until the query - * can run. (Later revisions may use a more thread-friendly - * approach.) - * @throws QueryQueueException if something goes wrong with the - * queue mechanism - * @throws QueueTimeoutException if the query timed out waiting to - * be admitted. + * Admit the query into the cluster. Starts enqueueing process in separate thread. --- End diff -- If we want to do this, please provide an interface with the needed services. Also, if we want to do this, then the implementation should change. Maybe admit returns a status: ADMITTED, QUEUED, FAILED. (Or, FAILED can be in the form of an exception.) The mechanism should be: pass in a handle to the query. If admitted, tell the handle to run. If blocked, figure out how to receive the notification from ZK. Once the query is admitted, let it run. If a timeout occurs, tell the handle that the query failed. With a handle, we can unit test the mechanism. ---
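A rough sketch of the handle-based admission described in this comment, under stated assumptions: `AdmitStatus`, `QueryHandle`, `AdmissionSketch`, and `RecordingHandle` are hypothetical names, the queue is a plain in-memory one rather than anything ZK-backed, and failure is reported through the handle instead of a FAILED status. It is meant only to show why a handle makes the mechanism testable without a Foreman:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical result of an admission attempt.
enum AdmitStatus { ADMITTED, QUEUED }

// Hypothetical narrow interface: the only services the resource manager needs.
interface QueryHandle {
  void run();
  void fail(Exception cause);
}

// In-memory stand-in for a queue-based resource manager.
class AdmissionSketch {
  private final int maxRunning;
  private int running;
  private final Queue<QueryHandle> waiting = new ArrayDeque<>();

  AdmissionSketch(int maxRunning) {
    this.maxRunning = maxRunning;
  }

  // Admit immediately if a slot is free, otherwise park the handle.
  synchronized AdmitStatus admit(QueryHandle handle) {
    if (running < maxRunning) {
      running++;
      handle.run();
      return AdmitStatus.ADMITTED;
    }
    waiting.add(handle);
    return AdmitStatus.QUEUED;
  }

  // Called when a running query finishes; hands the slot to the next waiter.
  synchronized void release() {
    if (running > 0) {
      running--;
    }
    QueryHandle next = waiting.poll();
    if (next != null) {
      running++;
      next.run();
    }
  }

  // Called when a waiting query exceeds its queue timeout.
  synchronized void timeout(QueryHandle handle) {
    if (waiting.remove(handle)) {
      handle.fail(new Exception("Timed out waiting for admission"));
    }
  }
}

// Test double that records what the resource manager told it to do.
class RecordingHandle implements QueryHandle {
  boolean started;
  Exception failure;

  @Override
  public void run() {
    started = true;
  }

  @Override
  public void fail(Exception cause) {
    failure = cause;
  }
}
```

Because the resource manager only sees `QueryHandle`, a unit test can pass in a `RecordingHandle` and check that it was started, left waiting, or failed.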
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153404840 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.concurrent.ExtendedLatch; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.UserClientConnection; +import org.apache.drill.exec.rpc.control.Controller; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.work.EndpointListener; +import org.apache.drill.exec.work.WorkManager.WorkerBee; +import org.apache.drill.exec.work.fragment.FragmentExecutor; +import org.apache.drill.exec.work.fragment.FragmentStatusReporter; +import org.apache.drill.exec.work.fragment.NonRootFragmentManager; +import org.apache.drill.exec.work.fragment.RootFragmentManager; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +/** + * Is responsible for submitting query fragments for running (locally and remotely). 
+ */ +public class FragmentsRunner { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class); + private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class); + + private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000; + + private final WorkerBee bee; + private final UserClientConnection initiatingClient; + private final DrillbitContext drillbitContext; + private final Foreman foreman; + + private List planFragments; + private PlanFragment rootPlanFragment; + private FragmentRoot rootOperator; + + public FragmentsRunner(WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) { +this.bee = bee; +this.initiatingClient = initiatingClient; +this.drillbitContext = drillbitContext; +this.foreman = foreman; + } + + public WorkerBee getBee() { +return bee; + } + + public void setPlanFragments(List planFragments) { +this.planFragments = planFragments; + } + + public void setRootPlanFragment(PlanFragment rootPlanFragment) { +this.rootPlanFragment = rootPlanFragment; + } + + public void setRootOperator(FragmentRoot rootOperator) { +this.rootOperator = rootOperator; + } + + /** + * Submits root and non-root fragments fragments for running. + * In case of success move query to the running state. + */ + public void submit() { +try { + assert planFragments != null; + assert rootPlanFragment != null; + assert rootOperator != null; + + QueryId
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153389627 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java --- @@ -19,30 +19,35 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import java.util.HashMap; +import java.util.Map; + public class ProfileUtil { - // Display names for QueryState enum in UserBitShared.proto - private static final String[] queryStateDisplayNames = { -"Starting", // STARTING = 0 -"Running", // RUNNING = 1 -"Succeeded", // COMPLETED = 2 -"Canceled", // CANCELED = 3 -"Failed", // FAILED = 4 -"CancellationRequested", // CANCELLATION_REQUESTED = 5 -"Enqueued" // ENQUEUED = 6 - }; + private static final MapqueryStateDisplayMap = new HashMap<>(QueryState.values().length); + + static { +queryStateDisplayMap.put(QueryState.PREPARING, "Preparing"); +queryStateDisplayMap.put(QueryState.PLANNING, "Planning"); +queryStateDisplayMap.put(QueryState.ENQUEUED, "Enqueued"); +queryStateDisplayMap.put(QueryState.STARTING, "Starting"); +queryStateDisplayMap.put(QueryState.RUNNING, "Running"); +queryStateDisplayMap.put(QueryState.COMPLETED, "Succeeded"); +queryStateDisplayMap.put(QueryState.FAILED, "Failed"); +queryStateDisplayMap.put(QueryState.CANCELLATION_REQUESTED, "Cancellation Requested"); +queryStateDisplayMap.put(QueryState.CANCELED, "Canceled"); + } /** - * Utility to return display name for query state - * @param queryState + * Utility method to return display name for query state + * @param queryState query state * @return display string for query state */ - public final static String getQueryStateDisplayName(QueryState queryState) { -int queryStateOrdinal = queryState.getNumber(); -if (queryStateOrdinal >= queryStateDisplayNames.length) { - return queryState.name(); -} else { - return queryStateDisplayNames[queryStateOrdinal]; + public static String getQueryStateDisplayName(QueryState queryState) { +String 
displayName = queryStateDisplayMap.get(queryState); +if (displayName == null) { --- End diff -- It isn't. But, this project added new states. Before this UI was updated, we would want to gracefully handle the intermediate condition. Same as the next time someone adds a state. ---
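A small sketch of the graceful fallback discussed here, assuming a cut-down enum. `DisplayState` and `DisplayNameSketch` are hypothetical stand-ins, and one state is deliberately left out of the table to mimic a state that was added before the UI mapping was updated:

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, cut-down stand-in for the QueryState enum.
enum DisplayState { STARTING, RUNNING, COMPLETED, PLANNING }

class DisplayNameSketch {
  private static final Map<DisplayState, String> NAMES = new EnumMap<>(DisplayState.class);

  static {
    NAMES.put(DisplayState.STARTING, "Starting");
    NAMES.put(DisplayState.RUNNING, "Running");
    // The UI shows a different label than the enum constant name.
    NAMES.put(DisplayState.COMPLETED, "Succeeded");
    // PLANNING is intentionally unmapped to simulate a newly added state.
  }

  // Falls back to the enum constant name when no display name is registered.
  static String displayName(DisplayState state) {
    String name = NAMES.get(state);
    return name != null ? name : state.name();
  }
}
```

With the fallback in place, an unmapped state still renders as its constant name instead of failing or showing null.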
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153403903 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.concurrent.ExtendedLatch; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.UserClientConnection; +import org.apache.drill.exec.rpc.control.Controller; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.work.EndpointListener; +import org.apache.drill.exec.work.WorkManager.WorkerBee; +import org.apache.drill.exec.work.fragment.FragmentExecutor; +import org.apache.drill.exec.work.fragment.FragmentStatusReporter; +import org.apache.drill.exec.work.fragment.NonRootFragmentManager; +import org.apache.drill.exec.work.fragment.RootFragmentManager; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +/** + * Is responsible for submitting query fragments for running (locally and remotely). 
+ */ +public class FragmentsRunner { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class); + private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class); + + private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000; + + private final WorkerBee bee; + private final UserClientConnection initiatingClient; + private final DrillbitContext drillbitContext; + private final Foreman foreman; + + private List planFragments; + private PlanFragment rootPlanFragment; + private FragmentRoot rootOperator; + + public FragmentsRunner(WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) { +this.bee = bee; +this.initiatingClient = initiatingClient; +this.drillbitContext = drillbitContext; +this.foreman = foreman; + } + + public WorkerBee getBee() { +return bee; + } + + public void setPlanFragments(List planFragments) { +this.planFragments = planFragments; + } + + public void setRootPlanFragment(PlanFragment rootPlanFragment) { +this.rootPlanFragment = rootPlanFragment; + } + + public void setRootOperator(FragmentRoot rootOperator) { +this.rootOperator = rootOperator; + } + + /** + * Submits root and non-root fragments fragments for running. + * In case of success move query to the running state. + */ + public void submit() { +try { + assert planFragments != null; + assert rootPlanFragment != null; + assert rootOperator != null; + + QueryId
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153389550 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java --- @@ -19,30 +19,35 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import java.util.HashMap; +import java.util.Map; + public class ProfileUtil { - // Display names for QueryState enum in UserBitShared.proto - private static final String[] queryStateDisplayNames = { -"Starting", // STARTING = 0 -"Running", // RUNNING = 1 -"Succeeded", // COMPLETED = 2 -"Canceled", // CANCELED = 3 -"Failed", // FAILED = 4 -"CancellationRequested", // CANCELLATION_REQUESTED = 5 -"Enqueued" // ENQUEUED = 6 - }; + private static final MapqueryStateDisplayMap = new HashMap<>(QueryState.values().length); + + static { +queryStateDisplayMap.put(QueryState.PREPARING, "Preparing"); +queryStateDisplayMap.put(QueryState.PLANNING, "Planning"); +queryStateDisplayMap.put(QueryState.ENQUEUED, "Enqueued"); +queryStateDisplayMap.put(QueryState.STARTING, "Starting"); +queryStateDisplayMap.put(QueryState.RUNNING, "Running"); +queryStateDisplayMap.put(QueryState.COMPLETED, "Succeeded"); --- End diff -- We cannot change the QueryState: it is part of the public Protobuf API. The reason for this table is that the UI wants to use a different display name. ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153406564 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java --- @@ -280,29 +280,29 @@ public void interrupted(final InterruptedException ex) { } void updateEphemeralState(final QueryState queryState) { - // If query is already in zk transient store, ignore the transient state update option. - // Else, they will not be removed from transient store upon completion. - if (!inTransientStore && - !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) { -return; - } +// If query is already in zk transient store, ignore the transient state update option. +// Else, they will not be removed from transient store upon completion. +if (!inTransientStore && !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) { + return; +} - switch (queryState) { +switch (queryState) { + case PREPARING: + case PLANNING: case ENQUEUED: case STARTING: case RUNNING: case CANCELLATION_REQUESTED: runningProfileStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile. inTransientStore = true; break; - case COMPLETED: case CANCELED: case FAILED: try { runningProfileStore.remove(stringQueryId); inTransientStore = false; -} catch(final Exception e) { +} catch (final Exception e) { --- End diff -- Below. What is an "estore"? Is that a typo? ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153405696 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java --- @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.concurrent.ExtendedLatch; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.UserClientConnection; +import org.apache.drill.exec.rpc.control.Controller; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.work.EndpointListener; +import org.apache.drill.exec.work.WorkManager.WorkerBee; +import org.apache.drill.exec.work.fragment.FragmentExecutor; +import org.apache.drill.exec.work.fragment.FragmentStatusReporter; +import org.apache.drill.exec.work.fragment.NonRootFragmentManager; +import org.apache.drill.exec.work.fragment.RootFragmentManager; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +/** + * Is responsible for submitting query fragments for running (locally and remotely). 
+ */ +public class FragmentsRunner { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class); + private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class); + + private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000; + + private final WorkerBee bee; + private final UserClientConnection initiatingClient; + private final DrillbitContext drillbitContext; + private final Foreman foreman; + + private List planFragments; + private PlanFragment rootPlanFragment; + private FragmentRoot rootOperator; + + public FragmentsRunner(WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) { +this.bee = bee; +this.initiatingClient = initiatingClient; +this.drillbitContext = drillbitContext; +this.foreman = foreman; + } + + public WorkerBee getBee() { +return bee; + } + + public void setPlanFragments(List planFragments) { +this.planFragments = planFragments; + } + + public void setRootPlanFragment(PlanFragment rootPlanFragment) { +this.rootPlanFragment = rootPlanFragment; + } + + public void setRootOperator(FragmentRoot rootOperator) { +this.rootOperator = rootOperator; + } + + /** + * Submits root and non-root fragments fragments for running. + * In case of success move query to the running state. + */ + public void submit() { +try { + assert planFragments != null; + assert rootPlanFragment != null; + assert rootOperator != null; + + QueryId
Drill Hangout Nov 28 2017
We'll have the Drill Hangout tomorrow Nov 28th, at 10 AM PST. As usual, please send email if you have any topics to discuss or bring them up on the call. We'll start with the release plan :) Hangout link: https://plus.google.com/hangouts/_/event/ci4rdiju8bv04a64efj5fedd0lc Thanks Parth
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r153388800 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java --- @@ -30,18 +30,24 @@ public class WorkspaceConfig { /** Default workspace is a root directory which supports read, but not write. */ - public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null); + public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null, false); private final String location; private final boolean writable; private final String defaultInputFormat; - + private final Boolean allowAccessOutsideWorkspace; // allow access outside the workspace by default. This --- End diff -- Yes it would, I believe. But we want the value to be `true` for backward compatibility. (This also addresses your next comment). So we need to know if the value is missing. Can only do that with a non primitive AFAIK. ---
[GitHub] drill issue #1027: DRILL-4779 : Kafka storage plugin
Github user akumarb2010 commented on the issue: https://github.com/apache/drill/pull/1027 As this PR is corrupted, we have created a new PR with a single commit: https://github.com/apache/drill/pull/1052 ---
[GitHub] drill pull request #1052: DRILL-4779: Kafka storage plugin new PR(Kamesh Bha...
GitHub user akumarb2010 opened a pull request: https://github.com/apache/drill/pull/1052 DRILL-4779: Kafka storage plugin new PR (Kamesh Bhallamudi & Anil Kumar Batchu) Because PR https://github.com/apache/drill/pull/1027 was corrupted during a squash, this new PR carries the same codebase as a single commit. It has been tested against Kafka and the unit tests have been verified. You can merge this pull request into a Git repository by running: $ git pull https://github.com/akumarb2010os/drill master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/1052.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1052 commit d8b4adab3215ff8d07c8183f42a5c6927785a264 Author: Anil Kumar Batchu Date: 2017-11-28T02:44:55Z DRILL-4779: Kafka storage plugin new PR (Kamesh Bhallamudi & Anil Kumar Batchu) ---
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user arina-ielchiieva commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r153336979 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java --- @@ -30,18 +30,24 @@ public class WorkspaceConfig { /** Default workspace is a root directory which supports read, but not write. */ - public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null); + public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null, false); private final String location; private final boolean writable; private final String defaultInputFormat; - + private final Boolean allowAccessOutsideWorkspace; // allow access outside the workspace by default. This --- End diff -- As far as I remember, `@JsonProperty("allowAccessOutsideWorkspace") boolean allowAccessOutsideWorkspace` will be set to false by default if the value is not present during deserialization. Could you please check? ---
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user arina-ielchiieva commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r153337275 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java --- @@ -30,18 +30,25 @@ public class WorkspaceConfig { /** Default workspace is a root directory which supports read, but not write. */ - public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null); + public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null, false); private final String location; private final boolean writable; private final String defaultInputFormat; - + private final Boolean allowAccessOutsideWorkspace; // allow access outside the workspace by default. This + // field is a Boolean (not boolean) so that we can + // assign a default value if it is not defined in a + // storage plugin config public WorkspaceConfig(@JsonProperty("location") String location, @JsonProperty("writable") boolean writable, - @JsonProperty("defaultInputFormat") String defaultInputFormat) { + @JsonProperty("defaultInputFormat") String defaultInputFormat, + @JsonProperty("allowAccessOutsideWorkspace") Boolean allowAccessOutsideWorkspace + ) { this.location = location; this.writable = writable; this.defaultInputFormat = defaultInputFormat; +//this.allowAccessOutsideWorkspace = allowAccessOutsideWorkspace != null ? allowAccessOutsideWorkspace : false ; +this.allowAccessOutsideWorkspace = true; --- End diff -- It seems we should not always set true... ---
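A minimal sketch of the null-check the `Boolean` field makes possible, assuming the deserializer hands the constructor `null` when the property is absent from the storage plugin config. `WorkspaceSketch` is a hypothetical stand-in with the JSON annotations left out so the example is self-contained, and the default of `true` follows the backward-compatibility intent stated earlier in this thread rather than the `false` in the commented-out line:

```java
// Hypothetical stand-in for the relevant part of WorkspaceConfig.
class WorkspaceSketch {
  private final boolean allowAccessOutsideWorkspace;

  // A Boolean (not boolean) parameter lets "missing" (null) be told apart
  // from an explicit false.
  WorkspaceSketch(Boolean allowAccessOutsideWorkspace) {
    // Assumption: null means the property was not present in the config,
    // so fall back to true for backward compatibility.
    this.allowAccessOutsideWorkspace =
        allowAccessOutsideWorkspace == null || allowAccessOutsideWorkspace;
  }

  boolean allowAccessOutsideWorkspace() {
    return allowAccessOutsideWorkspace;
  }
}
```

An explicit `false` in the config is honored, while older configs that lack the property keep the previous behavior.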
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153297425 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -873,105 +791,133 @@ public void close() throws Exception { } } - private void moveToState(final QueryState newState, final Exception exception) { -logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, - exception); + public synchronized void moveToState(final QueryState newState, final Exception exception) { +logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, exception); switch (state) { -case ENQUEUED: - switch (newState) { - case FAILED: -Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed"); -recordNewState(newState); -foremanResult.setFailed(exception); -foremanResult.close(); -return; + case PLANNING: +switch (newState) { + case ENQUEUED: +recordNewState(newState); +enqueuedQueries.inc(); --- End diff -- Decrement plannedQueries ? ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153278034 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -221,11 +200,15 @@ public QueryManager getQueryManager() { } /** - * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be - * terminated. + * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be terminated. + * For planning and enqueued states we cancel immediately since these states are done locally. + * + * Note this can be called from outside of run() on another thread, or after run() completes */ public void cancel() { -// Note this can be called from outside of run() on another thread, or after run() completes +if (QueryState.PLANNING == state || QueryState.ENQUEUED == state) { + moveToState(QueryState.CANCELLATION_REQUESTED, null); --- End diff -- Please add a comment explaining why you move to the CANCELLATION_REQUESTED state only when the query is in the PLANNING or ENQUEUED state. For example, why not the PREPARING state? ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153285121 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -256,12 +239,14 @@ public void run() { // track how long the query takes queryManager.markStartTime(); -enqueuedQueries.dec(); runningQueries.inc(); +queryText = queryRequest.getPlan(); +recordNewState(QueryState.PLANNING); --- End diff -- Why was this code moved out of the try block? I believe getPlan can throw an exception, which we should handle. Also, should this be handled in moveToState? ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153293973 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -177,9 +155,10 @@ public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext, queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(), drillbitContext.getClusterCoordinator(), this); -recordNewState(QueryState.ENQUEUED); -enqueuedQueries.inc(); +recordNewState(QueryState.PREPARING); --- End diff -- Better to handle all state transitions under moveToState. ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153268878 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java --- @@ -19,30 +19,35 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import java.util.HashMap; +import java.util.Map; + public class ProfileUtil { - // Display names for QueryState enum in UserBitShared.proto - private static final String[] queryStateDisplayNames = { -"Starting", // STARTING = 0 -"Running", // RUNNING = 1 -"Succeeded", // COMPLETED = 2 -"Canceled", // CANCELED = 3 -"Failed", // FAILED = 4 -"CancellationRequested", // CANCELLATION_REQUESTED = 5 -"Enqueued" // ENQUEUED = 6 - }; + private static final Map<QueryState, String> queryStateDisplayMap = new HashMap<>(QueryState.values().length); + + static { +queryStateDisplayMap.put(QueryState.PREPARING, "Preparing"); +queryStateDisplayMap.put(QueryState.PLANNING, "Planning"); +queryStateDisplayMap.put(QueryState.ENQUEUED, "Enqueued"); +queryStateDisplayMap.put(QueryState.STARTING, "Starting"); +queryStateDisplayMap.put(QueryState.RUNNING, "Running"); +queryStateDisplayMap.put(QueryState.COMPLETED, "Succeeded"); +queryStateDisplayMap.put(QueryState.FAILED, "Failed"); +queryStateDisplayMap.put(QueryState.CANCELLATION_REQUESTED, "Cancellation Requested"); +queryStateDisplayMap.put(QueryState.CANCELED, "Canceled"); + } /** - * Utility to return display name for query state - * @param queryState + * Utility method to return display name for query state + * @param queryState query state * @return display string for query state */ - public final static String getQueryStateDisplayName(QueryState queryState) { -int queryStateOrdinal = queryState.getNumber(); -if (queryStateOrdinal >= queryStateDisplayNames.length) { - return queryState.name(); -} else { - return queryStateDisplayNames[queryStateOrdinal]; + public static String getQueryStateDisplayName(QueryState queryState) { +String displayName = queryStateDisplayMap.get(queryState); +if (displayName == null) { --- End diff -- When is it possible for a query to have a state that is not in the map? ---
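One situation where the null branch could matter is a state that is added to the enum later but never registered in the map. The following is a minimal sketch of that lookup-with-fallback pattern, not the code from the pull request: `State`, `NEW_STATE`, and `DisplayNames` are hypothetical stand-ins for the protobuf-generated `QueryState` and for `ProfileUtil`.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for the generated QueryState enum. NEW_STATE
// deliberately has no entry in the map below.
enum State { PREPARING, PLANNING, COMPLETED, NEW_STATE }

final class DisplayNames {
  private static final Map<State, String> NAMES = new EnumMap<>(State.class);

  static {
    NAMES.put(State.PREPARING, "Preparing");
    NAMES.put(State.PLANNING, "Planning");
    NAMES.put(State.COMPLETED, "Succeeded");
  }

  private DisplayNames() { }

  // Returns the registered display name, or the enum constant's own name
  // when no mapping was registered for the given state.
  static String displayName(State state) {
    String name = NAMES.get(state);
    return name == null ? state.name() : name;
  }
}
```

With this shape, an unmapped state still renders as its raw constant name instead of producing a null in the UI.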
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153323467 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java --- @@ -314,13 +315,49 @@ public void exit() { lease = null; } +@Override +public void cancel() { + if (queueAcquirerThread != null) { +queueAcquirerThread.interrupt(); + } + foreman.moveToState(QueryState.CANCELED, null); +} + @Override public boolean hasQueue() { return true; } @Override public String queueName() { return lease == null ? null : lease.queueName(); } + +/** + * Is used to start query enqueue process in separate thread. + * Changes query state depending on the result. + */ +private class QueueAcquirer implements Runnable { + + private final QueryQueue queryQueue; + private final Foreman foreman; + private final double queryCost; + + QueueAcquirer(QueryQueue queryQueue, Foreman foreman, double queryCost) { +this.queryQueue = queryQueue; +this.foreman = foreman; +this.queryCost = queryCost; + } + + @Override + public void run() { +try { + queryQueue.enqueue(foreman.getQueryId(), queryCost); + foreman.moveToState(QueryState.STARTING, null); +} catch (Exception e) { --- End diff -- Should we handle interruptedException (for cancel above) here ? I don't think we want to move Query to FAILED state by default. ---
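The distinction raised here, cancellation versus genuine failure, can be sketched as below. This is an illustration under stated assumptions only: the blocking enqueue call is modeled with a `CountDownLatch`, and `Outcome` and `AcquirerSketch` are hypothetical names, not Drill classes.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical result type; Drill itself reports results via QueryState.
enum Outcome { ADMITTED, CANCELED, FAILED }

// Simplified stand-in for the QueueAcquirer in the diff. The blocking
// "enqueue" step is modeled as waiting on a latch.
final class AcquirerSketch implements Runnable {
  private final CountDownLatch admitted;
  private volatile Outcome outcome;

  AcquirerSketch(CountDownLatch admitted) {
    this.admitted = admitted;
  }

  @Override
  public void run() {
    try {
      admitted.await();              // blocks until admitted or interrupted
      outcome = Outcome.ADMITTED;
    } catch (InterruptedException e) {
      // The interrupt is assumed to come from cancel(): report a
      // cancellation rather than a failure, and restore the flag.
      Thread.currentThread().interrupt();
      outcome = Outcome.CANCELED;
    } catch (RuntimeException e) {
      // Anything else is treated as a real failure.
      outcome = Outcome.FAILED;
    }
  }

  Outcome outcome() {
    return outcome;
  }
}
```

Catching `InterruptedException` before the broader handler keeps a user-initiated cancel from being recorded as FAILED.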
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153294703 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java --- @@ -19,30 +19,35 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import java.util.HashMap; +import java.util.Map; + public class ProfileUtil { - // Display names for QueryState enum in UserBitShared.proto - private static final String[] queryStateDisplayNames = { -"Starting", // STARTING = 0 -"Running", // RUNNING = 1 -"Succeeded", // COMPLETED = 2 -"Canceled", // CANCELED = 3 -"Failed", // FAILED = 4 -"CancellationRequested", // CANCELLATION_REQUESTED = 5 -"Enqueued" // ENQUEUED = 6 - }; + private static final Map<QueryState, String> queryStateDisplayMap = new HashMap<>(QueryState.values().length); + + static { +queryStateDisplayMap.put(QueryState.PREPARING, "Preparing"); +queryStateDisplayMap.put(QueryState.PLANNING, "Planning"); +queryStateDisplayMap.put(QueryState.ENQUEUED, "Enqueued"); +queryStateDisplayMap.put(QueryState.STARTING, "Starting"); +queryStateDisplayMap.put(QueryState.RUNNING, "Running"); +queryStateDisplayMap.put(QueryState.COMPLETED, "Succeeded"); --- End diff -- To be consistent, should we name this state QueryState.SUCCEEDED ? ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153296504 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -873,105 +791,133 @@ public void close() throws Exception { } } - private void moveToState(final QueryState newState, final Exception exception) { -logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, - exception); + public synchronized void moveToState(final QueryState newState, final Exception exception) { +logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, exception); switch (state) { -case ENQUEUED: - switch (newState) { - case FAILED: -Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed"); -recordNewState(newState); -foremanResult.setFailed(exception); -foremanResult.close(); -return; + case PLANNING: --- End diff -- can you add the case for PREPARING as well ? ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153290872 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -426,48 +413,25 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep queryManager.setTotalCost(plan.totalCost()); work.applyPlan(drillbitContext.getPlanReader()); logWorkUnit(work); -admit(work); -queryManager.setQueueName(queryRM.queueName()); - -final List planFragments = work.getFragments(); -final PlanFragment rootPlanFragment = work.getRootFragment(); -assert queryId == rootPlanFragment.getHandle().getQueryId(); -drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); - drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); +fragmentsRunner.setPlanFragments(work.getFragments()); +fragmentsRunner.setRootPlanFragment(work.getRootFragment()); +fragmentsRunner.setRootOperator(work.getRootOperator()); -logger.debug("Submitting fragments to run."); - -// set up the root fragment first so we'll have incoming buffers available. -setupRootFragment(rootPlanFragment, work.getRootOperator()); - -setupNonRootFragments(planFragments); - -moveToState(QueryState.RUNNING, null); -logger.debug("Fragments running."); +admit(); } - private void admit(QueryWorkUnit work) throws ForemanSetupException { + private void admit() throws ForemanSetupException { queryManager.markPlanningEndTime(); -try { - queryRM.admit(); -} catch (QueueTimeoutException e) { - throw UserException - .resourceError() - .message(e.getMessage()) - .build(logger); -} catch (QueryQueueException e) { - throw new ForemanSetupException(e.getMessage(), e); -} finally { - queryManager.markQueueWaitEndTime(); -} -moveToState(QueryState.STARTING, null); +planningQueries.dec(); --- End diff -- Better to handle increment and decrement of counters in moveToState. ---
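The suggestion to keep counter updates inside the transition method could look roughly like the sketch below. It is an assumption-laden illustration: `Phase` and `PhaseCounters` are hypothetical, and the counters are per-instance here, whereas the counters in the diff (`planningQueries`, `enqueuedQueries`, `runningQueries`) appear to be metrics shared across queries.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical subset of query states.
enum Phase { PLANNING, ENQUEUED, RUNNING }

final class PhaseCounters {
  private final Map<Phase, AtomicLong> counters = new EnumMap<>(Phase.class);
  private Phase current;

  PhaseCounters(Phase initial) {
    for (Phase p : Phase.values()) {
      counters.put(p, new AtomicLong());
    }
    current = initial;
    counters.get(initial).incrementAndGet();
  }

  // The only place where counters change: leaving a phase decrements its
  // counter, entering the next phase increments that phase's counter.
  synchronized void moveTo(Phase next) {
    counters.get(current).decrementAndGet();
    counters.get(next).incrementAndGet();
    current = next;
  }

  long count(Phase phase) {
    return counters.get(phase).get();
  }
}
```

Because every increment is paired with a decrement in one method, a counter cannot drift when a new call site forgets one half of the pair.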
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r15422 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java --- @@ -288,8 +287,10 @@ public void setCost(double cost) { } @Override -public void admit() throws QueueTimeoutException, QueryQueueException { - lease = rm.queue().enqueue(foreman.getQueryId(), queryCost); +public void admit() { + QueueAcquirer queueAcquirer = new QueueAcquirer(rm.queue(), foreman, queryCost); + queueAcquirerThread = new Thread(queueAcquirer); --- End diff -- General comment about the approach. We create minimum 2 threads for any query. With this, when queuing is enabled, we are creating minimum three threads. Instead, when a query is cancelled, is it possible to interrupt the foreman thread itself if that is blocked ? ---
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
Github user ppadma commented on a diff in the pull request: https://github.com/apache/drill/pull/1051#discussion_r153297806 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java --- @@ -873,105 +791,133 @@ public void close() throws Exception { } } - private void moveToState(final QueryState newState, final Exception exception) { -logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, - exception); + public synchronized void moveToState(final QueryState newState, final Exception exception) { +logger.debug(queryIdString + ": State change requested {} --> {}", state, newState, exception); switch (state) { -case ENQUEUED: - switch (newState) { - case FAILED: -Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed"); -recordNewState(newState); -foremanResult.setFailed(exception); -foremanResult.close(); -return; + case PLANNING: +switch (newState) { + case ENQUEUED: +recordNewState(newState); +enqueuedQueries.inc(); +return; + case CANCELLATION_REQUESTED: +assert exception == null; +recordNewState(newState); +foremanResult.setCompleted(QueryState.CANCELED); +foremanResult.close(); +return; + case FAILED: +assert exception != null; +recordNewState(newState); +foremanResult.setFailed(exception); +foremanResult.close(); +return; --- End diff -- Here and below, should we have a default case and throw exception i.e. catch invalid state transitions ? ---
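A default case that rejects unexpected transitions might be structured as in this sketch. It covers only transitions out of PLANNING and uses hypothetical names (`QState`, `TransitionSketch`); the real `moveToState` also records the state, updates `foremanResult`, and handles many more states.

```java
// Hypothetical subset of QueryState values.
enum QState { PLANNING, ENQUEUED, CANCELLATION_REQUESTED, FAILED }

final class TransitionSketch {
  private QState state = QState.PLANNING;

  synchronized void moveToState(QState newState) {
    switch (state) {
      case PLANNING:
        switch (newState) {
          case ENQUEUED:
          case CANCELLATION_REQUESTED:
          case FAILED:
            state = newState;
            return;
          default:
            break;  // not a legal target from PLANNING
        }
        break;
      default:
        break;      // no transitions modeled for other states in this sketch
    }
    // Reached only when no case above accepted the transition.
    throw new IllegalStateException(
        "Invalid transition: " + state + " --> " + newState);
  }

  QState state() {
    return state;
  }
}
```

Placing the throw after the switch means every unhandled combination fails loudly instead of being silently ignored.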
[GitHub] drill issue #921: DRILL-4286 Graceful shutdown of drillbit
Github user paul-rogers commented on the issue: https://github.com/apache/drill/pull/921 Interesting issue! Let's think about this a bit. In a production system, we do want the grace period; it is an essential part of the graceful shutdown procedure. However, if we are doing a non-graceful shutdown, the grace is unneeded. Also, if the cluster contains only one node (as in most unit tests), there is nothing to wait for, so the grace period is not needed. The same is true in an embedded Drillbit for Sqlline. So, can we provide a solution that handles these cases rather than simply turning off the grace period always? If using the local cluster coordinator, say, then no grace is needed. If using ZK, but there is only one Drillbit, no grace is needed. (There is a race condition, but may be OK.) Or, if we detect we are embedded, no grace period. Then, also, if we are doing a graceful shutdown, we need the grace. But, if we are doing a "classic" shutdown, no grace is needed. The result should be that the grace period is used only in production servers, only when doing a graceful shutdown. There are probably some details to simplify, but I hope the above communicates the idea. Can we make this work? ---
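The conditions listed in this comment can be condensed into a single decision function, sketched below. Every parameter name is an assumption made for illustration; the sketch does not reflect how Drill actually detects embedded mode, the coordinator type, or the number of live Drillbits.

```java
final class GracePeriodPolicy {
  private GracePeriodPolicy() { }

  // Returns the grace period to apply, in milliseconds. The grace period
  // is used only for a graceful shutdown of a non-embedded Drillbit that
  // uses a distributed coordinator and has at least one peer.
  static long effectiveGraceMillis(boolean gracefulShutdown,
                                   boolean localCoordinator,
                                   boolean embedded,
                                   int activeDrillbits,
                                   long configuredGraceMillis) {
    if (!gracefulShutdown || localCoordinator || embedded || activeDrillbits <= 1) {
      return 0L;
    }
    return configuredGraceMillis;
  }
}
```

Under this policy a single-node unit test or an embedded Sqlline session would skip the wait without changing the configured value.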
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r152862636 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java --- @@ -252,11 +252,15 @@ private static String buildPath(final String[] path, final int folderIndex) { return builder.toString(); } - public static FileSelection create(final DrillFileSystem fs, final String parent, final String path) throws IOException { + public static FileSelection create(final DrillFileSystem fs, final String parent, final String path, + final boolean allowAccessOutsideWorkspace) throws IOException { Stopwatch timer = Stopwatch.createStarted(); boolean hasWildcard = path.contains(WILD_CARD); final Path combined = new Path(parent, removeLeadingSlash(path)); +if (!allowAccessOutsideWorkspace) { + checkBackPaths(parent, combined.toString(), path); --- End diff -- Done ---
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r152862954 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java --- @@ -30,18 +30,24 @@ public class WorkspaceConfig { /** Default workspace is a root directory which supports read, but not write. */ - public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null); + public static final WorkspaceConfig DEFAULT = new WorkspaceConfig("/", false, null, false); private final String location; private final boolean writable; private final String defaultInputFormat; - + private final Boolean allowAccessOutsideWorkspace; // allow access outside the workspace by default. This --- End diff -- I need to check if the value is not present (i.e. null). That will be the case with all storage plugin configurations that have already been created. ---
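The reason for the boxed `Boolean` can be shown with a small sketch: a null value distinguishes "not present in an older stored config" from an explicit setting. `WorkspaceFlagSketch` is hypothetical, and the default is passed in as a parameter because the quoted diff does not show which default the constructor applies.

```java
final class WorkspaceFlagSketch {
  private WorkspaceFlagSketch() { }

  // configured is null when the stored plugin configuration predates the
  // option; in that case the caller-supplied default is used.
  static boolean resolve(Boolean configured, boolean defaultValue) {
    return configured == null ? defaultValue : configured;
  }
}
```

A primitive `boolean` field could not make this distinction, since a missing value would silently deserialize as false.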
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r152862722 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java --- @@ -359,15 +363,30 @@ private static Path handleWildCard(final String root) { } } - private static String removeLeadingSlash(String path) { -if (path.charAt(0) == '/') { + public static String removeLeadingSlash(String path) { +if (!path.isEmpty() && path.charAt(0) == '/') { String newPath = path.substring(1); return removeLeadingSlash(newPath); } else { return path; } } + // Check if the path is a valid sub path under the parent after removing backpaths. Throw an exception if + // it is not + // We pass subpath in as a parameter only for the error message + public static boolean checkBackPaths(String parent, String combinedPath, String subpath) { +Preconditions.checkArgument(!parent.isEmpty()); +Preconditions.checkArgument(!combinedPath.isEmpty()); --- End diff -- Done ---
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r153318818 --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java --- @@ -59,4 +61,53 @@ public void testEmptyFolderThrowsTableNotFound() throws Exception { throw ex; } } + + @Test + public void testBackPath() throws Exception { --- End diff -- Done ---
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r152862665 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java --- @@ -359,15 +363,30 @@ private static Path handleWildCard(final String root) { } } - private static String removeLeadingSlash(String path) { -if (path.charAt(0) == '/') { + public static String removeLeadingSlash(String path) { +if (!path.isEmpty() && path.charAt(0) == '/') { String newPath = path.substring(1); return removeLeadingSlash(newPath); } else { return path; } } + // Check if the path is a valid sub path under the parent after removing backpaths. Throw an exception if --- End diff -- Done ---
[GitHub] drill pull request #1050: DRILL-5964: Do not allow queries to access paths o...
Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/1050#discussion_r152862693 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java --- @@ -359,15 +363,30 @@ private static Path handleWildCard(final String root) { } } - private static String removeLeadingSlash(String path) { -if (path.charAt(0) == '/') { + public static String removeLeadingSlash(String path) { +if (!path.isEmpty() && path.charAt(0) == '/') { String newPath = path.substring(1); return removeLeadingSlash(newPath); } else { return path; } } + // Check if the path is a valid sub path under the parent after removing backpaths. Throw an exception if + // it is not + // We pass subpath in as a parameter only for the error message + public static boolean checkBackPaths(String parent, String combinedPath, String subpath) { --- End diff -- Done ---
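The idea behind `checkBackPaths`, resolving `..` segments and then verifying that the result is still under the parent, can be illustrated as follows. This is a sketch with assumptions: it uses `java.nio.file.Path` as a stand-in for the Hadoop path handling in `FileSelection`, returns a boolean rather than throwing, and `BackPathCheck` and `isUnderParent` are hypothetical names.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class BackPathCheck {
  private BackPathCheck() { }

  // Returns true when subpath, resolved against parent with all ".."
  // segments collapsed, still lies inside parent.
  static boolean isUnderParent(String parent, String subpath) {
    if (parent.isEmpty()) {
      throw new IllegalArgumentException("parent must not be empty");
    }
    Path root = Paths.get(parent).normalize();
    // Strip leading slashes so the subpath is resolved relative to parent.
    String relative = subpath.replaceFirst("^/+", "");
    Path combined = root.resolve(relative).normalize();
    // Path.startsWith compares whole name elements, so "/data/ws2" is not
    // considered to be under "/data/ws".
    return combined.startsWith(root);
  }
}
```

Comparing normalized paths element by element avoids the pitfall of a plain string prefix check, which would accept sibling directories that share a name prefix.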
[GitHub] drill issue #921: DRILL-4286 Graceful shutdown of drillbit
Github user dvjyothsna commented on the issue: https://github.com/apache/drill/pull/921 This was due to grace_period during shutdown of drillbits. Fixed it by changing grace_period = 0 in .conf ---
[jira] [Created] (DRILL-5993) Allow Copier to Copy a Record to an Arbitrary Index
Timothy Farkas created DRILL-5993: - Summary: Allow Copier to Copy a Record to an Arbitrary Index Key: DRILL-5993 URL: https://issues.apache.org/jira/browse/DRILL-5993 Project: Apache Drill Issue Type: New Feature Reporter: Timothy Farkas Assignee: Timothy Farkas Currently the copier can only copy a record from an incoming batch to the beginning of an outgoing batch. We need to be able to copy a record to any index in the outgoing batch. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
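The requested capability amounts to adding an output offset to the copy call. The sketch below shows that shape with plain `int` arrays standing in for value vectors; `IntColumnCopier` and its methods are hypothetical and are not Drill's Copier interface.

```java
final class IntColumnCopier {
  private IntColumnCopier() { }

  // Copies one value from incoming[inIndex] into outgoing[outIndex].
  static void copyEntry(int[] incoming, int inIndex, int[] outgoing, int outIndex) {
    outgoing[outIndex] = incoming[inIndex];
  }

  // Copies count records starting at incoming[start] into the outgoing
  // array beginning at outStart, and returns the next free output index.
  static int copyRecords(int[] incoming, int start, int count,
                         int[] outgoing, int outStart) {
    for (int i = 0; i < count; i++) {
      copyEntry(incoming, start + i, outgoing, outStart + i);
    }
    return outStart + count;
  }
}
```

Returning the next free index lets a caller append records from several incoming batches into one outgoing batch.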
[GitHub] drill issue #1047: DRILL-5970: DrillParquetReader always builds the schema w...
Github user priteshm commented on the issue: https://github.com/apache/drill/pull/1047 @sachouche can you please review this? ---
Re: [DISCUSS] Drill 1.12.0 release
Current status: DRILL-4779: Kafka storage plugin support (developer - Anil & Kamesh, code reviewer - Paul) - fix is expected by the EOD in new pull request. DRILL-4286: Graceful shutdown of drillbit (developer - Jyothsna, code reviewer - Paul) - unit test failures are fixed. Unit test performance degraded x3 times! Kind regards On Sun, Nov 26, 2017 at 6:15 PM, Arina Yelchiyeva < arina.yelchiy...@gmail.com> wrote: > Current status: > > DRILL-4779: Kafka storage plugin support (developer - Anil & Kamesh, code > reviewer - Paul) - could not cherry-pick the commits. Needs fix. > DRILL-4286: Graceful shutdown of drillbit (developer - Jyothsna, code > reviewer - Paul) - there are unit test failures. Needs fix. > > Kind regards > > On Sat, Nov 25, 2017 at 11:53 PM, AnilKumar B> wrote: > >> Hi Arina, >> >> Sorry for the delay. Just now we squashed Kafka storage plugin commits >> into >> one commit and pushed. >> >> Thanks & Regards, >> B Anil Kumar. >> >> On Sat, Nov 25, 2017 at 5:56 AM, Arina Yelchiyeva < >> arina.yelchiy...@gmail.com> wrote: >> >> > Current status: >> > >> > DRILL-4779: Kafka storage plugin support (developer - Anil & Kamesh, >> code >> > reviewer - Paul) - needs to squash the commits. >> > DRILL-4286: Graceful shutdown of drillbit (developer - Jyothsna, code >> > reviewer - Paul) - needs to address some code review comments. >> > >> > Kind regards >> > Arina >> > >> > On Wed, Nov 15, 2017 at 2:38 PM, Arina Yelchiyeva < >> > arina.yelchiy...@gmail.com> wrote: >> > >> > > Current status, we are close to the code freeze which will happen not >> > > later then the end of the next week. >> > > >> > > Blocker: >> > > DRILL-5917: Ban org.json:json library in Drill (developer - Vlad R., >> code >> > > reviewer - Arina) - in progress. >> > > >> > > Targeted for 1.12 release: >> > > DRILL-4779: Kafka storage plugin support (developer - Anil & Kamesh, >> code >> > > reviewer - Paul) - needs next round of code review. 
>> > > DRILL-5943: Avoid the strong check introduced by DRILL-5582 for PLAIN >> > > mechanism (developer - Sorabh, code reviewer - Parth & Laurent) - >> waiting >> > > for Parth code review. >> > > DRILL-5771: Fix serDe errors for format plugins (developer - Arina, >> code >> > > reviewer - Tim) - code review is done, waiting for the merge. >> > > >> > > Kind regards >> > > >> > > On Fri, Nov 10, 2017 at 9:32 AM, Chunhui Shi wrote: >> > > >> > >> Hi Arina, >> > >> >> > >> >> > >> Could we consider to include DRILL-5089 in 1.12.0? It is about lazy >> > >> loading schema for storage plugins. Could you or Paul take a look at >> the >> > >> pull request for this JIRA https://github.com/apache/drill/pull/1032? >> I >> > >> think both of you are familiar with this part. >> > >> >> > >> >> > >> Thanks, >> > >> >> > >> >> > >> Chunhui >> > >> >> > >> >> > >> From: Arina Yelchiyeva >> > >> Sent: Thursday, November 9, 2017 8:11:35 AM >> > >> To: dev@drill.apache.org >> > >> Subject: Re: [DISCUSS] Drill 1.12.0 release >> > >> >> > >> Yes, they are already in master. >> > >> >> > >> On Thu, Nov 9, 2017 at 6:05 PM, Charles Givre >> wrote: >> > >> >> > >> > We’re including the Networking functions in this release right? >> > >> > >> > >> > > On Nov 9, 2017, at 11:04, Arina Yelchiyeva < >> > >> arina.yelchiy...@gmail.com> >> > >> > wrote: >> > >> > > >> > >> > > If changes will be done before cut off date, targeting mid >> November >> > >> that >> > >> > it >> > >> > > will be possible to include this Jira. >> > >> > > >> > >> > > On Thu, Nov 9, 2017 at 6:03 PM, Charles Givre >> > >> wrote: >> > >> > > >> > >> > >> Hi Arina, >> > >> > >> Can we include DRILL-4091 Support for additional GIS operations >> in >> > >> > version >> > >> > >> 1.12? In general the code looked pretty good. There was a unit >> > test >> > >> > >> missing which the developer submitted and some minor formatting >> > >> issues >> > >> > >> which I’m still waiting on. 
>> > >> > >> Thanks, >> > >> > >> —C >> > >> > >> >> > >> > >> >> > >> > >> >> > >> > >>> On Nov 9, 2017, at 10:58, Arina Yelchiyeva < >> > >> arina.yelchiy...@gmail.com >> > >> > > >> > >> > >> wrote: >> > >> > >>> >> > >> > >>> Current status: >> > >> > >>> >> > >> > >>> Blocker: >> > >> > >>> DRILL-5917: Ban org.json:json library in Drill (developer - >> Vlad >> > R., >> > >> > code >> > >> > >>> reviewer - ?) - in progress. >> > >> > >>> >> > >> > >>> Targeted for 1.12 release: >> > >> > >>> DRILL-5337: OpenTSDB plugin (developer - Dmitriy & Vlad S., >> code >> > >> > >> reviewer - >> > >> > >>> Arina) - code review in final stage. >> > >> > >>> DRILL-4779: Kafka storage plugin support (developer - Anil & >> > Kamesh, >> > >> > code >> > >> > >>> reviewer - Paul) - in review. >> > >> > >>> DRILL-5943: Avoid the strong check introduced by DRILL-5582 for >> > >> PLAIN >> > >> > >>> mechanism (developer - Sorabh, code reviewer
[GitHub] drill pull request #1051: DRILL-5963: Query state process improvements
GitHub user arina-ielchiieva opened a pull request: https://github.com/apache/drill/pull/1051 DRILL-5963: Query state process improvements 1. Added two new query states: PREPARING (when foreman is initialized) and PLANNING (includes logical and / or physical planning). 2. Process of query enqueuing was made non-blocking. Once query was enqueued, fragments runner is called to submit fragments locally and remotely. 3. Ability to cancel query during planning and enqueued states was added. 4. Logic for submitting fragments was moved from Foreman to new class FragmentsRunner. 5. Major type in DrillFuncHolderExpr was cached for better performance. Other details in [DRILL-5963](https://issues.apache.org/jira/browse/DRILL-5963) You can merge this pull request into a Git repository by running: $ git pull https://github.com/arina-ielchiieva/drill DRILL-5963 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/1051.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1051 commit 1df913951ae9cadf3813836c4a03e66645d68965 Author: Arina Ielchiieva Date: 2017-11-24T15:59:42Z DRILL-5963: Query state process improvements 1. Added two new query states: PREPARING (when foreman is initialized) and PLANNING (includes logical and / or physical planning). 2. Process of query enqueuing was made non-blocking. Once query was enqueued, fragments runner is called to submit fragments locally and remotely. 3. Ability to cancel query during planning and enqueued states was added. 4. Logic for submitting fragments was moved from Foreman to new class FragmentsRunner. 5. Major type in DrillFuncHolderExpr was cached for better performance. ---
[jira] [Created] (DRILL-5992) Move html code generation for query profile from Java into freemarker
Arina Ielchiieva created DRILL-5992: --- Summary: Move html code generation for query profile from Java into freemarker Key: DRILL-5992 URL: https://issues.apache.org/jira/browse/DRILL-5992 Project: Apache Drill Issue Type: Improvement Affects Versions: 1.11.0 Reporter: Arina Ielchiieva Fix For: Future Drill has lots of HTML generation in Java for the query profile (https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile). To adhere to the MVC pattern, we should consider moving all that code into freemarker + js if needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] drill issue #921: DRILL-4286 Graceful shutdown of drillbit
Github user arina-ielchiieva commented on the issue: https://github.com/apache/drill/pull/921 @dvjyothsna all tests have passed, though there is significant performance degradation. Before your changes the unit test run took ~ 33 minutes; after, ~ 1 hour 18 minutes. For example, exec package: before `[INFO] exec/Java Execution Engine . SUCCESS [17:14 min]`, after `[INFO] exec/Java Execution Engine . SUCCESS [56:45 min]`. Since you did not add that many unit tests, could you please explain the reason for such performance degradation and ideally fix it? I attached two unit test outputs in Jira. ---