This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch branch-4.4.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b342710f574a0b49a123f774137cf5298844fbac
Author: Michael Smith <michael.sm...@cloudera.com>
AuthorDate: Tue Apr 9 13:19:19 2024 -0700

    IMPALA-12963: Return parent PID when children spawned
    
    Returns the original PID for a command rather than any children that may
    be active. Child processes can still be running during graceful shutdown
    in UBSAN tests, where they are slow to exit. Also updates 'kill' to use
    the version of 'get_pid' that logs details to help with debugging.
    
    Moves the try block in test_query_log.py to after client2 has been
    initialized. Removes the 'drop table' for the table created in
    unique_database, since the test suite already handles cleanup.
    
    Change-Id: I214e79507c717340863d27f68f6ea54c169e4090
    Reviewed-on: http://gerrit.cloudera.org:8080/21278
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 tests/common/impala_cluster.py         | 54 ++++++++++++++++----------
 tests/custom_cluster/test_query_log.py | 69 ++++++++++++++++------------------
 2 files changed, 67 insertions(+), 56 deletions(-)

diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 51fefa715..1465ab31b 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -401,7 +401,7 @@ class Process(object):
     """Gets the PIDs of the process. In some circumstances, a process can run 
multiple
     times, e.g. when it forks in the Breakpad crash handler. Returns an empty 
list if no
     PIDs can be determined."""
-    pids = self.__get_pids()
+    pids = [proc['pid'] for proc in self.__get_procs()]
     if pids:
       LOG.info("Found PIDs %s for %s" % (", ".join(map(str, pids)), " 
".join(self.cmd)))
     else:
@@ -409,37 +409,53 @@ class Process(object):
                " ".join(self.cmd))
     return pids
 
-  def __get_pid(self):
-    pids = self.__get_pids()
-    assert len(pids) < 2, "Expected single pid but found %s" % ", 
".join(map(str, pids))
-    return len(pids) == 1 and pids[0] or None
+  def __procs_str(self, procs):
+    return "\n".join([str(proc) for proc in procs])
 
-  def __get_pids(self):
+  def __get_pid(self):
+    procs = self.__get_procs()
+    # Return early for containerized environments
+    if len(procs) == 1:
+      return procs[0]['pid']
+
+    result = None
+    # In some circumstances - notably ubsan tests - child processes can be 
slow to exit.
+    # Only return the original process, i.e. one whose parent has a different 
cmd.
+    pids = [proc['pid'] for proc in procs]
+    for process in procs:
+      if process['ppid'] not in pids:
+        assert result is None,\
+            "Multiple non-child processes:\n%s" % self.__procs_str(procs)
+        result = process['pid']
+      else:
+        LOG.info("Child process active:\n%s" % self.__procs_str(procs))
+
+    return result
+
+  def __get_procs(self):
+    """
+    Returns a list of dicts containing {pid, ppid, cmdline} for related 
processes.
+    """
     if self.container_id is not None:
       container_info = get_container_info(self.container_id)
       if container_info["State"]["Status"] != "running":
         return []
-      return [container_info["State"]["Pid"]]
+      return [{'pid': container_info["State"]["Pid"], 'ppid': 0, 'cmdline': 
self.cmd}]
 
     # In non-containerised case, search for process based on matching command 
lines.
-    pids = []
-    for pid in psutil.pids():
-      try:
-        process = psutil.Process(pid)
-        if set(self.cmd) == set(process.cmdline()):
-          pids.append(pid)
-      except psutil.NoSuchProcess:
-        # A process from psutil.pids() no longer exists, continue. We don't 
log this
-        # error since it can refer to arbitrary processes outside of our 
testing code.
-        pass
-    return pids
+    procs = []
+    for process in psutil.process_iter(['pid', 'ppid', 'cmdline']):
+      # Use info because it won't throw NoSuchProcess exceptions.
+      if set(self.cmd) == set(process.info['cmdline']):
+        procs.append(process.info)
+    return procs
 
   def kill(self, signal=SIGKILL):
     """
     Kills the given processes.
     """
     if self.container_id is None:
-      pid = self.__get_pid()
+      pid = self.get_pid()
       assert pid is not None, "No processes for %s" % self
       LOG.info('Killing %s with signal %s' % (self, signal))
       exec_process("kill -%d %d" % (signal, pid))
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index e02993217..dd61e7291 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -56,13 +56,11 @@ class TestQueryLogTableBase(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol',
         cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2))
 
-  def get_client(self, protocol, query_table_name=""):
+  def get_client(self, protocol, query_table_name=QUERY_TBL):
     """Retrieves the default Impala client for the specified protocol. This 
client is
        automatically closed after the test completes. Also ensures the 
completed queries
        table has been successfully created by checking the logs to verify the 
create
        table sql has finished."""
-    if query_table_name == "":
-      query_table_name = self.QUERY_TBL
 
     # These tests run very quickly and can actually complete before Impala has 
finished
     # creating the completed queries table. Thus, to make these tests more 
robust, this
@@ -704,24 +702,24 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
     impalad = self.cluster.get_first_impalad()
     client = self.get_client(vector.get_value('protocol'))
 
-    try:
-      # Execute sql statements to ensure all get written to the query log 
table.
-      sql1 = client.execute("select 1")
-      assert sql1.success
+    # Execute sql statements to ensure all get written to the query log table.
+    sql1 = client.execute("select 1")
+    assert sql1.success
 
-      sql2 = client.execute("select 2")
-      assert sql2.success
+    sql2 = client.execute("select 2")
+    assert sql2.success
 
-      sql3 = client.execute("select 3")
-      assert sql3.success
+    sql3 = client.execute("select 3")
+    assert sql3.success
 
-      
impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 
3,
-          60)
+    
impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 
3,
+        60)
 
-      impalad.kill_and_wait_for_exit(SIGRTMIN)
+    impalad.kill_and_wait_for_exit(SIGRTMIN)
 
-      client2 = self.create_client_for_nth_impalad(1, 
vector.get_value('protocol'))
+    client2 = self.create_client_for_nth_impalad(1, 
vector.get_value('protocol'))
 
+    try:
       def assert_func(last_iteration):
         results = client2.execute("select query_id,sql from {0} where query_id 
in "
                                   "('{1}','{2}','{3}')".format(self.QUERY_TBL,
@@ -783,30 +781,27 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
     tbl_name = "{0}.{1}".format(unique_database, unique_name)
     client = self.get_client(vector.get_value('protocol'))
 
-    try:
-      # Create the test table.
-      create_tbl_sql = "create table {0} (id INT, product_name STRING) " \
-        "partitioned by (category INT)".format(tbl_name)
-      create_tbl_results = client.execute(create_tbl_sql)
-      assert create_tbl_results.success
-
-      insert_sql = "insert into {0} (id,category,product_name) values " \
-                   "(0,1,'the product')".format(tbl_name)
-      res = client.execute(insert_sql, fetch_profile_after_close=True)
-      assert res.success
+    # Create the test table.
+    create_tbl_sql = "create table {0} (id INT, product_name STRING) " \
+      "partitioned by (category INT)".format(tbl_name)
+    create_tbl_results = client.execute(create_tbl_sql)
+    assert create_tbl_results.success
 
-      # Include the two queries run by the unique_database fixture setup.
-      self.cluster.get_first_impalad().service.wait_for_metric_value(
-          "impala-server.completed-queries.written", 4, 60)
-
-      client2 = self.create_client_for_nth_impalad(2, 
vector.get_value('protocol'))
-      try:
-        assert client2 is not None
-        assert_query(self.QUERY_TBL, client2, "test_query_hist_3", 
res.runtime_profile)
-      finally:
-        client2.close()
+    insert_sql = "insert into {0} (id,category,product_name) values " \
+                  "(0,1,'the product')".format(tbl_name)
+    res = client.execute(insert_sql, fetch_profile_after_close=True)
+    assert res.success
+
+    # Include the two queries run by the unique_database fixture setup.
+    self.cluster.get_first_impalad().service.wait_for_metric_value(
+        "impala-server.completed-queries.written", 4, 60)
+
+    client2 = self.create_client_for_nth_impalad(2, 
vector.get_value('protocol'))
+    try:
+      assert client2 is not None
+      assert_query(self.QUERY_TBL, client2, "test_query_hist_3", 
res.runtime_profile)
     finally:
-      client.execute("drop table if exists {0}".format(tbl_name))
+      client2.close()
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "

Reply via email to