This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 21f5a7a6e IMPALA-10180: Add summary stats for client fetch wait time
21f5a7a6e is described below

commit 21f5a7a6e5c7572c6eebe6a5630a93f6f73a4404
Author: Joe McDonnell <joemcdonn...@cloudera.com>
AuthorDate: Sat May 13 10:48:30 2023 -0700

    IMPALA-10180: Add summary stats for client fetch wait time
    
    This adds ClientFetchWaitTimeStats to the runtime profile
    to track the min/max/# of samples for ClientFetchWaitTimer.
    Here is some sample output:
    - ClientFetchWaitTimeStats: (Avg: 161.554ms ; Min: 101.411ms ; Max: 461.728ms ; Number of samples: 6)
    - ClientFetchWaitTimer: 969.326ms
    
    This also fixes the definition of ClientFetchWaitTimer to avoid
    including time after end of fetch. When the client is closing
    the query, Finalize() gets called. The Finalize() call should
    only add extra client wait time if fetch has not completed.
    
    Testing:
     - Added test cases in query_test/test_fetch.py with specific
       numbers of fetches and verification of the statistics.
     - The test cases make use of a new function for parsing
       summary stats for timers, and this also gets its own test
       case.
    
    Change-Id: I9ca525285e03c7b51b04ac292f7b3531e6178218
    Reviewed-on: http://gerrit.cloudera.org:8080/19897
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <csringho...@cloudera.com>
---
 be/src/service/client-request-state.cc | 21 ++++++++++-
 be/src/service/client-request-state.h  |  4 ++
 tests/infra/test_utils.py              | 21 ++++++++++-
 tests/query_test/test_fetch.py         | 69 +++++++++++++++++++++++++++++++++-
 tests/util/parse_util.py               | 68 +++++++++++++++++++++++++++++++++
 5 files changed, 180 insertions(+), 3 deletions(-)

diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 77d0788ee..a27248db5 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -131,6 +131,8 @@ ClientRequestState::ClientRequestState(const TQueryCtx& 
query_ctx, Frontend* fro
   num_rows_fetched_from_cache_counter_ =
       ADD_COUNTER(server_profile_, "NumRowsFetchedFromCache", TUnit::UNIT);
   client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer");
+  client_wait_time_stats_ =
+      ADD_SUMMARY_STATS_TIMER(server_profile_, "ClientFetchWaitTimeStats");
   bool is_external_fe = session_type() == TSessionType::EXTERNAL_FRONTEND;
   // "Impala Backend Timeline" was specifically chosen to exploit the 
lexicographical
   // ordering defined by the underlying std::map holding the timelines 
displayed in
@@ -1691,7 +1693,24 @@ void ClientRequestState::MarkInactive() {
 void ClientRequestState::MarkActive() {
   client_wait_sw_.Stop();
   int64_t elapsed_time = client_wait_sw_.ElapsedTime();
-  client_wait_timer_->Set(elapsed_time);
+  // If we have reached eos, then the query is already complete,
+  // and we should not accumulate more client wait time. This mostly
+  // impacts the finalization step, where the client is closing the
+  // query and does not need any more rows. Fetching may have already
+  // completed prior to this point, so finalization time should not
+  // count in that case. If the fetch was incomplete, then the client
+  // time should be counted for finalization as well.
+  if (!eos()) {
+    client_wait_timer_->Set(elapsed_time);
+    // The first call is before any MarkInactive() call has run and produces
+    // a zero-length sample. Skip this zero-length sample (but not any later
+    // zero-length samples).
+    if (elapsed_time != 0 || last_client_wait_time_ != 0) {
+      int64_t current_wait_time = elapsed_time - last_client_wait_time_;
+      client_wait_time_stats_->UpdateCounter(current_wait_time);
+    }
+    last_client_wait_time_ = elapsed_time;
+  }
   lock_guard<mutex> l(expiration_data_lock_);
   last_active_time_ms_ = UnixMillis();
   ++ref_count_;
diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index 93a38c375..a6f580082 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -589,9 +589,13 @@ class ClientRequestState {
   RuntimeProfile::Counter* row_materialization_rate_;
 
   /// Tracks how long we are idle waiting for a client to fetch rows.
+  /// Keep summary statistics to get a sense for the number of fetch calls and
+  /// the typical round-trip time.
   RuntimeProfile::Counter* client_wait_timer_;
+  RuntimeProfile::SummaryStatsCounter* client_wait_time_stats_;
   /// Timer to track idle time for the above counter.
   MonotonicStopWatch client_wait_sw_;
+  int64_t last_client_wait_time_ = 0;
 
   RuntimeProfile::EventSequence* query_events_;
 
diff --git a/tests/infra/test_utils.py b/tests/infra/test_utils.py
index 897cd90c7..8c93770cf 100644
--- a/tests/infra/test_utils.py
+++ b/tests/infra/test_utils.py
@@ -19,7 +19,8 @@
 
 from __future__ import absolute_import, division, print_function
 from tests.util.filesystem_utils import prepend_with_fs
-from tests.util.parse_util import get_bytes_summary_stats_counter
+from tests.util.parse_util import get_bytes_summary_stats_counter, \
+    get_time_summary_stats_counter
 
 
 def test_filesystem_utils():
@@ -47,3 +48,21 @@ def test_get_bytes_summary_stats_counter():
   assert len(summary_stats) == 1
   assert summary_stats[0].sum == 32768 and summary_stats[0].min_value == 6144 
and \
          summary_stats[0].max_value == 10240 and 
summary_stats[0].total_num_values == 4
+
+
+def test_get_time_summary_stats_counter():
+  """Test get_time_summary_stats_counter(counter_name, runtime_profile) using 
a dummy
+     runtime profile.
+  """
+  # This is constructed to test the parsing logic for durations, so the numbers don't
+  # add up.
+  runtime_profile = "- ExampleTimeStats: (Avg: 161.554ms ; " \
+                    "Min: 101.411us ; " \
+                    "Max: 1h2m3s4ms5us6ns ; " \
+                    "Number of samples: 6)"
+  summary_stats = get_time_summary_stats_counter("ExampleTimeStats", 
runtime_profile)
+  assert len(summary_stats) == 1
+  assert summary_stats[0].sum == 969324000
+  assert summary_stats[0].min_value == 101411
+  assert summary_stats[0].max_value == 3723004005006
+  assert summary_stats[0].total_num_values == 6
diff --git a/tests/query_test/test_fetch.py b/tests/query_test/test_fetch.py
index 2b86a77fc..16e92a042 100644
--- a/tests/query_test/test_fetch.py
+++ b/tests/query_test/test_fetch.py
@@ -21,7 +21,8 @@ import re
 from time import sleep
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import extend_exec_option_dimension
-from tests.util.parse_util import parse_duration_string_ms
+from tests.util.parse_util import parse_duration_string_ms, \
+    get_time_summary_stats_counter
 
 
 class TestFetch(ImpalaTestSuite):
@@ -68,6 +69,72 @@ class TestFetch(ImpalaTestSuite):
     finally:
       self.client.close_query(handle)
 
+  def test_client_fetch_time_stats(self, vector):
+    num_rows = 27
+    query = "select sleep(10) from functional.alltypes limit 
{0}".format(num_rows)
+    handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    try:
+      # Wait until the query is 'FINISHED' and results are available for 
fetching.
+      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30)
+
+      # This loop will do 6 fetches that contain data and a final fetch with
+      # no data. The last fetch is after eos has been set, so it does not 
count.
+      rows_fetched = 0
+      while True:
+        result = self.client.fetch(query, handle, max_rows=5)
+        assert result.success
+        rows_fetched += len(result.data)
+        # If no rows are returned, we are done.
+        if len(result.data) == 0:
+          break
+        sleep(0.1)
+
+      # After fetching all rows, sleep before closing the query. This should 
not
+      # count as client wait time, because the query is already done.
+      sleep(2.5)
+    finally:
+      self.client.close_query(handle)
+
+    runtime_profile = self.client.get_runtime_profile(handle)
+    summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats",
+                                                   runtime_profile)
+    assert len(summary_stats) == 1
+    assert summary_stats[0].total_num_values == 6
+    # The 2.5 second sleep should not count, so the max must be less than 2.5 
seconds.
+    assert summary_stats[0].max_value < 2500000000
+    assert summary_stats[0].min_value > 0
+
+  def test_client_fetch_time_stats_incomplete(self, vector):
+    num_rows = 27
+    query = "select sleep(10) from functional.alltypes limit 
{0}".format(num_rows)
+    handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    try:
+      # Wait until the query is 'FINISHED' and results are available for 
fetching.
+      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30)
+
+      # This loop will do 5 fetches for a total of 25 rows. This is incomplete.
+      for i in range(5):
+        result = self.client.fetch(query, handle, max_rows=5)
+        assert result.success
+        sleep(0.1)
+
+      # Sleep before closing the query. For an incomplete fetch, this still counts
+      # towards the client wait time, so this does show up in the counters.
+      sleep(2.5)
+    finally:
+      self.client.close_query(handle)
+
+    runtime_profile = self.client.get_runtime_profile(handle)
+
+    summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats",
+                                                   runtime_profile)
+    assert len(summary_stats) == 1
+    # There are 5 fetches and the finalization sample for a total of 6.
+    assert summary_stats[0].total_num_values == 6
+    # The 2.5 second sleep does count for an incomplete fetch, verify the max 
is higher.
+    assert summary_stats[0].max_value >= 2500000000
+    assert summary_stats[0].min_value > 0
+
 
 class TestFetchAndSpooling(ImpalaTestSuite):
   """Tests that apply when result spooling is enabled or disabled."""
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 16c6fb220..bceb6ddf8 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -102,6 +102,23 @@ def parse_duration_string_ms(duration):
   return (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000 + 
times['ms']
 
 
+def parse_duration_string_ns(duration):
+  """Parses a duration string of the form 1h2m3s4.5ms6.7us8.9ns into nanoseconds."""
+  pattern = r'(?P<value>[0-9]+\.?[0-9]*?)(?P<units>\D+)'
+  matches = list(re.finditer(pattern, duration))
+  assert matches, 'Failed to parse duration string %s' % duration
+
+  times = {'h': 0, 'm': 0, 's': 0, 'ms': 0, 'us': 0, 'ns': 0}
+  for match in matches:
+    parsed = match.groupdict()
+    times[parsed['units']] = float(parsed['value'])
+
+  value_ns = (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000000000
+  value_ns += times['ms'] * 1000000 + times['us'] * 1000 + times['ns']
+
+  return value_ns
+
+
 def get_duration_us_from_str(duration_str):
   """Parses the duration string got in profile and returns the duration in 
us"""
   match_res = re.search(r"\((\d+) us\)", duration_str)
@@ -189,3 +206,54 @@ def get_bytes_summary_stats_counter(counter_name, 
runtime_profile):
           min_value=int(summary_stat['min']), 
max_value=int(summary_stat['max'])))
 
   return summary_stats
+
+
+def get_time_summary_stats_counter(counter_name, runtime_profile):
+  """Extracts a list of TSummaryStatsCounters from a given runtime profile 
where the
+     units are time. Each entry in the returned list corresponds to a single 
occurrence
+     of the counter in the profile. If the counter is present, but it has not 
been
+     updated, an empty TSummaryStatsCounter is returned for that entry. If the 
counter
+     is not in the given profile, an empty list is returned. All time values 
are returned
+     as nanoseconds. Here is an example of how this method should be used:
+
+       # A single line in a runtime profile used for example purposes.
+       runtime_profile = "- ExampleTimer: (Avg: 100.000ms ; " \
+                                          "Min: 100.000ms ; " \
+                                          "Max: 100.000ms ; " \
+                                          "Number of samples: 6)"
+       summary_stats = get_time_summary_stats_counter("ExampleTimer",
+                                                      runtime_profile)
+       assert len(summary_stats) == 1
+       assert summary_stats[0].sum == 600000000 and \
+              summary_stats[0].min_value == summary_stats[0].max_value == 100000000 and \
+              summary_stats[0].total_num_values == 6
+  """
+  # This requires the Thrift definitions to be generated. We limit the scope 
of the import
+  # to allow tools like the stress test to import this file without building 
Impala.
+  from RuntimeProfile.ttypes import TSummaryStatsCounter
+
+  regex_summary_stat = re.compile(r"""\(
+    Avg:\s(?P<avg>.*)\s;\s # Matches Avg: ? ;
+    Min:\s(?P<min>.*)\s;\s # Matches Min: ? ;
+    Max:\s(?P<max>.*)\s;\s # Matches Max: ? ;
+    Number\sof\ssamples:\s(?P<samples>[0-9]+)\) # Matches Number of samples: 
?)""",
+                                  re.VERBOSE)
+
+  summary_stats = []
+  for counter in re.findall(counter_name + ".*", runtime_profile):
+    summary_stat = re.search(regex_summary_stat, counter)
+    # We need to special-case when the counter has not been updated at all 
because empty
+    # summary counters have a different format than updated ones.
+    if not summary_stat:
+      assert "0ns (Number of samples: 0)" in counter
+      summary_stats.append(TSummaryStatsCounter(sum=0, total_num_values=0, 
min_value=0,
+                                                max_value=0))
+    else:
+      summary_stat = summary_stat.groupdict()
+      num_samples = int(summary_stat['samples'])
+      summary_stats.append(TSummaryStatsCounter(total_num_values=num_samples,
+          sum=num_samples * parse_duration_string_ns(summary_stat['avg']),
+          min_value=parse_duration_string_ns(summary_stat['min']),
+          max_value=parse_duration_string_ns(summary_stat['max'])))
+
+  return summary_stats

Reply via email to