http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index bb531ce..6fb0ae5 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -27,6 +27,7 @@ import threading
 from copy import copy
 from time import sleep, time
 
+from beeswaxd.BeeswaxService import QueryState
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import specific_build_type_timeout, IMPALAD_BUILD
@@ -42,6 +43,7 @@ from tests.common.test_vector import ImpalaTestDimension
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
+from tests.verifiers.metric_verifier import MetricVerifier
 
 LOG = logging.getLogger('admission_test')
 
@@ -481,6 +483,82 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       if impalad_with_2g_mem is not None:
         impalad_with_2g_mem.close()
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
+        max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
+      statestored_args=_STATESTORED_ARGS)
+  def test_cancellation(self):
+    """ Test to confirm that all Async cancellation windows are hit and are able to
+    successfully cancel the query"""
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    try:
+      client.set_configuration_option("debug_action", "SLEEP_BEFORE_ADMISSION_MS:2000")
+      client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
+      handle = client.execute_async("select 1")
+      sleep(1)
+      client.close_query(handle)
+      self.assert_impalad_log_contains('INFO',
+          "Ready to be Rejected but already cancelled, query id=")
+      client.clear_configuration()
+
+      client.set_configuration_option("debug_action", "SLEEP_BEFORE_ADMISSION_MS:2000")
+      handle = client.execute_async("select 1")
+      sleep(1)
+      client.close_query(handle)
+      self.assert_impalad_log_contains('INFO',
+          "Ready to be Admitted immediately but already cancelled, query id=")
+
+      client.set_configuration_option("debug_action",
+          "SLEEP_AFTER_COORDINATOR_STARTS_MS:2000")
+      handle = client.execute_async("select 1")
+      sleep(1)
+      client.close_query(handle)
+      self.assert_impalad_log_contains('INFO',
+          "Cancelled right after starting the coordinator query id=")
+
+      client.clear_configuration()
+      handle = client.execute_async("select sleep(10000)")
+      client.set_configuration_option("debug_action",
+          "SLEEP_AFTER_ADMISSION_OUTCOME_MS:2000")
+      queued_query_handle = client.execute_async("select 1")
+      sleep(1)
+      assert client.get_state(queued_query_handle) == QueryState.COMPILED
+      assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
+      # Only cancel the queued query, because close will wait till it unregisters, this
+      # gives us a chance to close the running query and allow the dequeue thread to
+      # dequeue the queue query
+      client.cancel(queued_query_handle)
+      client.close_query(handle)
+      client.close_query(queued_query_handle)
+      queued_profile = client.get_runtime_profile(queued_query_handle)
+      assert "Admission result: Cancelled (queued)" in queued_profile
+      self.assert_impalad_log_contains('INFO', "Dequeued cancelled query=")
+      client.clear_configuration()
+
+      handle = client.execute_async("select sleep(10000)")
+      queued_query_handle = client.execute_async("select 1")
+      sleep(1)
+      assert client.get_state(queued_query_handle) == QueryState.COMPILED
+      assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
+      client.close_query(queued_query_handle)
+      client.close_query(handle)
+      queued_profile = client.get_runtime_profile(queued_query_handle)
+      assert "Admission result: Cancelled (queued)" in queued_profile
+    finally:
+      client.close()
+    # Reached only if every cancellation window above was exercised without error;
+    # a failure inside the try block propagates instead of being printed and ignored.
+    for i in self.cluster.impalads:
+      i.service.wait_for_metric_value("impala-server.num-fragments-in-flight", 0)
+    assert self.cluster.impalads[0].service.get_metric_value(
+      "admission-controller.agg-num-running.default-pool") == 0
+    assert self.cluster.impalads[0].service.get_metric_value(
+      "admission-controller.total-admitted.default-pool") == 3
+    assert self.cluster.impalads[0].service.get_metric_value(
+      "admission-controller.total-queued.default-pool") == 2
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin
@@ -759,16 +837,20 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
 
           LOG.info("Submitting query %s", self.query_num)
           self.query_handle = client.execute_async(query)
-        except ImpalaBeeswaxException as e:
-          if re.search("Rejected.*queue full", str(e)):
+          client.wait_for_admission_control(self.query_handle)
+          admission_result = client.get_admission_result(self.query_handle)
+          assert len(admission_result) > 0
+          if "Rejected" in admission_result:
             LOG.info("Rejected query %s", self.query_num)
             self.query_state = 'REJECTED'
+            self.query_handle = None
             return
-          elif "exceeded timeout" in str(e):
+          elif "timeout" in admission_result:
             LOG.info("Query %s timed out", self.query_num)
             self.query_state = 'TIMED OUT'
+            self.query_handle = None
             return
-          else:
+        except ImpalaBeeswaxException as e:
           raise e
         finally:
           self.lock.release()
http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py b/tests/custom_cluster/test_krpc_mem_usage.py
index c26b776..d358baa 100644
--- a/tests/custom_cluster/test_krpc_mem_usage.py
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -93,6 +93,7 @@ class TestKrpcMemUsage(CustomClusterTestSuite):
       self.client.execute(query)
     # Execute and cancel query
     handle = self.client.execute_async(query)
+    self.client.wait_for_admission_control(handle)
     # Sleep to allow RPCs to arrive.
     time.sleep(0.5)
     self.client.cancel(handle)
http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/custom_cluster/test_session_expiration.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py
index e89fffd..78862ec 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -89,3 +89,23 @@ class TestSessionExpiration(CustomClusterTestSuite):
     # now client should have expired
     assert num_expired + 1 == impalad.service.get_metric_value(
       "impala-server.num-sessions-expired")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("-default_pool_max_requests 1")
+  def test_session_expiration_with_queued_query(self, vector):
+    """Ensure that a query waiting in queue gets cancelled if the session expires."""
+    impalad = self.cluster.get_any_impalad()
+    client = impalad.service.create_beeswax_client()
+    client.execute("SET IDLE_SESSION_TIMEOUT=3")
+    client.execute_async("select sleep(10000)")
+    queued_handle = client.execute_async("select 1")
+    impalad.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.default-pool", 1)
+    sleep(3)
+    impalad.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.default-pool", 0)
+    impalad.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.default-pool", 0)
+    queued_query_profile = impalad.service.create_beeswax_client().get_runtime_profile(
+      queued_handle)
+    assert "Admission result: Cancelled (queued)" in queued_query_profile
http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/hs2/hs2_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py
index 137bd46..81c5033 100644
--- a/tests/hs2/hs2_test_suite.py
+++ b/tests/hs2/hs2_test_suite.py
@@ -228,6 +228,24 @@ class HS2TestSuite(ImpalaTestSuite):
     assert False, 'Did not reach expected operation state %s in time, actual state was ' \
         '%s' % (expected_state, get_operation_status_resp.operationState)
 
+  def wait_for_admission_control(self, operation_handle, timeout = 10):
+    """Waits for the admission control processing of the query to complete by polling
+      GetOperationStatus every 50ms (at least once), returning the
+      TGetOperationStatusResp, or raising an assertion after timeout seconds."""
+    start_time = time()
+    while True:
+      get_operation_status_resp = self.get_operation_status(operation_handle)
+      HS2TestSuite.check_response(get_operation_status_resp)
+      if TCLIService.TOperationState.INITIALIZED_STATE < \
+          get_operation_status_resp.operationState < \
+          TCLIService.TOperationState.PENDING_STATE:
+        return get_operation_status_resp
+      if time() - start_time >= timeout:
+        break
+      sleep(0.05)
+    assert False, 'Did not complete admission control processing in time, current ' \
+        'operation state of query: %s' % (get_operation_status_resp.operationState)
+
   def execute_statement(self, statement, conf_overlay=None,
                         expected_status_code=TCLIService.TStatusCode.SUCCESS_STATUS,
                         expected_error_prefix=None):
http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/hs2/test_hs2.py
---------------------------------------------------------------------- diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py index ccdcdea..cd861e9 100644 --- a/tests/hs2/test_hs2.py +++ b/tests/hs2/test_hs2.py @@ -253,9 +253,10 @@ class TestHS2(HS2TestSuite): self.get_operation_status(execute_statement_resp.operationHandle) TestHS2.check_response(get_operation_status_resp) # If ExecuteStatement() has completed but the results haven't been fetched yet, the - # query must have at least reached RUNNING. + # query must have reached either PENDING or RUNNING or FINISHED. assert get_operation_status_resp.operationState in \ - [TCLIService.TOperationState.RUNNING_STATE, + [TCLIService.TOperationState.PENDING_STATE, + TCLIService.TOperationState.RUNNING_STATE, TCLIService.TOperationState.FINISHED_STATE] fetch_results_req = TCLIService.TFetchResultsReq() @@ -463,6 +464,13 @@ class TestHS2(HS2TestSuite): # should work. TestHS2.check_response(exec_summary_resp) + # Wait for query to start running so we can get a non-empty ExecSummary. + self.wait_for_admission_control(execute_statement_resp.operationHandle) + exec_summary_resp = self.hs2_client.GetExecSummary(exec_summary_req) + TestHS2.check_response(exec_summary_resp) + assert len(exec_summary_resp.summary.nodes) > 0 + + # Now close the query and verify the exec summary is available. close_operation_req = TCLIService.TCloseOperationReq() close_operation_req.operationHandle = execute_statement_resp.operationHandle TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req)) @@ -483,8 +491,9 @@ class TestHS2(HS2TestSuite): TestHS2.check_response(get_profile_resp) assert statement in get_profile_resp.profile # If ExecuteStatement() has completed but the results haven't been fetched yet, the - # query must have at least reached RUNNING. - assert "Query State: RUNNING" in get_profile_resp.profile or \ + # query must have reached either COMPILED or RUNNING or FINISHED. 
+ assert "Query State: COMPILED" in get_profile_resp.profile or \ + "Query State: RUNNING" in get_profile_resp.profile or \ "Query State: FINISHED" in get_profile_resp.profile, get_profile_resp.profile fetch_results_req = TCLIService.TFetchResultsReq() http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/query_test/test_observability.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index bba154a..4e4bc25 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -89,10 +89,16 @@ class TestObservability(ImpalaTestSuite): def test_query_states(self): """Tests that the query profile shows expected query states.""" query = "select count(*) from functional.alltypes" - handle = self.execute_query_async(query, dict()) + handle = self.execute_query_async(query, + {"debug_action": "SLEEP_BEFORE_ADMISSION_MS:1000"}) + # If ExecuteStatement() has completed and the query is paused in the admission control + # phase, then the query must be in COMPILED state. + profile = self.client.get_runtime_profile(handle) + assert "Query State: COMPILED" in profile + # After completion of the admission control phase, the query must have at least + # reached RUNNING state. + self.client.wait_for_admission_control(handle) profile = self.client.get_runtime_profile(handle) - # If ExecuteStatement() has completed but the results haven't been fetched yet, the - # query must have at least reached RUNNING. 
assert "Query State: RUNNING" in profile or \ "Query State: FINISHED" in profile, profile @@ -104,17 +110,37 @@ class TestObservability(ImpalaTestSuite): def test_query_options(self): """Test that the query profile shows expected non-default query options, both set explicitly through client and those set by planner""" - # Set a query option explicitly through client - self.execute_query("set MEM_LIMIT = 8589934592") - # Make sure explicitly set default values are not shown in the profile - self.execute_query("set runtime_filter_wait_time_ms = 0") - runtime_profile = self.execute_query("select 1").runtime_profile - assert "Query Options (set by configuration): MEM_LIMIT=8589934592" in runtime_profile + # Set mem_limit and runtime_filter_wait_time_ms to non-default and default value. + query_opts = {'mem_limit': 8589934592, 'runtime_filter_wait_time_ms': 0} + profile = self.execute_query("select 1", query_opts).runtime_profile + assert "Query Options (set by configuration): MEM_LIMIT=8589934592" in profile,\ + profile # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1, # RUNTIME_FILTER_MODE=0 and MT_DOP=0 assert "Query Options (set by configuration and planner): MEM_LIMIT=8589934592," \ "NUM_NODES=1,NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n" \ - in runtime_profile + in profile + + def test_exec_summary(self): + """Test that the exec summary is populated correctly in every query state""" + query = "select count(*) from functional.alltypes" + handle = self.execute_query_async(query, + {"debug_action": "SLEEP_BEFORE_ADMISSION_MS:1000"}) + # If ExecuteStatement() has completed and the query is paused in the admission control + # phase, then the coordinator has not started yet and exec_summary should be empty. 
+ exec_summary = self.client.get_exec_summary(handle) + assert exec_summary is not None and exec_summary.nodes is None + # After completion of the admission control phase, the coordinator would have started + # and we should get a populated exec_summary. + self.client.wait_for_admission_control(handle) + exec_summary = self.client.get_exec_summary(handle) + assert exec_summary is not None and exec_summary.nodes is not None + + self.client.fetch(query, handle) + exec_summary = self.client.get_exec_summary(handle) + # After fetching the results and reaching finished state, we should still be able to + # fetch an exec_summary. + assert exec_summary is not None and exec_summary.nodes is not None @SkipIfLocal.multiple_impalad @pytest.mark.xfail(reason="IMPALA-6338") http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/tests/query_test/test_udfs.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py index 902914a..9a365b5 100644 --- a/tests/query_test/test_udfs.py +++ b/tests/query_test/test_udfs.py @@ -40,14 +40,10 @@ class TestUdfBase(ImpalaTestSuite): """ Base class with utility functions for testing UDFs. """ - def _check_exception(self, e): - # The interesting exception message may be in 'e' or in its inner_exception - # depending on the point of query failure. 
- if 'Memory limit exceeded' in str(e) or 'Cancelled' in str(e): - return - if e.inner_exception is not None\ - and ('Memory limit exceeded' in e.inner_exception.message - or 'Cancelled' not in e.inner_exception.message): + def _check_mem_limit_exception(self, e): + """Return without error if the exception is MEM_LIMIT_EXCEEDED, re-raise 'e' + in all other cases.""" + if 'Memory limit exceeded' in str(e): return raise e @@ -427,21 +423,22 @@ class TestUdfExecution(TestUdfBase): # queries to fail @pytest.mark.execute_serially def test_mem_limits(self, vector, unique_database): - # Set the mem limit high enough that a simple scan can run - mem_limit = 1024 * 1024 + # Set the mem_limit and buffer_pool_limit high enough that the query makes it through + # admission control and a simple scan can run. vector = copy(vector) - vector.get_value('exec_option')['mem_limit'] = mem_limit + vector.get_value('exec_option')['mem_limit'] = '1mb' + vector.get_value('exec_option')['buffer_pool_limit'] = '32kb' try: self.run_test_case('QueryTest/udf-mem-limit', vector, use_db=unique_database) assert False, "Query was expected to fail" except ImpalaBeeswaxException, e: - self._check_exception(e) + self._check_mem_limit_exception(e) try: self.run_test_case('QueryTest/uda-mem-limit', vector, use_db=unique_database) assert False, "Query was expected to fail" except ImpalaBeeswaxException, e: - self._check_exception(e) + self._check_mem_limit_exception(e) # It takes a long time for Impala to free up memory after this test, especially if # ASAN is enabled. 
Verify that all fragments finish executing before moving on to the http://git-wip-us.apache.org/repos/asf/impala/blob/2de9db8f/www/query_backends.tmpl ---------------------------------------------------------------------- diff --git a/www/query_backends.tmpl b/www/query_backends.tmpl index 77ed261..c1833c6 100644 --- a/www/query_backends.tmpl +++ b/www/query_backends.tmpl @@ -87,7 +87,8 @@ function toggleRefresh() { {{^backend_states}} <div class="alert alert-info" role="alert"> -Query <strong>{{query_id}}</strong> has completed, or has no backends. +Query <strong>{{query_id}}</strong> has either completed or has no backends or has not +started any backends yet. </div> {{/backend_states}}