This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push: new 21f5a7a6e IMPALA-10180: Add summary stats for client fetch wait time 21f5a7a6e is described below commit 21f5a7a6e5c7572c6eebe6a5630a93f6f73a4404 Author: Joe McDonnell <joemcdonn...@cloudera.com> AuthorDate: Sat May 13 10:48:30 2023 -0700 IMPALA-10180: Add summary stats for client fetch wait time This adds ClientFetchWaitTimeStats to the runtime profile to track the min/max/# of samples for ClientFetchWaitTimer. Here is some sample output: - ClientFetchWaitTimeStats: (Avg: 161.554ms ; Min: 101.411ms ; Max: 461.728ms ; Number of samples: 6) - ClientFetchWaitTimer: 969.326ms This also fixes the definition of ClientFetchWaitTimer to avoid including time after end of fetch. When the client is closing the query, Finalize() gets called. The Finalize() call should only add extra client wait time if fetch has not completed. Testing: - Added test cases in query_test/test_fetch.py with specific numbers of fetches and verification of the statistics. - The test cases make use of a new function for parsing summary stats for timers, and this also gets its own test case. 
Change-Id: I9ca525285e03c7b51b04ac292f7b3531e6178218 Reviewed-on: http://gerrit.cloudera.org:8080/19897 Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Reviewed-by: Csaba Ringhofer <csringho...@cloudera.com> --- be/src/service/client-request-state.cc | 21 ++++++++++- be/src/service/client-request-state.h | 4 ++ tests/infra/test_utils.py | 21 ++++++++++- tests/query_test/test_fetch.py | 69 +++++++++++++++++++++++++++++++++- tests/util/parse_util.py | 68 +++++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+), 3 deletions(-) diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 77d0788ee..a27248db5 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -131,6 +131,8 @@ ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* fro num_rows_fetched_from_cache_counter_ = ADD_COUNTER(server_profile_, "NumRowsFetchedFromCache", TUnit::UNIT); client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer"); + client_wait_time_stats_ = + ADD_SUMMARY_STATS_TIMER(server_profile_, "ClientFetchWaitTimeStats"); bool is_external_fe = session_type() == TSessionType::EXTERNAL_FRONTEND; // "Impala Backend Timeline" was specifically chosen to exploit the lexicographical // ordering defined by the underlying std::map holding the timelines displayed in @@ -1691,7 +1693,24 @@ void ClientRequestState::MarkInactive() { void ClientRequestState::MarkActive() { client_wait_sw_.Stop(); int64_t elapsed_time = client_wait_sw_.ElapsedTime(); - client_wait_timer_->Set(elapsed_time); + // If we have reached eos, then the query is already complete, + // and we should not accumulate more client wait time. This mostly + // impacts the finalization step, where the client is closing the + // query and does not need any more rows. Fetching may have already + // completed prior to this point, so finalization time should not + // count in that case. 
If the fetch was incomplete, then the client + // time should be counted for finalization as well. + if (!eos()) { + client_wait_timer_->Set(elapsed_time); + // The first call is before any MarkInactive() call has run and produces + // a zero-length sample. Skip this zero-length sample (but not any later + // zero-length samples). + if (elapsed_time != 0 || last_client_wait_time_ != 0) { + int64_t current_wait_time = elapsed_time - last_client_wait_time_; + client_wait_time_stats_->UpdateCounter(current_wait_time); + } + last_client_wait_time_ = elapsed_time; + } lock_guard<mutex> l(expiration_data_lock_); last_active_time_ms_ = UnixMillis(); ++ref_count_; diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index 93a38c375..a6f580082 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -589,9 +589,13 @@ class ClientRequestState { RuntimeProfile::Counter* row_materialization_rate_; /// Tracks how long we are idle waiting for a client to fetch rows. + /// Keep summary statistics to get a sense for the number of fetch calls and + /// the typical round-trip time. RuntimeProfile::Counter* client_wait_timer_; + RuntimeProfile::SummaryStatsCounter* client_wait_time_stats_; /// Timer to track idle time for the above counter. 
MonotonicStopWatch client_wait_sw_; + int64_t last_client_wait_time_ = 0; RuntimeProfile::EventSequence* query_events_; diff --git a/tests/infra/test_utils.py b/tests/infra/test_utils.py index 897cd90c7..8c93770cf 100644 --- a/tests/infra/test_utils.py +++ b/tests/infra/test_utils.py @@ -19,7 +19,8 @@ from __future__ import absolute_import, division, print_function from tests.util.filesystem_utils import prepend_with_fs -from tests.util.parse_util import get_bytes_summary_stats_counter +from tests.util.parse_util import get_bytes_summary_stats_counter, \ + get_time_summary_stats_counter def test_filesystem_utils(): @@ -47,3 +48,21 @@ def test_get_bytes_summary_stats_counter(): assert len(summary_stats) == 1 assert summary_stats[0].sum == 32768 and summary_stats[0].min_value == 6144 and \ summary_stats[0].max_value == 10240 and summary_stats[0].total_num_values == 4 + + +def test_get_time_summary_stats_counter(): + """Test get_time_summary_stats_counter(counter_name, runtime_profile) using a dummy + runtime profile. + """ + # This is constructed to test the parsing logic for timestamps, so the numbers don't + add up. 
+ runtime_profile = "- ExampleTimeStats: (Avg: 161.554ms ; " \ + "Min: 101.411us ; " \ + "Max: 1h2m3s4ms5us6ns ; " \ + "Number of samples: 6)" + summary_stats = get_time_summary_stats_counter("ExampleTimeStats", runtime_profile) + assert len(summary_stats) == 1 + assert summary_stats[0].sum == 969324000 + assert summary_stats[0].min_value == 101411 + assert summary_stats[0].max_value == 3723004005006 + assert summary_stats[0].total_num_values == 6 diff --git a/tests/query_test/test_fetch.py b/tests/query_test/test_fetch.py index 2b86a77fc..16e92a042 100644 --- a/tests/query_test/test_fetch.py +++ b/tests/query_test/test_fetch.py @@ -21,7 +21,8 @@ import re from time import sleep from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import extend_exec_option_dimension -from tests.util.parse_util import parse_duration_string_ms +from tests.util.parse_util import parse_duration_string_ms, \ + get_time_summary_stats_counter class TestFetch(ImpalaTestSuite): @@ -68,6 +69,72 @@ class TestFetch(ImpalaTestSuite): finally: self.client.close_query(handle) + def test_client_fetch_time_stats(self, vector): + num_rows = 27 + query = "select sleep(10) from functional.alltypes limit {0}".format(num_rows) + handle = self.execute_query_async(query, vector.get_value('exec_option')) + try: + # Wait until the query is 'FINISHED' and results are available for fetching. + self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30) + + # This loop will do 6 fetches that contain data and a final fetch with + # no data. The last fetch is after eos has been set, so it does not count. + rows_fetched = 0 + while True: + result = self.client.fetch(query, handle, max_rows=5) + assert result.success + rows_fetched += len(result.data) + # If no rows are returned, we are done. + if len(result.data) == 0: + break + sleep(0.1) + + # After fetching all rows, sleep before closing the query. 
This should not + # count as client wait time, because the query is already done. + sleep(2.5) + finally: + self.client.close_query(handle) + + runtime_profile = self.client.get_runtime_profile(handle) + summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats", + runtime_profile) + assert len(summary_stats) == 1 + assert summary_stats[0].total_num_values == 6 + # The 2.5 second sleep should not count, so the max must be less than 2.5 seconds. + assert summary_stats[0].max_value < 2500000000 + assert summary_stats[0].min_value > 0 + + def test_client_fetch_time_stats_incomplete(self, vector): + num_rows = 27 + query = "select sleep(10) from functional.alltypes limit {0}".format(num_rows) + handle = self.execute_query_async(query, vector.get_value('exec_option')) + try: + # Wait until the query is 'FINISHED' and results are available for fetching. + self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30) + + # This loop will do 5 fetches for a total of 25 rows. This is incomplete. + for i in range(5): + result = self.client.fetch(query, handle, max_rows=5) + assert result.success + sleep(0.1) + + # Sleep before closing the query. For an incomplete fetch, this still counts + # towards the query time, so this does show up in the counters. + sleep(2.5) + finally: + self.client.close_query(handle) + + runtime_profile = self.client.get_runtime_profile(handle) + + summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats", + runtime_profile) + assert len(summary_stats) == 1 + # There are 5 fetches and the finalization sample for a total of 6. + assert summary_stats[0].total_num_values == 6 + # The 2.5 second sleep does count for an incomplete fetch, verify the max is higher. 
+ assert summary_stats[0].max_value >= 2500000000 + assert summary_stats[0].min_value > 0 + class TestFetchAndSpooling(ImpalaTestSuite): """Tests that apply when result spooling is enabled or disabled.""" diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py index 16c6fb220..bceb6ddf8 100644 --- a/tests/util/parse_util.py +++ b/tests/util/parse_util.py @@ -102,6 +102,23 @@ def parse_duration_string_ms(duration): return (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000 + times['ms'] +def parse_duration_string_ns(duration): + """Parses a duration string of the form 2h3m4s5.6ms4.5us7.8ns into nanoseconds.""" + pattern = r'(?P<value>[0-9]+\.?[0-9]*?)(?P<units>\D+)' + matches = list(re.finditer(pattern, duration)) + assert matches, 'Failed to parse duration string %s' % duration + + times = {'h': 0, 'm': 0, 's': 0, 'ms': 0, 'us': 0, 'ns': 0} + for match in matches: + parsed = match.groupdict() + times[parsed['units']] = float(parsed['value']) + + value_ns = (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000000000 + value_ns += times['ms'] * 1000000 + times['us'] * 1000 + times['ns'] + + return value_ns + + +def get_duration_us_from_str(duration_str): """Parses the duration string got in profile and returns the duration in us""" match_res = re.search(r"\((\d+) us\)", duration_str) @@ -189,3 +206,54 @@ def get_bytes_summary_stats_counter(counter_name, runtime_profile): min_value=int(summary_stat['min']), max_value=int(summary_stat['max']))) return summary_stats + + +def get_time_summary_stats_counter(counter_name, runtime_profile): + """Extracts a list of TSummaryStatsCounters from a given runtime profile where the + units are time. Each entry in the returned list corresponds to a single occurrence + of the counter in the profile. If the counter is present, but it has not been + updated, an empty TSummaryStatsCounter is returned for that entry. If the counter + is not in the given profile, an empty list is returned. 
All time values are returned + as nanoseconds. Here is an example of how this method should be used: + + # A single line in a runtime profile used for example purposes. + runtime_profile = "- ExampleTimer: (Avg: 100.000ms ; " \ + "Min: 100.000ms ; " \ + "Max: 100.000ms ; " \ + "Number of samples: 6)" + summary_stats = get_time_summary_stats_counter("ExampleTimer", + runtime_profile) + assert len(summary_stats) == 1 + assert summary_stats[0].sum == summary_stats[0].min_value == \ + summary_stats[0].max_value == 100000000 and \ + summary_stats[0].total_num_values == 6 + """ + # This requires the Thrift definitions to be generated. We limit the scope of the import + to allow tools like the stress test to import this file without building Impala. + from RuntimeProfile.ttypes import TSummaryStatsCounter + + regex_summary_stat = re.compile(r"""\( + Avg:\s(?P<avg>.*)\s;\s # Matches Avg: ? ; + Min:\s(?P<min>.*)\s;\s # Matches Min: ? ; + Max:\s(?P<max>.*)\s;\s # Matches Max: ? ; + Number\sof\ssamples:\s(?P<samples>[0-9]+)\) # Matches Number of samples: ?)""", + re.VERBOSE) + + summary_stats = [] + for counter in re.findall(counter_name + ".*", runtime_profile): + summary_stat = re.search(regex_summary_stat, counter) + # We need to special-case when the counter has not been updated at all because empty + # summary counters have a different format than updated ones. + if not summary_stat: + assert "0ns (Number of samples: 0)" in counter + summary_stats.append(TSummaryStatsCounter(sum=0, total_num_values=0, min_value=0, + max_value=0)) + else: + summary_stat = summary_stat.groupdict() + num_samples = int(summary_stat['samples']) + summary_stats.append(TSummaryStatsCounter(total_num_values=num_samples, + sum=num_samples * parse_duration_string_ns(summary_stat['avg']), + min_value=parse_duration_string_ns(summary_stat['min']), + max_value=parse_duration_string_ns(summary_stat['max']))) + + return summary_stats