On Sat, Jan 18, 2020 at 3:51 AM Michael Paquier <mich...@paquier.xyz> wrote:
>
> On Fri, Jan 17, 2020 at 05:07:55PM +0100, Julien Rouhaud wrote:
> > Oh indeed.  But unless we hold some LWLock during the whole function
> > execution, we cannot guarantee a consistent view right?
>
> Yep.  That's possible.
>
> > And isn't it already possible to e.g. see a parallel worker in
> > pg_stat_activity while all other queries are shown as idle, if
> > you're unlucky enough?
>
> Yep.  That's possible.
>
> > Also, LockHashPartitionLockByProc requires the leader PGPROC, and
> > there's no guarantee that we'll see the leader before any of the
> > workers, so I'm unsure how to implement what you said.  Wouldn't it be
> > better to simply fetch the leader PGPROC after acquiring a shared
> > ProcArrayLock, and using that copy to display the pid, after checking
> > that we retrieved a non-null PGPROC?
>
> Another idea would be to check if the current PGPROC entry is a leader
> and print in an int[] the list of PIDs of all the workers while
> holding a shared LWLock to avoid anything to be unregistered.  Less
> handy, but a bit more consistent.  One problem with doing that is
> that you may have in your list of PIDs some worker processes that are
> already gone when going through all the other backend entries.  An
> advantage is that an empty array could mean "I am the leader, but
> nothing has been registered yet to my group lock." (note that the
> leader adds itself to lockGroupMembers).

So, AFAICT LockHashPartitionLockByProc is required when
iterating over or modifying lockGroupMembers or lockGroupLink, but just
getting the leader pid should be safe.  Since we'll never be able to
get a totally consistent view of the data here, I'm in favor of avoiding
taking extra locks.  I agree that outputting an array of pids
would be more consistent for the leader, but it would have its own set of
corner cases.  It seems to me that a new leader_pid column is easier
to handle at the SQL level, so I kept that approach in the attached v4.  If
you have strong objections to it, I can still change it.
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 8839699079..2dbcb0f9d0 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -622,6 +622,17 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
      <entry><type>integer</type></entry>
      <entry>Process ID of this backend</entry>
     </row>
+    <row>
+     <entry><structfield>leader_pid</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>
+      Process ID of the parallel group leader, or NULL if this process is not
+      involved in parallel query.  A process that cooperates with parallel
+      workers becomes a parallel group leader, and this field is then set to
+      its own process ID.  For a parallel worker, this field is set to the
+      process ID of its parallel group leader.
+     </entry>
+    </row>
     <row>
      <entry><structfield>usesysid</structfield></entry>
      <entry><type>oid</type></entry>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c9e75f4370..d7cc14d175 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -740,6 +740,7 @@ CREATE VIEW pg_stat_activity AS
             S.datid AS datid,
             D.datname AS datname,
             S.pid,
+            S.leader_pid,
             S.usesysid,
             U.rolname AS usename,
             S.application_name,
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 74f899f24d..7bff5eaf37 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -547,7 +547,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS	29
+#define PG_STAT_GET_ACTIVITY_COLS	30
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	int			pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -686,33 +686,37 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			values[5] = CStringGetTextDatum(clipped_activity);
 			pfree(clipped_activity);
 
+			/* leader_pid stays NULL unless a lock group leader is found below */
+			nulls[29] = true;
 			proc = BackendPidGetProc(beentry->st_procpid);
+
+			/*
+			 * For an auxiliary process, retrieve process info from
+			 * AuxiliaryProcs stored in shared-memory.
+			 */
+			if (proc == NULL && (beentry->st_backendType != B_BACKEND))
+				proc = AuxiliaryPidGetProc(beentry->st_procpid);
+
+			/*
+			 * If a PGPROC entry was retrieved, display wait events and lock
+			 * group leader information if any.  To avoid extra overhead, no
+			 * extra lock is being held, so there is no guarantee of
+			 * consistency across multiple rows.
+			 */
 			if (proc != NULL)
 			{
 				uint32		raw_wait_event;
+				PGPROC	   *leader;
 
 				raw_wait_event = UINT32_ACCESS_ONCE(proc->wait_event_info);
 				wait_event_type = pgstat_get_wait_event_type(raw_wait_event);
 				wait_event = pgstat_get_wait_event(raw_wait_event);
 
-			}
-			else if (beentry->st_backendType != B_BACKEND)
-			{
-				/*
-				 * For an auxiliary process, retrieve process info from
-				 * AuxiliaryProcs stored in shared-memory.
-				 */
-				proc = AuxiliaryPidGetProc(beentry->st_procpid);
-
-				if (proc != NULL)
+				leader = proc->lockGroupLeader;
+				if (leader)
 				{
-					uint32		raw_wait_event;
-
-					raw_wait_event =
-						UINT32_ACCESS_ONCE(proc->wait_event_info);
-					wait_event_type =
-						pgstat_get_wait_event_type(raw_wait_event);
-					wait_event = pgstat_get_wait_event(raw_wait_event);
+					values[29] = Int32GetDatum(leader->pid);
+					nulls[29] = false;
 				}
 			}
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bef50c76d9..86d18c76b6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5175,9 +5175,9 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc}',
+  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 70e1e2f78d..bff0c2ff39 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1729,6 +1729,7 @@ pg_shmem_allocations| SELECT pg_get_shmem_allocations.name,
 pg_stat_activity| SELECT s.datid,
     d.datname,
     s.pid,
+    s.leader_pid,
     s.usesysid,
     u.rolname AS usename,
     s.application_name,
@@ -1746,7 +1747,7 @@ pg_stat_activity| SELECT s.datid,
     s.backend_xmin,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1850,7 +1851,7 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
     s.gss_princ AS principal,
     s.gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
   WHERE (s.client_port IS NOT NULL);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
@@ -1983,7 +1984,7 @@ pg_stat_replication| SELECT s.pid,
     w.spill_txns,
     w.spill_count,
     w.spill_bytes
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
@@ -1995,7 +1996,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
   WHERE (s.client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,

Reply via email to