On Mon, Mar 18, 2019 at 7:33 PM Julien Rouhaud <[email protected]> wrote:
>
> On Mon, Mar 18, 2019 at 6:23 PM Yun Li <[email protected]> wrote:
> >
> > Let's take one step back. Since queryId is stored in core as Julien pointed
> > out, can we just add that global to pg_stat_get_activity and ultimately
> > expose it in the pg_stat_activity view? Then no matter whether PGSS is on or
> > off, or however the customer extensions are updating that field, we expose
> > that field in that view and enable users to leverage that id to join with
> > pgss or their extension. Does this sound like a good idea?
>
> I'd greatly welcome queryid exposure in pg_stat_activity, and
> also in log_line_prefix. I'm afraid that it's too late for pg12
> inclusion, but I'll be happy to provide a patch for that for pg13.
Here's a prototype patch for queryid exposure in pg_stat_activity and
log_line_prefix.
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index d383de2512..37570825be 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -6260,6 +6260,11 @@ local0.* /var/log/postgresql
session processes</entry>
<entry>no</entry>
</row>
+ <row>
+ <entry><literal>%Q</literal></entry>
+ <entry>queryid: identifier of the session's current query, if any</entry>
+ <entry>yes</entry>
+ </row>
<row>
<entry><literal>%%</literal></entry>
<entry>Literal <literal>%</literal></entry>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index ac2721c8ad..726c9430d5 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -800,6 +800,19 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><type>xid</type></entry>
<entry>The current backend's <literal>xmin</literal> horizon.</entry>
</row>
+ <row>
+ <entry><structfield>queryid</structfield></entry>
+ <entry><type>bigint</type></entry>
+ <entry>Identifier of this backend's most recent query. If
+ <structfield>state</structfield> is <literal>active</literal> this field
+ shows the identifier of the currently executing query. In all other
+ states, it shows the identifier of the last query that was executed, unless
+ an error occurred, which resets this field to 0. By default, query
+ identifiers are not computed, so this field will always display 0, unless
+ an additional module that computes query identifiers, such as <xref
+ linkend="pgstatstatements"/>, is configured.
+ </entry>
+ </row>
<row>
<entry><structfield>query</structfield></entry>
<entry><type>text</type></entry>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index d962648bc5..6b62c7db1c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -708,6 +708,7 @@ CREATE VIEW pg_stat_activity AS
S.state,
S.backend_xid,
s.backend_xmin,
+ S.queryid,
S.query,
S.backend_type
FROM pg_stat_get_activity(NULL) AS S
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 63a34760ee..955722d3a4 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -140,6 +140,8 @@ static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
void
ExecutorStart(QueryDesc *queryDesc, int eflags)
{
+ pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
if (ExecutorStart_hook)
(*ExecutorStart_hook) (queryDesc, eflags);
else
@@ -300,6 +302,8 @@ ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count,
bool execute_once)
{
+ pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
if (ExecutorRun_hook)
(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
else
@@ -399,6 +403,8 @@ standard_ExecutorRun(QueryDesc *queryDesc,
void
ExecutorFinish(QueryDesc *queryDesc)
{
+ pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
if (ExecutorFinish_hook)
(*ExecutorFinish_hook) (queryDesc);
else
@@ -459,6 +465,8 @@ standard_ExecutorFinish(QueryDesc *queryDesc)
void
ExecutorEnd(QueryDesc *queryDesc)
{
+ pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
if (ExecutorEnd_hook)
(*ExecutorEnd_hook) (queryDesc);
else
@@ -538,6 +546,8 @@ ExecutorRewind(QueryDesc *queryDesc)
/* It's probably not sensible to rescan updating queries */
Assert(queryDesc->operation == CMD_SELECT);
+ pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
/*
* Switch into per-query memory context
*/
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index d898f4ca78..0729c2f1a3 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -24,6 +24,7 @@
#include "executor/executor.h"
#include "executor/spi_priv.h"
#include "miscadmin.h"
+#include "storage/proc.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
@@ -1879,6 +1880,7 @@ _SPI_prepare_plan(const char *src, SPIPlanPtr plan)
List *plancache_list;
ListCell *list_item;
ErrorContextCallback spierrcontext;
+ uint64 old_queryId = pg_atomic_read_u64(&MyProc->queryId);
/*
* Setup error traceback support for ereport()
@@ -1935,6 +1937,8 @@ _SPI_prepare_plan(const char *src, SPIPlanPtr plan)
_SPI_current->queryEnv);
}
+ pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
/* Finish filling in the CachedPlanSource */
CompleteCachedPlan(plansource,
stmt_list,
@@ -2046,6 +2050,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
int res = 0;
bool pushed_active_snap = false;
ErrorContextCallback spierrcontext;
+ uint64 old_queryId = pg_atomic_read_u64(&MyProc->queryId);
CachedPlan *cplan = NULL;
ListCell *lc1;
@@ -2135,6 +2140,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
_SPI_current->queryEnv);
}
+ pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
/* Finish filling in the CachedPlanSource */
CompleteCachedPlan(plansource,
stmt_list,
@@ -2305,6 +2312,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
}
}
+ pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
/*
* The last canSetTag query sets the status values returned to the
* caller. Be careful to free any tuptables not returned, to
@@ -2408,6 +2417,7 @@ static int
_SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
{
int operation = queryDesc->operation;
+ uint64 old_queryId = pg_atomic_read_u64(&MyProc->queryId);
int eflags;
int res;
@@ -2472,6 +2482,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
ExecutorEnd(queryDesc);
/* FreeQueryDesc is done by the caller */
+ pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
#ifdef SPI_EXECUTOR_STATS
if (ShowExecutorStats)
ShowUsage("SPI EXECUTOR STATS");
@@ -2519,6 +2531,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
DestReceiver *dest)
{
uint64 nfetched;
+ uint64 old_queryId = pg_atomic_read_u64(&MyProc->queryId);
/* Check that the portal is valid */
if (!PortalIsValid(portal))
@@ -2553,6 +2566,8 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
if (dest->mydest == DestSPI && _SPI_checktuples())
elog(ERROR, "consistency check on SPI tuple count failed");
+ pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
/* Put the result into place for access by caller */
SPI_processed = _SPI_current->processed;
SPI_tuptable = _SPI_current->tuptable;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index d6cdd16607..9ee0d72746 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -44,6 +44,7 @@
#include "parser/parse_target.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteManip.h"
+#include "storage/proc.h"
#include "utils/rel.h"
@@ -118,6 +119,8 @@ parse_analyze(RawStmt *parseTree, const char *sourceText,
if (post_parse_analyze_hook)
(*post_parse_analyze_hook) (pstate, query);
+ pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
free_parsestate(pstate);
return query;
@@ -151,6 +154,8 @@ parse_analyze_varparams(RawStmt *parseTree, const char *sourceText,
if (post_parse_analyze_hook)
(*post_parse_analyze_hook) (pstate, query);
+ pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
free_parsestate(pstate);
return query;
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 0da5b19719..4693597aa1 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -284,6 +284,7 @@ InitProcGlobal(void)
*/
pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO);
pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO);
+ pg_atomic_init_u64(&(procs[i].queryId), 0);
}
/*
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index f9ce3d8f22..73b92d243f 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -744,6 +744,8 @@ pg_analyze_and_rewrite_params(RawStmt *parsetree,
if (post_parse_analyze_hook)
(*post_parse_analyze_hook) (pstate, query);
+ pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
free_parsestate(pstate);
if (log_parser_stats)
@@ -4027,6 +4029,12 @@ PostgresMain(int argc, char *argv[],
*/
debug_query_string = NULL;
+ /*
+ * Also reset the queryId, as any new error encountered before a
+ * specific query is executed isn't linked to the last saved value
+ */
+ pg_atomic_write_u64(&MyProc->queryId, 0);
+
/*
* Abort the current transaction in order to recover.
*/
@@ -4106,6 +4114,12 @@ PostgresMain(int argc, char *argv[],
*/
doing_extended_query_message = false;
+ /*
+ * Also reset the queryId, so any error encountered before a specific
+ * query is executed won't display the last saved value
+ */
+ pg_atomic_write_u64(&MyProc->queryId, 0);
+
/*
* Release storage left over from prior query cycle, and create a new
* query input buffer in the cleared MessageContext.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index da1d685c08..b8ba5819d2 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -541,7 +541,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
Datum
pg_stat_get_activity(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_ACTIVITY_COLS 26
+#define PG_STAT_GET_ACTIVITY_COLS 27
int num_backends = pgstat_fetch_stat_numbackends();
int curr_backend;
int pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -855,6 +855,7 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
values[18] = BoolGetDatum(false); /* ssl */
nulls[19] = nulls[20] = nulls[21] = nulls[22] = nulls[23] = nulls[24] = nulls[25] = true;
}
+ values[26] = DatumGetUInt64(pg_atomic_read_u64(&proc->queryId));
}
else
{
@@ -879,6 +880,7 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
nulls[23] = true;
nulls[24] = true;
nulls[25] = true;
+ nulls[26] = true;
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 8b4720ef3a..8e611bd239 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -2594,6 +2594,20 @@ log_line_prefix(StringInfo buf, ErrorData *edata)
else
appendStringInfoString(buf, unpack_sql_state(edata->sqlerrcode));
break;
+ case 'Q':
+ if (MyProc != NULL)
+ {
+ if (padding != 0)
+ appendStringInfo(buf, "%*ld", padding,
+ pg_atomic_read_u64(&MyProc->queryId));
+ else
+ appendStringInfo(buf, "%ld",
+ pg_atomic_read_u64(&MyProc->queryId));
+ }
+ else if (padding != 0)
+ appendStringInfoSpaces(buf,
+ padding > 0 ? padding : -padding);
+ break;
default:
/* format error - ignore it */
break;
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index cccb5f145a..1c3efdff9c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -515,6 +515,7 @@
# %t = timestamp without milliseconds
# %m = timestamp with milliseconds
# %n = timestamp with milliseconds (as a Unix epoch)
+ # %Q = query ID (0 if none or not computed)
# %i = command tag
# %e = SQL state
# %c = session ID
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 84120de362..d796e4905d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5089,9 +5089,10 @@
proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => 'int4',
- proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn}',
+ proallargtypes =>
+ '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,int8}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,queryid}',
prosrc => 'pg_stat_get_activity' },
{ oid => '3318',
descr => 'statistics: information about progress of backends running maintenance command',
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1cee7db89d..8e3a6ae9ca 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -173,6 +173,7 @@ struct PGPROC
*/
TransactionId procArrayGroupMemberXid;
+ pg_atomic_uint64 queryId; /* current queryid if any */
uint32 wait_event_info; /* proc's wait information */
/* Support for group transaction status update. */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index f104dc4a62..b10c827507 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1737,9 +1737,10 @@ pg_stat_activity| SELECT s.datid,
s.state,
s.backend_xid,
s.backend_xmin,
+ s.queryid,
s.query,
s.backend_type
- FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn)
+ FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, queryid)
LEFT JOIN pg_database d ON ((s.datid = d.oid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1872,7 +1873,7 @@ pg_stat_replication| SELECT s.pid,
w.sync_priority,
w.sync_state,
w.reply_time
- FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn)
+ FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, queryid)
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid,
@@ -1884,7 +1885,7 @@ pg_stat_ssl| SELECT s.pid,
s.ssl_client_dn AS client_dn,
s.ssl_client_serial AS client_serial,
s.ssl_issuer_dn AS issuer_dn
- FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn);
+ FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, queryid);
pg_stat_subscription| SELECT su.oid AS subid,
su.subname,
st.pid,