This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

The following commit(s) were added to refs/heads/master by this push:
     new aa6d788  IMPALA-9842: Fix hang when cancelling BufferedPlanRootSink
aa6d788 is described below

commit aa6d7887eec1efd33c73f77e5346a499569e5b6b
Author: Joe McDonnell <joemcdonn...@cloudera.com>
AuthorDate: Tue Jun 16 11:57:05 2020 -0700

    IMPALA-9842: Fix hang when cancelling BufferedPlanRootSink

    In BufferedPlanRootSink::FlushFinal(), if Cancel() runs before
    FlushFinal() waits on the consumer_eos_ condition variable, the
    thread in FlushFinal() will wait forever. This is because it
    is not checking for cancellation or synchronizing with the
    Cancel() thread. Specifically:

    Thread A: Calls BufferedPlanRootSink::Cancel(), signalling any thread
      currently waiting on the consumer_eos_ condition variable.
    Thread B: Enters FlushFinal(). Never tests RuntimeState::is_cancelled()
      and calls Wait() on the consumer_eos_ condition variable. This
      waits forever.

    This changes BufferedPlanRootSink::Cancel() to get the lock_ before
    signalling the consumer_eos_ condition variable. It also changes
    FlushFinal() to call Wait() in a loop. It breaks out of the loop
    if it is cancelled or the batch_queue_ is empty.

    There are two cases:
    1. FlushFinal() gets the lock_ first and only releases it when
       waiting on the consumer_eos_ condition variable. It will get
       signalled by Cancel().
    2. Cancel() gets the lock_ first and FlushFinal() will not
       wait, because is_cancelled() is true.

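The two cases above can be illustrated with a small standalone sketch. This is
not the Impala code: SketchSink, MarkQueueEmpty() and the three helper functions
are hypothetical names, the standard library's std::mutex and
std::condition_variable stand in for Impala's own lock and ConditionVariable
types, and an atomic flag stands in for RuntimeState::is_cancelled(). It only
shows the general pattern of re-checking a predicate in a wait loop and taking
the lock in the cancelling thread before notifying.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for BufferedPlanRootSink; not Impala's actual class.
class SketchSink {
 public:
  // Producer side: block until the queue is drained or the sink is cancelled.
  // Returns true if the wait ended because of cancellation.
  bool FlushFinal() {
    std::unique_lock<std::mutex> l(lock_);
    // The predicate is re-checked under the lock on every iteration, so a
    // Cancel() that already ran is observed here instead of being waited on.
    while (!cancelled_.load() && !queue_empty_) {
      consumer_eos_.wait(l);
    }
    return cancelled_.load();
  }

  // Consumer side: report that the last row has been read.
  void MarkQueueEmpty() {
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_empty_ = true;
    }
    consumer_eos_.notify_all();
  }

  // Cancelling side. The flag is set first (assumed to mirror the
  // DCHECK(state->is_cancelled()) precondition), then the lock is taken and
  // dropped. After that point FlushFinal() is either already waiting, and is
  // woken by notify_all(), or has not yet taken the lock and will see the flag.
  void Cancel() {
    cancelled_.store(true);
    { std::lock_guard<std::mutex> l(lock_); }
    consumer_eos_.notify_all();
  }

 private:
  std::mutex lock_;
  std::condition_variable consumer_eos_;
  std::atomic<bool> cancelled_{false};
  bool queue_empty_ = false;  // Guarded by lock_.
};

// Case 2: Cancel() finishes before FlushFinal() starts, so no wait happens.
bool CancelBeforeFlush() {
  SketchSink sink;
  sink.Cancel();
  return sink.FlushFinal();
}

// FlushFinal() and Cancel() race on two threads. Whichever thread takes the
// lock first, FlushFinal() returns and the join completes.
bool CancelDuringFlush() {
  SketchSink sink;
  bool cancelled = false;
  std::thread producer([&] { cancelled = sink.FlushFinal(); });
  sink.Cancel();
  producer.join();
  return cancelled;
}

// No cancellation: the consumer drains the queue and FlushFinal() returns.
bool DrainWithoutCancel() {
  SketchSink sink;
  sink.MarkQueueEmpty();
  return sink.FlushFinal();
}
```

In this sketch, removing the empty lock scope from Cancel() would let the
notification fire between the predicate check and the wait in FlushFinal(),
which is the lost wakeup described in the commit message.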
    Testing:
     - Run core tests

    Change-Id: Id6f3fbc05420ca95313fa79ea106547feb92b16b
    Reviewed-on: http://gerrit.cloudera.org:8080/16088
    Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/exec/buffered-plan-root-sink.cc | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 277eac6..6dd69dd 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -105,8 +105,9 @@ Status BufferedPlanRootSink::FlushFinal(RuntimeState* state) {
   // If no batches are ever added, wake up the consumer thread so it can check the
   // SenderState and return appropriately.
   rows_available_.NotifyAll();
-  // Wait until the consumer has read all rows from the batch_queue_.
-  {
+  // Wait until the consumer has read all rows from the batch_queue_ or this has
+  // been cancelled.
+  while (!IsCancelledOrClosed(state) && !IsQueueEmpty(state)) {
     SCOPED_TIMER(profile()->inactive_timer());
     consumer_eos_.Wait(l);
   }
@@ -136,6 +137,14 @@ void BufferedPlanRootSink::Close(RuntimeState* state) {
 
 void BufferedPlanRootSink::Cancel(RuntimeState* state) {
   DCHECK(state->is_cancelled());
+  // Get the lock_ to synchronize with FlushFinal(). Either FlushFinal() will be waiting
+  // on the consumer_eos_ condition variable and get signalled below, or it will see
+  // that is_cancelled() is true after it gets the lock. Drop the lock before
+  // signalling the CV so that a blocked thread can immediately acquire the mutex when
+  // it wakes up.
+  {
+    unique_lock<mutex> l(lock_);
+  }
   // Wake up all sleeping threads so they can check the cancellation state.
   // While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
   // ensure that all sleeping threads are awoken. The calls to NotifyAll() are not on the