DRILL-6125: Fix possible memory leak when query is cancelled or finished. close apache/drill#1105
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a264e7fe Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a264e7fe Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a264e7fe Branch: refs/heads/master Commit: a264e7feb1d02ffd5762bb1f652ea22d17aa5243 Parents: 03b245e Author: Timothy Farkas <timothyfar...@apache.org> Authored: Tue Jan 30 15:55:41 2018 -0800 Committer: Aman Sinha <asi...@maprtech.com> Committed: Thu Mar 29 23:24:09 2018 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/impl/RootExec.java | 23 ++- .../PartitionSenderRootExec.java | 32 ++-- .../exec/work/fragment/FragmentExecutor.java | 179 ++++++++++++++----- 3 files changed, 160 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java index 5e366fb..ddeb3e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java @@ -20,19 +20,28 @@ package org.apache.drill.exec.physical.impl; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; /** - * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange - * output nodes and storage nodes. They are there driving force behind the completion of a query. + * <h2>Functionality</h2> + * <p> + * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange + * output nodes and storage nodes. 
They are the driving force behind the completion of a query. + * </p> + * <h2>Assumptions</h2> + * <p> + * All implementations of {@link RootExec} assume that all their methods are called by the same thread. + * </p> */ public interface RootExec extends AutoCloseable { /** * Do the next batch of work. - * @return Whether or not additional batches of work are necessary. False means that this fragment is done. + * @return Whether or not additional batches of work are necessary. False means that this fragment is done. */ - public boolean next(); + boolean next(); /** - * Inform sender that receiving fragment is finished and doesn't need any more data - * @param handle + * Inform sender that receiving fragment is finished and doesn't need any more data. This can be called multiple + * times (once for each downstream receiver). If all receivers are finished then a subsequent call to {@link #next()} + * will return false. + * @param handle The handle pointing to the downstream receiver that does not need any more data.
*/ - public void receivingFragmentFinished(FragmentHandle handle); + void receivingFragmentFinished(FragmentHandle handle); } http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 25be50a..7e76238 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -65,14 +65,13 @@ public class PartitionSenderRootExec extends BaseRootExec { private PartitionerDecorator partitioner; private ExchangeFragmentContext context; - private boolean ok = true; private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final double cost; private final AtomicIntegerArray remainingReceivers; private final AtomicInteger remaingReceiverCount; - private volatile boolean done = false; + private boolean done = false; private boolean first = true; private boolean closeIncoming; @@ -146,11 +145,8 @@ public class PartitionSenderRootExec extends BaseRootExec { @Override public boolean innerNext() { - if (!ok) { - return false; - } - IterOutcome out; + if (!done) { out = next(incoming); } else { @@ -252,13 +248,11 @@ public class PartitionSenderRootExec extends BaseRootExec { startIndex, endIndex); } - synchronized (this) { - partitioner = new PartitionerDecorator(subPartitioners, stats, context); - for (int index = 0; index < terminations.size(); index++) { - partitioner.getOutgoingBatches(terminations.buffer[index]).terminate(); - } - terminations.clear(); + 
partitioner = new PartitionerDecorator(subPartitioners, stats, context); + for (int index = 0; index < terminations.size(); index++) { + partitioner.getOutgoingBatches(terminations.buffer[index]).terminate(); } + terminations.clear(); success = true; } finally { @@ -328,12 +322,10 @@ public class PartitionSenderRootExec extends BaseRootExec { public void receivingFragmentFinished(FragmentHandle handle) { final int id = handle.getMinorFragmentId(); if (remainingReceivers.compareAndSet(id, 0, 1)) { - synchronized (this) { - if (partitioner == null) { - terminations.add(id); - } else { - partitioner.getOutgoingBatches(id).terminate(); - } + if (partitioner == null) { + terminations.add(id); + } else { + partitioner.getOutgoingBatches(id).terminate(); } int remaining = remaingReceiverCount.decrementAndGet(); @@ -347,7 +339,7 @@ public class PartitionSenderRootExec extends BaseRootExec { public void close() throws Exception { logger.debug("Partition sender stopping."); super.close(); - ok = false; + if (partitioner != null) { updateAggregateStats(); partitioner.clear(); @@ -358,7 +350,7 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - public void sendEmptyBatch(boolean isLast) { + private void sendEmptyBatch(boolean isLast) { BatchSchema schema = incoming.getSchema(); if (schema == null) { // If the incoming batch has no schema (possible when there are no input records), http://git-wip-us.apache.org/repos/asf/drill/blob/a264e7fe/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 4f43dc1..efdb96a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -19,7 +19,9 @@ package org.apache.drill.exec.work.fragment; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -31,7 +33,6 @@ import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.ops.FragmentContextImpl; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.RootExec; @@ -49,14 +50,58 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener; import org.apache.hadoop.security.UserGroupInformation; /** - * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request - * and cancellation messages. + * <h2>Overview</h2> + * <p> + * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation messages. + * </p> + * <h2>Theory of Operation</h2> + * <p> + * The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the {@link FragmentExecutor#run()} method in a single thread. While a fragment is running + * it may be subject to termination requests. The {@link FragmentExecutor} is responsible for gracefully handling termination requests for the {@link RootExec}.
There + * are two types of termination messages: + * <ol> + * <li><b>Cancellation Request:</b> This signals that the fragment and therefore the {@link RootExec} need to terminate immediately.</li> + * <li><b>Receiver Finished:</b> This signals that a downstream receiver no longer needs any more data. A fragment may receive multiple receiver finished requests + * (one for each downstream receiver). The {@link RootExec} will only terminate once it has received {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages + * for all downstream receivers.</li> + * </ol> + * </p> + * <p> + * The {@link FragmentExecutor} processes termination requests appropriately for the {@link RootExec}. A <b>Cancellation Request</b> is signalled when + * {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event is signalled when {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is + * called. The way in which these signals are handled is the following: + * </p> + * <h3>Cancellation Request</h3> + * <p> + * There are two ways in which a cancellation request can be handled when {@link FragmentExecutor#cancel()} is called. + * <ol> + * <li>The Cancellation Request is received before the {@link RootExec} for the fragment is even started. In this case we can clean up resources allocated for the fragment + * and never start a {@link RootExec}</li> + * <li>The Cancellation Request is received after the {@link RootExec} for the fragment is started. In this case the cancellation request is sent to the + * {@link FragmentEventProcessor}. If this is not the first cancellation request it is ignored. If this is the first cancellation request the {@link RootExec} for this + * fragment is terminated by interrupting it.
Then the {@link FragmentExecutor#run()} thread proceeds to clean up resources normally</li> + * </ol> + * </p> + * <h3>Receiver Finished</h3> + * <p> + * When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called, the message is passed to the {@link FragmentEventProcessor} if we + * did not already receive a Cancellation request. Then the finished message is queued in {@link FragmentExecutor#receiverFinishedQueue}. The {@link FragmentExecutor#run()} thread polls + * {@link FragmentExecutor#receiverFinishedQueue} and signals the {@link RootExec} with {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately. + * </p> + * <h2>Possible Design Flaws / Poorly Defined Behavior</h2> + * <p> + * There are still a few aspects of the {@link FragmentExecutor} design that are not clear. + * <ol> + * <li>If we get a <b>Receiver Finished</b> message for one downstream receiver, will we eventually get one from every downstream receiver?</li> + * <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we cancel the fragment?</li> + * <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we run out of data from the upstream?</li> + * </ol> + * </p> */ public class FragmentExecutor implements Runnable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class); private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentExecutor.class); - private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false); private final String fragmentName; private final ExecutorFragmentContext fragmentContext; private final FragmentStatusReporter statusReporter; @@ -66,6 +111,11 @@ public class FragmentExecutor implements Runnable { private volatile RootExec root; private final AtomicReference<FragmentState> fragmentState = new
AtomicReference<>(FragmentState.AWAITING_ALLOCATION); + /** + * Holds all of the messages sent by downstream receivers that have finished. The {@link FragmentExecutor#run()} thread reads from this queue and passes the + * finished messages to the fragment's {@link RootExec} via the {@link RootExec#receivingFragmentFinished(FragmentHandle)} method. + */ + private final Queue<FragmentHandle> receiverFinishedQueue = new ConcurrentLinkedQueue<>(); private final FragmentEventProcessor eventProcessor = new FragmentEventProcessor(); // Thread that is currently executing the Fragment. Value is null if the fragment hasn't started running or finished @@ -135,12 +185,16 @@ public class FragmentExecutor implements Runnable { } /** + * <p> * Cancel the execution of this fragment is in an appropriate state. Messages come from external. - * NOTE that this will be called from threads *other* than the one running this runnable(), + * </p> + * <p> + * <b>Note:</b> This will be called from threads <b>Other</b> than the one running this runnable(), * so we need to be careful about the state transitions that can result. + * </p> */ public void cancel() { - final boolean thisIsOnlyThread = hasCloseoutThread.compareAndSet(false, true); + final boolean thisIsOnlyThread = myThreadRef.compareAndSet(null, Thread.currentThread()); if (thisIsOnlyThread) { eventProcessor.cancelAndFinish(); @@ -182,13 +236,12 @@ public class FragmentExecutor implements Runnable { @SuppressWarnings("resource") @Override public void run() { - // if a cancel thread has already entered this executor, we have not reason to continue. 
- if (!hasCloseoutThread.compareAndSet(false, true)) { + final Thread myThread = Thread.currentThread(); + + if (!myThreadRef.compareAndSet(null, myThread)) { return; } - final Thread myThread = Thread.currentThread(); - myThreadRef.set(myThread); final String originalThreadName = myThread.getName(); final FragmentHandle fragmentHandle = fragmentContext.getHandle(); final ClusterCoordinator clusterCoordinator = fragmentContext.getClusterCoordinator(); @@ -203,10 +256,10 @@ public class FragmentExecutor implements Runnable { final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator : fragmentContext.getPlanReader().readFragmentRoot(fragment.getFragmentJson()); - root = ImplCreator.getExec(fragmentContext, rootOperator); - if (root == null) { - return; - } + root = ImplCreator.getExec(fragmentContext, rootOperator); + if (root == null) { + return; + } clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener); updateState(FragmentState.RUNNING); @@ -227,11 +280,19 @@ public class FragmentExecutor implements Runnable { @Override public Void run() throws Exception { injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class); - /* - * Run the query until root.next returns false OR we no longer need to continue. - */ - while (shouldContinue() && root.next()) { - // loop + + while (shouldContinue()) { + // Fragment is not cancelled + + for (FragmentHandle fragmentHandle; (fragmentHandle = receiverFinishedQueue.poll()) != null;) { + // See if we have any finished requests. If so execute them. + root.receivingFragmentFinished(fragmentHandle); + } + + if (!root.next()) { + // Fragment has processed all of its data + break; + } } return null; @@ -245,19 +306,17 @@ public class FragmentExecutor implements Runnable { // we have a heap out of memory error. The JVM in unstable, exit. 
CatastrophicFailure.exit(e, "Unable to handle out of memory condition in FragmentExecutor.", -2); } + } catch (InterruptedException e) { + // Swallow interrupted exceptions since we intentionally interrupt the root when cancelling a query + logger.trace("Interruped root: {}", e); } catch (Throwable t) { fail(t); } finally { - // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an - // interruption after we have moved beyond this block. - synchronized (myThreadRef) { - myThreadRef.set(null); - Thread.interrupted(); - } - - // Make sure the event processor is started at least once - eventProcessor.start(); + // Don't process any more termination requests, we are done. + eventProcessor.terminate(); + // Clear the interrupt flag if it is set. + Thread.interrupted(); // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED // FAILED state will be because of any Exception in execution loop root.next() @@ -475,6 +534,7 @@ public class FragmentExecutor implements Runnable { * This is especially important as fragments can take longer to start */ private class FragmentEventProcessor extends EventProcessor<FragmentEvent> { + private AtomicBoolean terminate = new AtomicBoolean(false); void cancel() { sendEvent(new FragmentEvent(EventType.CANCEL, null)); @@ -488,47 +548,72 @@ public class FragmentExecutor implements Runnable { sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle)); } + /** + * Tell the {@link FragmentEventProcessor} not to process anymore events. This keeps stray cancellation requests + * from being processed after the root has finished running and interrupts in the root thread have been cleared. 
+ */ + public void terminate() { + terminate.set(true); + } + @Override protected void processEvent(FragmentEvent event) { + if (event.type.equals(EventType.RECEIVER_FINISHED)) { + // Finish request + if (terminate.get()) { + // We have already received a cancellation or we have terminated the event processor. Do not process any more finish requests. + return; + } + } else { + // Cancel request + if (!terminate.compareAndSet(false, true)) { + // We have already received a cancellation or we have terminated the event processor. Do not process any more cancellation requests. + // This prevents the root thread from being interrupted at an inappropriate time. + return; + } + } + switch (event.type) { case CANCEL: - /* - * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called. - */ + // We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called. updateState(FragmentState.CANCELLATION_REQUESTED); - - /* - * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We - * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out - * procedure of the main thread. - */ - synchronized (myThreadRef) { - final Thread myThread = myThreadRef.get(); - if (myThread != null) { - logger.debug("Interrupting fragment thread {}", myThread.getName()); - myThread.interrupt(); - } - } + // The root was started so we have to interrupt it in case it is performing a blocking operation. + killThread(); break; - case CANCEL_AND_FINISH: + // In this case the root was never started so we do not have to interrupt the thread.
updateState(FragmentState.CANCELLATION_REQUESTED); + // The FragmentExecutor#run() loop will not execute in this case so we have to clean up resources here cleanup(FragmentState.FINISHED); break; - case RECEIVER_FINISHED: assert event.handle != null : "RECEIVER_FINISHED event must have a handle"; if (root != null) { logger.info("Applying request for early sender termination for {} -> {}.", QueryIdHelper.getQueryIdentifier(getContext().getHandle()), QueryIdHelper.getFragmentId(event.handle)); - root.receivingFragmentFinished(event.handle); + + receiverFinishedQueue.add(event.handle); } else { logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.", QueryIdHelper.getFragmentId(getContext().getHandle()), QueryIdHelper.getFragmentId(event.handle)); } + // Note we do not terminate the event processor in this case since we can receive multiple RECEIVER_FINISHED + // events, one for each downstream receiver. break; } } + + /* + * Interrupt the thread so that it exits from any blocking operation it could be executing currently. No lock is + * held here; the event processor's terminate flag is intended to keep us from interrupting the close out + * procedure of the main thread. + */ + private void killThread() { + // myThreadRef must contain a non-null reference at this point + final Thread myThread = myThreadRef.get(); + logger.debug("Interrupting fragment thread {}", myThread.getName()); + myThread.interrupt(); + } } }