IMPALA-1144: Fix exception when cancelling query in Impala-shell with CTRL-C
Issue 1: When a query is cancelled via CTRL-C while it is being executed in Impala-shell, an exception is thrown from the Impala backend saying 'Invalid query handle'. This is because one ImpalaClient was making RPCs while another ImpalaClient cancelled the query on the backend. As a result, RPC handlers in ImpalaServer tried to access a ClientRequestState that had already been cleared from the backend. The issue is reliably reproducible in both the wait_to_finish and the fetch states of the query. As a solution, the query cancellation is signalled to ImpalaClient via a bool flag. Once an exception originating from the cancellation reaches Impala-shell, this flag is checked to decide whether to suppress the error. Issue 2: Every time a query was cancelled, a 'use db' command was issued automatically. This happened for historical reasons but is no longer needed (see the Jira for more details). Change-Id: I6cefaf1dae78baae238289816a7cb9d210fb38e2 Reviewed-on: http://gerrit.cloudera.org:8080/8549 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/6d9da172 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/6d9da172 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/6d9da172 Branch: refs/heads/master Commit: 6d9da172889cde75e041caf4fa024f4d9f223db5 Parents: dc1282f Author: Gabor Kaszab <gaborkas...@cloudera.com> Authored: Wed Nov 15 01:01:45 2017 +0100 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Wed Nov 29 03:44:51 2017 +0000 ---------------------------------------------------------------------- shell/impala_client.py | 25 +++++++++++++++++++++---- shell/impala_shell.py | 7 +++++-- tests/shell/test_shell_commandline.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/shell/impala_client.py ---------------------------------------------------------------------- diff --git a/shell/impala_client.py b/shell/impala_client.py index 868d898..795768c 100755 --- a/shell/impala_client.py +++ b/shell/impala_client.py @@ -55,6 +55,8 @@ class DisconnectedException(Exception): def __str__(self): return self.value +class QueryCancelledByShellException(Exception): pass + class ImpalaClient(object): def __init__(self, impalad, use_kerberos=False, kerberos_service_name="impala", @@ -74,6 +76,10 @@ class ImpalaClient(object): self.query_option_levels = {} self.query_state = QueryState._NAMES_TO_VALUES self.fetch_batch_size = 1024 + # This is set from ImpalaShell's signal handler when a query is cancelled + # from command line via CTRL+C. It is used to suppress error messages of + # query cancellation. + self.is_query_cancelled = False def _options_to_string_list(self, set_query_options): return ["%s=%s" % (k, v) for (k, v) in set_query_options.iteritems()] @@ -306,6 +312,7 @@ class ImpalaClient(object): return query def execute_query(self, query): + self.is_query_cancelled = False rpc_result = self._do_rpc(lambda: self.imp_service.query(query)) last_query_handle, status = rpc_result if status != RpcStatus.OK: @@ -381,7 +388,8 @@ class ImpalaClient(object): # co-ordinator, so we don't need to wait. 
if query_handle_closed: return True - rpc_result = self._do_rpc(lambda: self.imp_service.Cancel(last_query_handle)) + rpc_result = self._do_rpc(lambda: self.imp_service.Cancel(last_query_handle), + False) _, status = rpc_result return status == RpcStatus.OK @@ -409,7 +417,7 @@ class ImpalaClient(object): return summary return None - def _do_rpc(self, rpc): + def _do_rpc(self, rpc, suppress_error_on_cancel=True): """Executes the provided callable.""" if not self.connected: @@ -428,16 +436,25 @@ class ImpalaClient(object): status = RpcStatus.ERROR return ret, status except BeeswaxService.QueryNotFoundException: + if suppress_error_on_cancel and self.is_query_cancelled: + raise QueryCancelledByShellException() raise QueryStateException('Error: Stale query handle') # beeswaxException prints out the entire object, printing # just the message is far more readable/helpful. except BeeswaxService.BeeswaxException, b: - raise RPCException("ERROR: %s" % b.message) + # Suppress the errors from cancelling a query that is in fetch state + if suppress_error_on_cancel and self.is_query_cancelled: + raise QueryCancelledByShellException() + raise RPCException("ERROR: %s" % b.message) except TTransportException, e: # issue with the connection with the impalad raise DisconnectedException("Error communicating with impalad: %s" % e) except TApplicationException, t: - raise RPCException("Application Exception : %s" % t) + # Suppress the errors from cancelling a query that is in waiting_to_finish + # state + if suppress_error_on_cancel and self.is_query_cancelled: + raise QueryCancelledByShellException() + raise RPCException("Application Exception : %s" % t) return None, RpcStatus.ERROR def _get_sleep_interval(self, start_time): http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/shell/impala_shell.py ---------------------------------------------------------------------- diff --git a/shell/impala_shell.py b/shell/impala_shell.py index a9a527a..ffe01e1 100755 --- 
a/shell/impala_shell.py +++ b/shell/impala_shell.py @@ -36,7 +36,8 @@ import textwrap import time from impala_client import (ImpalaClient, DisconnectedException, QueryStateException, - RPCException, TApplicationException) + RPCException, TApplicationException, + QueryCancelledByShellException) from impala_shell_config_defaults import impala_shell_defaults from option_parser import get_option_parser, get_config_from_file from shell_output import DelimitedOutputFormatter, OutputStream, PrettyOutputFormatter @@ -484,13 +485,13 @@ class ImpalaShell(object, cmd.Cmd): # Create a new connection to the impalad and cancel the query. for cancel_try in xrange(ImpalaShell.CANCELLATION_TRIES): try: + self.imp_client.is_query_cancelled = True self.query_handle_closed = True print_to_stderr(ImpalaShell.CANCELLATION_MESSAGE) new_imp_client = self._new_impala_client() new_imp_client.connect() new_imp_client.cancel_query(self.last_query_handle, False) self.imp_client.close_query(self.last_query_handle) - self._validate_database() break except Exception, e: # Suppress harmless errors. 
@@ -1038,6 +1039,8 @@ class ImpalaShell(object, cmd.Cmd): except RPCException, e: if self.show_profiles: raise e return CmdStatus.SUCCESS + except QueryCancelledByShellException, e: + return CmdStatus.SUCCESS except RPCException, e: # could not complete the rpc successfully print_to_stderr(e) http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/tests/shell/test_shell_commandline.py ---------------------------------------------------------------------- diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index 218224b..1ecdbd5 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -325,6 +325,36 @@ class TestImpalaShell(ImpalaTestSuite): assert "Cancelling Query" in result.stderr, result.stderr + def test_query_cancellation_during_fetch(self): + """IMPALA-1144: Test cancellation (CTRL+C) while results are being + fetched""" + # A select query where fetch takes several seconds + args = '-q "with v as (values (1 as x), (2), (3), (4)) ' + \ + 'select * from v, v v2, v v3, v v4, v v5, v v6, v v7, v v8, ' + \ + 'v v9, v v10, v v11;"' + # Kill happens when the results are being fetched + self.run_and_verify_query_cancellation_test(args) + + def test_query_cancellation_during_wait_to_finish(self): + """IMPALA-1144: Test cancellation (CTRL+C) while the query is in the + wait_to_finish state""" + # A select where wait_to_finish takes several seconds + args = '-q "select * from tpch.customer c1, tpch.customer c2, ' + \ + 'tpch.customer c3 order by c1.c_name"' + # Kill happens in wait_to_finish state + self.run_and_verify_query_cancellation_test(args) + + def run_and_verify_query_cancellation_test(self, args): + """Starts the execution of the received query, waits until the query + execution in fact starts and then cancels it. 
Expects the query + cancellation to succeed.""" + p = ImpalaShell(args) + sleep(2.0) + os.kill(p.pid(), signal.SIGINT) + result = p.get_result() + assert "Cancelling Query" in result.stderr + assert "Invalid query handle" not in result.stderr + def test_get_log_once(self, empty_table): """Test that get_log() is always called exactly once.""" # Query with fetch