Hi

I am sending a prototype with a basic implementation of explain for a running
query specified by pid.

It can show more than the execution plan. There are examples of the full query
text and the running completion tag.

This patch is at an early stage - I know there is one race condition.

I hoped I could use the new shm_mq API, but it is not prepared for usage where
the receiver and sender are mutable.

How it works:

postgres=# select pg_cmdstatus(pid,1) from pg_stat_activity where pid <>
pg_backend_pid();

pg_cmdstatus
-------------------------------------------------------------------------------
 Query Text: select * from pg_class, pg_attribute limit 4000000;
 Limit  (cost=0.00..8795.58 rows=697380 width=403)
   ->  Nested Loop  (cost=0.00..8795.58 rows=697380 width=403)
         ->  Seq Scan on pg_attribute  (cost=0.00..66.64 rows=2364
width=203)
         ->  Materialize  (cost=0.00..12.42 rows=295 width=200)
               ->  Seq Scan on pg_class  (cost=0.00..10.95 rows=295
width=200)
(6 rows)

postgres=# select pg_cmdstatus(pid,2) from pg_stat_activity where pid <>
pg_backend_pid();
                    pg_cmdstatus
-----------------------------------------------------
 select * from pg_class, pg_attribute limit 4000000;
(1 row)

postgres=# select pg_cmdstatus(pid,3) from pg_stat_activity where pid <>
pg_backend_pid();
 pg_cmdstatus
---------------
 SELECT 144427
(1 row)

postgres=# select pg_cmdstatus(pid,3) from pg_stat_activity where pid <>
pg_backend_pid();
 pg_cmdstatus
---------------
 SELECT 209742
(1 row)

postgres=# select pg_cmdstatus(pid,3) from pg_stat_activity where pid <>
pg_backend_pid();
 pg_cmdstatus
---------------
 SELECT 288472
(1 row)

In the future the function can be replaced by a statement EXPLAIN pid WITH
autocomplete - it can show a subset of EXPLAIN ANALYZE -- but it needs
some parametrization of the executor environment.

The first discussion of this topic was a year ago

http://www.postgresql.org/message-id/cafj8pra-duzkmdtu52ciugb0p7tvri_b8ltjmjfwcnr1lpt...@mail.gmail.com

http://www.postgresql.org/message-id/CAFj8pRDEo24joEg4UFRDYeFADFTw-jw_=t=kPwOyDW=v=g1...@mail.gmail.com

Regards

Pavel
commit 8dfcd8daee84cbe60dbf552073f883751274faf4
Author: Pavel Stehule <pavel.steh...@gooddata.com>
Date:   Fri Sep 5 21:29:06 2014 +0200

    wip prototype - explain and status of another PostgreSQL process

diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 22f116b..d5ec3ae 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o  \
-	collationcmds.o constraint.o conversioncmds.o copy.o createas.o \
+	cmdstatus.o collationcmds.o constraint.o conversioncmds.o copy.o createas.o \
 	dbcommands.o define.o discard.o dropcmds.o \
 	event_trigger.o explain.o extension.o foreigncmds.o functioncmds.o \
 	indexcmds.o lockcmds.o matview.o operatorcmds.o opclasscmds.o \
diff --git a/src/backend/commands/cmdstatus.c b/src/backend/commands/cmdstatus.c
new file mode 100644
index 0000000..f2d0d9c
--- /dev/null
+++ b/src/backend/commands/cmdstatus.c
@@ -0,0 +1,468 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.c
+ *
+ * Functions for obtaining the status of a command running in another backend.
+ *
+ * Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/cmdstatus.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/htup_details.h"
+#include "commands/cmdstatus.h"
+#include "commands/explain.h"
+#include "lib/stringinfo.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "tcop/dest.h"
+#include "tcop/pquery.h"
+#include "utils/builtins.h"
+
+
+#define CMDINFO_SLOTS		100
+#define BUFFER_SIZE		(8 * 1024)
+
+typedef struct {
+    bool is_valid;		/* slot is claimed by a requester */
+    bool is_done;		/* target backend finished processing the request */
+    int target_pid;		/* PID of the backend being asked */
+    int sender_pid;		/* PID of the requesting backend */
+    int request_type;		/* 1 = explain, 2 = query text, 3 = completion tag */
+    int result_code;		/* 0 on success, -1 when the request could not be served */
+} CmdStatusInfoEntry;
+
+typedef struct {
+	LWLock		*lock;			/* protect slots - search/modification */
+	CmdStatusInfoEntry *slots;		/* array of CMDINFO_SLOTS request slots */
+	LWLock		*buffer_lock;		/* protect buffer handling */
+	void		*buffer;		/* result data */
+	Size		buffer_size;		/* number of valid bytes in buffer */
+	int		target_pid;		/* PID of the backend that should read the buffer */
+	int		sender_pid;		/* PID of the backend that filled the buffer */
+	bool		buffer_is_free;		/* buffer is generally available */
+	bool		buffer_holds_data;	/* buffer holds valid data */
+} CmdStatusInfo;
+
+static CmdStatusInfo *cmd_status_info = NULL;
+
+/*
+ * Build the textual EXPLAIN output (query text followed by the plan) for
+ * queryDesc and return it without the trailing line break.
+ */
+static StringInfo
+explain_query(QueryDesc *queryDesc)
+{
+	ExplainState state;
+	StringInfo	output;
+
+	/* plain text plan, no ANALYZE / VERBOSE / BUFFERS details */
+	ExplainInitState(&state);
+	state.format = EXPLAIN_FORMAT_TEXT;
+	state.buffers = false;
+	state.verbose = false;
+	state.analyze = false;
+
+	ExplainBeginOutput(&state);
+	ExplainQueryText(&state, queryDesc);
+	ExplainPrintPlan(&state, queryDesc);
+	ExplainEndOutput(&state);
+	/* chop the final newline, if any */
+	output = state.str;
+	if (output->len > 0 && output->data[output->len - 1] == '\n')
+		output->data[--output->len] = '\0';
+
+	return output;
+}
+
+/*
+ * Return the shared CmdStatusInfo, creating and initializing it on first use.
+ * A non-NULL argument is returned unchanged.
+ */
+static CmdStatusInfo *
+attach_shmem(CmdStatusInfo *cmd_status_info)
+{
+	bool	found;
+
+	if (cmd_status_info == NULL)
+	{
+		LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+
+		cmd_status_info = (CmdStatusInfo *) ShmemInitStruct("cmdstatusinfo",
+									sizeof(CmdStatusInfo),
+									&found);
+		if (!found)
+		{
+			int	i;
+
+			cmd_status_info->lock = LWLockAssign();
+			/* slots are CmdStatusInfoEntry items, not CmdStatusInfo */
+			cmd_status_info->slots = ShmemAlloc(CMDINFO_SLOTS * sizeof(CmdStatusInfoEntry));
+			for (i = 0; i < CMDINFO_SLOTS; i++)
+				cmd_status_info->slots[i].is_valid = false;
+
+			cmd_status_info->buffer_lock = LWLockAssign();
+			cmd_status_info->buffer = ShmemAlloc(BUFFER_SIZE);
+			cmd_status_info->buffer_is_free = true;
+			cmd_status_info->buffer_holds_data = false;	/* nothing written yet */
+		}
+		LWLockRelease(AddinShmemInitLock);
+	}
+	return cmd_status_info;
+}
+
+/*
+ * Copy "bytes" bytes of "data" into the shared buffer, addressed to target_pid
+ * and tagged with sender_pid.  Waits for the buffer to become free; data
+ * longer than BUFFER_SIZE is truncated so the shared buffer is never overrun.
+ */
+static void
+write_to_shm_buffer(int target_pid, int sender_pid, void *data, Size bytes)
+{
+	int	loop = 0;
+
+	cmd_status_info = attach_shmem(cmd_status_info);
+
+	if (bytes > BUFFER_SIZE)
+		bytes = BUFFER_SIZE;
+
+	while (1)
+	{
+		LWLockAcquire(cmd_status_info->buffer_lock, LW_EXCLUSIVE);
+
+		if (cmd_status_info->buffer_is_free)
+		{
+			cmd_status_info->target_pid = target_pid;
+			cmd_status_info->sender_pid = sender_pid;
+			cmd_status_info->buffer_is_free = false;
+			cmd_status_info->buffer_holds_data = true;
+			cmd_status_info->buffer_size = bytes;
+			memcpy(cmd_status_info->buffer, data, bytes);
+			LWLockRelease(cmd_status_info->buffer_lock);
+			break;
+		}
+
+		LWLockRelease(cmd_status_info->buffer_lock);
+		/* poll for interrupts on every 100th retry only */
+		if (loop++ % 100 == 0)
+			CHECK_FOR_INTERRUPTS();
+
+		if (loop > 100000)
+			elog(ERROR, "cannot to take buffer to send data");
+
+		pg_usleep(1000L);
+	}
+}
+
+/*
+ * Read data from the shm buffer addressed to target_pid by sender_pid; waits
+ * until such data is available.  Returns a palloc'd copy and sets *bytes.
+ */
+static void *
+read_from_shm_buffer(int target_pid, int sender_pid, Size *bytes)
+{
+	void *result = NULL;
+	int	loop = 0;
+
+	cmd_status_info = attach_shmem(cmd_status_info);
+
+	while (1)
+	{
+		LWLockAcquire(cmd_status_info->buffer_lock, LW_EXCLUSIVE);
+
+		if (cmd_status_info->buffer_holds_data &&
+				cmd_status_info->target_pid == target_pid &&
+				cmd_status_info->sender_pid == sender_pid)
+		{
+			result = palloc(cmd_status_info->buffer_size);
+			memcpy(result, cmd_status_info->buffer, cmd_status_info->buffer_size);
+			*bytes = cmd_status_info->buffer_size;
+
+			cmd_status_info->buffer_is_free = true;
+			cmd_status_info->buffer_holds_data = false;
+
+			LWLockRelease(cmd_status_info->buffer_lock);
+
+			break;
+		}
+		else
+		{
+			LWLockRelease(cmd_status_info->buffer_lock);
+			/* poll for interrupts on every 100th retry only */
+			if (loop++ % 100 == 0)
+				CHECK_FOR_INTERRUPTS();
+
+			pg_usleep(1000L);
+		}
+	}
+
+	return result;
+}
+
+/*
+ * Signal handler for PROCSIG_CMDSTATUS_INFO.
+ *
+ * Serves every pending request slot addressed to this backend:
+ *	request_type 1 - explain of the active portal's query
+ *	request_type 2 - source text of the active portal
+ *	request_type 3 - command tag ("SELECT n" with rows processed so far)
+ *
+ * A slot is marked done before the reply is written, and a reply (possibly
+ * empty) is always written, so the requester never waits for data that will
+ * not come.
+ */
+void
+HandleCmdStatusInfoInterrupt(void)
+{
+	bool	found = true;
+
+	cmd_status_info = attach_shmem(cmd_status_info);
+
+	/* search any request for current process */
+	while (found)
+	{
+		int		i;
+		CmdStatusInfoEntry *csie;
+
+		found = false;
+		csie = NULL;
+
+		/* take lock for slots */
+		LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE);
+
+		/* try to find any request in valid slots */
+		for (i = 0; i < CMDINFO_SLOTS; i++)
+		{
+			csie = &(cmd_status_info->slots[i]);
+
+			if (csie->is_valid && !csie->is_done && csie->target_pid == MyProcPid)
+			{
+				found = true;
+				break;
+			}
+		}
+
+		LWLockRelease(cmd_status_info->lock);
+
+		if (found)
+		{
+			StringInfo	str = NULL;		/* explain output, freed below */
+			const char *reply = NULL;	/* NULL means the request failed */
+			char		tagbuf[COMPLETION_TAG_BUFSIZE];
+
+			/* process request */
+			Assert(csie != NULL);
+
+			if (ActivePortal && csie->request_type == 1)
+			{
+				str = explain_query(ActivePortal->queryDesc);
+				reply = str->data;
+			}
+			else if (ActivePortal && csie->request_type == 2)
+			{
+				reply = ActivePortal->sourceText;
+			}
+			else if (ActivePortal && csie->request_type == 3)
+			{
+				/* a missing command tag yields an empty, successful reply */
+				if (ActivePortal->commandTag == NULL)
+				{
+					reply = "";
+				}
+				else if (strcmp(ActivePortal->commandTag, "SELECT") == 0)
+				{
+					snprintf(tagbuf, COMPLETION_TAG_BUFSIZE,
+							 "SELECT %u",
+							 ActivePortal->queryDesc->estate->es_processed);
+					reply = tagbuf;
+				}
+				else
+				{
+					reply = ActivePortal->commandTag;
+				}
+			}
+
+			/* publish the outcome first; the requester checks it after reading */
+			csie->result_code = (reply != NULL) ? 0 : -1;
+			csie->is_done = true;
+
+			/* no portal or unknown request type: send an empty reply */
+			if (reply == NULL)
+				reply = "";
+
+			write_to_shm_buffer(csie->sender_pid, MyProcPid,
+								(void *) reply, strlen(reply));
+
+			if (str != NULL)
+				pfree(str->data);
+		}
+	}
+}
+
+
+/*
+ * Claim a free request slot for target_pid / request_type.  The slot is filled
+ * while the slots lock is held so two backends cannot grab the same slot.
+ */
+static CmdStatusInfoEntry *
+NewCmdStatusInfoEntry(int target_pid, int request_type)
+{
+	bool	found = false;
+	CmdStatusInfoEntry *csie = NULL;
+	int			i;
+
+	cmd_status_info = attach_shmem(cmd_status_info);
+
+	/* find an unused slot and claim it under the lock */
+	LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE);
+
+	for (i = 0; i < CMDINFO_SLOTS; i++)
+	{
+		csie = &(cmd_status_info->slots[i]);
+		if (!csie->is_valid)
+		{
+			csie->is_done = false;
+			csie->target_pid = target_pid;
+			csie->sender_pid = MyProcPid;
+			csie->request_type = request_type;
+			csie->result_code = 0;
+			csie->is_valid = true;
+			found = true;
+			break;
+		}
+	}
+
+	LWLockRelease(cmd_status_info->lock);
+	if (!found)
+		elog(ERROR, "there are not free slots for cmdstatusinfo now");
+	return csie;
+}
+
+/*
+ * try to get status of command in another process
+ *
+ * FUNCTION pg_cmdstatus(pid, request_type)
+ * RETURNS SETOF text
+ */
+Datum
+pg_cmdstatus(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Tuplestorestate *tupstore;
+	TupleDesc	tupdesc;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	int target_pid				= PG_GETARG_INT32(0);
+	int request_type 			= PG_GETARG_INT32(1);
+	PGPROC	   *proc;
+	CmdStatusInfoEntry *csie;
+
+	Size			len;
+	void			*data = NULL;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/* need to build tuplestore in query context */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	tupstore = tuplestore_begin_heap(false, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	csie = NewCmdStatusInfoEntry(target_pid, request_type);
+
+	PG_TRY();
+	{
+		/* verify access to target_pid */
+		proc = BackendPidGetProc(target_pid);
+
+		if (proc == NULL)
+			ereport(ERROR,
+					(errmsg("PID %d is not a PostgreSQL server process", target_pid)));
+
+		if (!(superuser() || proc->roleId == GetUserId()))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 (errmsg("must be superuser or have the same role to get status of queries running in other server processes"))));
+
+		if (SendProcSignal((pid_t) target_pid, PROCSIG_CMDSTATUS_INFO, InvalidBackendId) < 0)
+			elog(ERROR, "could not signal backend with PID %d", target_pid);
+
+		while (1)
+		{
+			data = read_from_shm_buffer(MyProcPid, target_pid, &len);
+			if (len > 0)
+			{
+				Datum		value;
+				HeapTuple	tuple;
+				bool		isnull = false;
+				Size			processed = 0;
+				char			*cursor = data;
+
+				/* parse to rows; data is not NUL-terminated, so search with memchr */
+				while (processed < len)
+				{
+					char *eol = memchr(cursor, '\n', len - processed);
+
+					if (eol != NULL)
+					{
+						int	line_size = eol - cursor;
+
+						value = PointerGetDatum(cstring_to_text_with_len(cursor, line_size));
+						cursor += line_size + 1;
+						processed += line_size + 1;
+					}
+					else
+					{
+						/* last line */
+						value = PointerGetDatum(cstring_to_text_with_len(cursor, len - processed));
+						processed = len;
+					}
+
+					tuple = heap_form_tuple(tupdesc, &value, &isnull);
+					tuplestore_puttuple(tupstore, tuple);
+				}
+
+				pfree(data);
+			}
+
+			if (csie->is_done)
+				break;
+		}
+
+		csie->is_valid = false;
+
+		/* clean up and return the tuplestore */
+		tuplestore_donestoring(tupstore);
+	}
+	PG_CATCH();
+	{
+		csie->is_valid = false;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index cd9a287..f38be3d 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -18,6 +18,7 @@
 #include <unistd.h>
 
 #include "commands/async.h"
+#include "commands/cmdstatus.h"
 #include "miscadmin.h"
 #include "storage/latch.h"
 #include "storage/ipc.h"
@@ -292,6 +293,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+	if (CheckProcSignal(PROCSIG_CMDSTATUS_INFO))
+		HandleCmdStatusInfoInterrupt();
+
 	if (set_latch_on_sigusr1 && MyProc != NULL)
 		SetLatch(&MyProc->procLatch);
 
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 5176ed0..af731be 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2689,6 +2689,8 @@ DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f
 DESCR("statistics: information about currently active backends");
 DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
+DATA(insert OID = 4055 (  pg_cmdstatus		PGNSP PGUID 12 1 100 0 0 f f f f f t s 2 0 25 "23 23" _null_ _null_  _null_  _null_ pg_cmdstatus _null_ _null_ _null_ ));
+DESCR("returns information about another process");
 DATA(insert OID = 2026 (  pg_backend_pid				PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
 DESCR("statistics: current backend PID");
 DATA(insert OID = 1937 (  pg_stat_get_backend_pid		PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 23 "23" _null_ _null_ _null_ _null_ pg_stat_get_backend_pid _null_ _null_ _null_ ));
diff --git a/src/include/commands/cmdstatus.h b/src/include/commands/cmdstatus.h
new file mode 100644
index 0000000..6a4e5af
--- /dev/null
+++ b/src/include/commands/cmdstatus.h
@@ -0,0 +1,19 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.h
+ *
+ *
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/cmdstatus.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CMDSTATUS_H
+#define CMDSTATUS_H
+
+extern void HandleCmdStatusInfoInterrupt(void);
+
+#endif   /* CMDSTATUS_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index c625562..b3a60e7 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -40,6 +40,9 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
+	/* cmd status info */
+	PROCSIG_CMDSTATUS_INFO,
+
 	NUM_PROCSIGNALS				/* Must be last! */
 } ProcSignalReason;
 
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 78cc0a0..687e77e 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1194,6 +1194,9 @@ extern Datum pg_get_multixact_members(PG_FUNCTION_ARGS);
 extern Datum pg_describe_object(PG_FUNCTION_ARGS);
 extern Datum pg_identify_object(PG_FUNCTION_ARGS);
 
+/* commands/cmdstatus.c */
+extern Datum pg_cmdstatus(PG_FUNCTION_ARGS);
+
 /* commands/constraint.c */
 extern Datum unique_key_recheck(PG_FUNCTION_ARGS);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to