Hi I am sending a prototype with basic implementation with explain of running query specified by pid.
It can show more than the execution plan. There is a examples of full query text and running completion tag. This patch is in early stage - I know, so there is one race condition. I hoped so I can use new shm_mq API, but it is not prepared for usage where receiver and sender are mutable. How it works: postgres=# select pg_cmdstatus(pid,1) from pg_stat_activity where pid <> pg_backend_pid(); pg_cmdstatus ------------------------------------------------------------------------------- Query Text: select * from pg_class, pg_attribute limit 4000000; Limit (cost=0.00..8795.58 rows=697380 width=403) -> Nested Loop (cost=0.00..8795.58 rows=697380 width=403) -> Seq Scan on pg_attribute (cost=0.00..66.64 rows=2364 width=203) -> Materialize (cost=0.00..12.42 rows=295 width=200) -> Seq Scan on pg_class (cost=0.00..10.95 rows=295 width=200) (6 rows) postgres=# select pg_cmdstatus(pid,2) from pg_stat_activity where pid <> pg_backend_pid(); pg_cmdstatus ----------------------------------------------------- select * from pg_class, pg_attribute limit 4000000; (1 row) postgres=# select pg_cmdstatus(pid,3) from pg_stat_activity where pid <> pg_backend_pid(); pg_cmdstatus --------------- SELECT 144427 (1 row) postgres=# select pg_cmdstatus(pid,3) from pg_stat_activity where pid <> pg_backend_pid(); pg_cmdstatus --------------- SELECT 209742 (1 row) postgres=# select pg_cmdstatus(pid,3) from pg_stat_activity where pid <> pg_backend_pid(); pg_cmdstatus --------------- SELECT 288472 (1 row) In future a function can be replaced by statement EXPLAIN pid WITH autocomplete - It can show a subset of EXPLAIN ANALYZE -- but it needs a some parametrization of executor environment. First discuss to this topic was year ago http://www.postgresql.org/message-id/cafj8pra-duzkmdtu52ciugb0p7tvri_b8ltjmjfwcnr1lpt...@mail.gmail.com http://www.postgresql.org/message-id/CAFj8pRDEo24joEg4UFRDYeFADFTw-jw_=t=kPwOyDW=v=g1...@mail.gmail.com Regards Pavel
commit 8dfcd8daee84cbe60dbf552073f883751274faf4 Author: Pavel Stehule <pavel.steh...@gooddata.com> Date: Fri Sep 5 21:29:06 2014 +0200 vip prototype - explain and status of another PostgreSQL proces diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index 22f116b..d5ec3ae 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -13,7 +13,7 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \ - collationcmds.o constraint.o conversioncmds.o copy.o createas.o \ + cmdstatus.o collationcmds.o constraint.o conversioncmds.o copy.o createas.o \ dbcommands.o define.o discard.o dropcmds.o \ event_trigger.o explain.o extension.o foreigncmds.o functioncmds.o \ indexcmds.o lockcmds.o matview.o operatorcmds.o opclasscmds.o \ diff --git a/src/backend/commands/cmdstatus.c b/src/backend/commands/cmdstatus.c new file mode 100644 index 0000000..f2d0d9c --- /dev/null +++ b/src/backend/commands/cmdstatus.c @@ -0,0 +1,468 @@ +/*------------------------------------------------------------------------- + * + * comment.c + * + * PostgreSQL object comments utility code. + * + * Copyright (c) 1996-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/comment.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" + +#include "access/htup_details.h" +#include "commands/cmdstatus.h" +#include "commands/explain.h" +#include "lib/stringinfo.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "tcop/dest.h" +#include "tcop/pquery.h" +#include "utils/builtins.h" + + +#define CMDINFO_SLOTS 100 +#define BUFFER_SIZE (8 * 1024) + +typedef struct { + bool is_valid; + bool is_done; + int target_pid; + int sender_pid; + int request_type; + int result_code; +} CmdStatusInfoEntry; + +typedef struct { + LWLock *lock; /* protect slots - search/modification */ + CmdStatusInfoEntry *slots; + LWLock *buffer_lock; /* protect buffer handling */ + void *buffer; /* result data */ + Size buffer_size; + int target_pid; + int sender_pid; + bool buffer_is_free; /* buffer is generally available */ + bool buffer_holds_data; /* buffer holds a valid data */ +} CmdStatusInfo; + +static CmdStatusInfo *cmd_status_info = NULL; + +/* + * Prepare explain of query + * + */ +static StringInfo +explain_query(QueryDesc *queryDesc) +{ + ExplainState es; + + ExplainInitState(&es); + es.analyze = false; + es.verbose = false; + es.buffers = false; + es.format = EXPLAIN_FORMAT_TEXT; + + ExplainBeginOutput(&es); + ExplainQueryText(&es, queryDesc); + ExplainPrintPlan(&es, queryDesc); + + ExplainEndOutput(&es); + + /* Remove last line break */ + if (es.str->len > 0 && es.str->data[es.str->len - 1] == '\n') + es.str->data[--es.str->len] = '\0'; + + return es.str; +} + +static CmdStatusInfo * +attach_shmem(CmdStatusInfo *cmd_status_info) +{ + bool found; + + if (cmd_status_info == NULL) + { + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + cmd_status_info = (CmdStatusInfo *) ShmemInitStruct("cmdstatusinfo", + sizeof(CmdStatusInfo), + &found); + if (!found) + { + int i; + + cmd_status_info->lock = LWLockAssign(); + cmd_status_info->slots = ShmemAlloc(CMDINFO_SLOTS * sizeof(CmdStatusInfo)); + + for (i = 0; i < CMDINFO_SLOTS; i++) + { + cmd_status_info->slots[i].is_valid = false; + } + + cmd_status_info->buffer_lock = LWLockAssign(); + cmd_status_info->buffer = ShmemAlloc(BUFFER_SIZE); + cmd_status_info->buffer_is_free = true; + cmd_status_info->buffer_holds_data = true; + } + + LWLockRelease(AddinShmemInitLock); + } + + return cmd_status_info; +} + +/* + * write data to shm buffer - wait for free buffer + * + */ +static void +write_to_shm_buffer(int target_pid, int sender_pid, void *data, Size bytes) +{ + int loop = 0; + + cmd_status_info = attach_shmem(cmd_status_info); + + while (1) + { + LWLockAcquire(cmd_status_info->buffer_lock, LW_EXCLUSIVE); + + if (cmd_status_info->buffer_is_free) + { + cmd_status_info->target_pid = target_pid; + cmd_status_info->sender_pid = sender_pid; + cmd_status_info->buffer_is_free = false; + cmd_status_info->buffer_holds_data = true; + + cmd_status_info->buffer_size = bytes; + memcpy(cmd_status_info->buffer, data, bytes); + + LWLockRelease(cmd_status_info->buffer_lock); + break; + } + else + { + LWLockRelease(cmd_status_info->buffer_lock); + + if (loop++ % 100 == 0); + CHECK_FOR_INTERRUPTS(); + + if (loop > 100000) + elog(ERROR, "cannot to take buffer to send data"); + + pg_usleep(1000L); + } + } +} + +/* + * It read data from shm buffer, waits for data + * + */ +static void * +read_from_shm_buffer(int target_pid, int sender_pid, Size *bytes) +{ + void *result = NULL; + int loop = 0; + + cmd_status_info = attach_shmem(cmd_status_info); + + while (1) + { + LWLockAcquire(cmd_status_info->buffer_lock, LW_EXCLUSIVE); + + if (cmd_status_info->buffer_holds_data && + cmd_status_info->target_pid == target_pid && + cmd_status_info->sender_pid == sender_pid) + { + result = palloc(cmd_status_info->buffer_size); + memcpy(result, cmd_status_info->buffer, cmd_status_info->buffer_size); + *bytes = cmd_status_info->buffer_size; + + cmd_status_info->buffer_is_free = true; + cmd_status_info->buffer_holds_data = false; + + LWLockRelease(cmd_status_info->buffer_lock); + + break; + } + else + { + LWLockRelease(cmd_status_info->buffer_lock); + + if (loop++ % 100 == 0); + CHECK_FOR_INTERRUPTS(); + + pg_usleep(1000L); + } + } + + return result; +} + +/* signal handler for PROCSIG_CMDSTATUS_INFO */ +void +HandleCmdStatusInfoInterrupt(void) +{ + bool found = true; + + cmd_status_info = attach_shmem(cmd_status_info); + + /* search any request for current process */ + while (found) + { + int i; + CmdStatusInfoEntry *csie; + + found = false; + csie = NULL; + + /* take lock for slots */ + LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE); + + /* try to find any request in valid slots */ + for (i = 0; i < CMDINFO_SLOTS; i++) + { + csie = &(cmd_status_info->slots[i]); + + if (csie->is_valid && !csie->is_done && csie->target_pid == MyProcPid) + { + found = true; + break; + } + } + + LWLockRelease(cmd_status_info->lock); + + if (found) + { + /* process request */ + Assert(csie != NULL); + + if (ActivePortal) + { + if (csie->request_type == 1) + { + StringInfo str; + + str = explain_query(ActivePortal->queryDesc); + write_to_shm_buffer(csie->sender_pid, MyProcPid, (void *) str->data, str->len); + + pfree(str->data); + + csie->is_done = true; + csie->result_code = 0; + } + else if (csie->request_type == 2) + { + write_to_shm_buffer(csie->sender_pid, MyProcPid, + (void *) ActivePortal->sourceText, + strlen(ActivePortal->sourceText)); + csie->is_done = true; + csie->result_code = 0; + } + else if (csie->request_type == 3) + { + if (ActivePortal->commandTag != NULL) + { + if (strcmp(ActivePortal->commandTag, "SELECT") == 0) + { + char completationTag[COMPLETION_TAG_BUFSIZE]; + + snprintf(completationTag, COMPLETION_TAG_BUFSIZE, + "SELECT %u", + ActivePortal->queryDesc->estate->es_processed); + write_to_shm_buffer(csie->sender_pid, MyProcPid, + (void *) completationTag, + strlen(completationTag)); + } + else + write_to_shm_buffer(csie->sender_pid, MyProcPid, + (void *) ActivePortal->commandTag, + strlen(ActivePortal->commandTag)); + } + + csie->is_done = true; + csie->result_code = 0; + } + else + { + csie->is_done = true; + csie->result_code = -1; + } + } + else + { + csie->is_done = true; + csie->result_code = -1; + } + } + } +} + + +static CmdStatusInfoEntry * +NewCmdStatusInfoEntry(int target_pid, int request_type) +{ + bool found = false; + CmdStatusInfoEntry *csie = NULL; + int i; + + cmd_status_info = attach_shmem(cmd_status_info); + + /* find unused slot */ + LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE); + + for (i = 0; i < CMDINFO_SLOTS; i++) + { + csie = &(cmd_status_info->slots[i]); + + if (!csie->is_valid) + { + found = true; + break; + } + } + + LWLockRelease(cmd_status_info->lock); + + if (!found) + elog(ERROR, "there are not free slots for cmdstatusinfo now"); + + csie->is_valid = true; + csie->is_done = false; + csie->target_pid = target_pid; + csie->sender_pid = MyProcPid; + csie->request_type = request_type; + csie->result_code = 0; + + return csie; +} + +/* + * try to get status of command in another process + * + * FUNCTION get_cmdstatus(pid, request_type) + * RETURNS SETOF text + */ +Datum +pg_cmdstatus(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Tuplestorestate *tupstore; + TupleDesc tupdesc; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + int target_pid = PG_GETARG_INT32(0); + int request_type = PG_GETARG_INT32(1); + PGPROC *proc; + CmdStatusInfoEntry *csie; + + Size len; + void *data = NULL; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* need to build tuplestore in query context */ + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc); + tupstore = tuplestore_begin_heap(false, false, work_mem); + MemoryContextSwitchTo(oldcontext); + + csie = NewCmdStatusInfoEntry(target_pid, request_type); + + PG_TRY(); + { + /* verify access to target_pid */ + proc = BackendPidGetProc(target_pid); + + if (proc == NULL) + ereport(ERROR, + (errmsg("PID %d is not a PostgreSQL server process", target_pid))); + + if (!(superuser() || proc->roleId == GetUserId())) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser or have the same role to cancel queries running in other server processes")))); + + if (SendProcSignal((pid_t) target_pid, PROCSIG_CMDSTATUS_INFO, InvalidBackendId) < 0) + elog(ERROR, "could not signal backend with PID %d", target_pid); + + while (1) + { + data = read_from_shm_buffer(MyProcPid, target_pid, &len); + if (len > 0) + { + Datum value; + HeapTuple tuple; + bool isnull = false; + Size processed = 0; + char *cursor = data; + + /* parse to rows */ + while (processed < len) + { + char *eol = strchr(cursor, '\n'); + + if (eol != NULL) + { + int line_size = eol - cursor; + + value = PointerGetDatum(cstring_to_text_with_len(cursor, line_size)); + cursor += line_size + 1; + processed += line_size + 1; + } + else + { + /* last line */ + value = PointerGetDatum(cstring_to_text_with_len(cursor, len - processed)); + processed = len; + } + + tuple = heap_form_tuple(tupdesc, &value, &isnull); + tuplestore_puttuple(tupstore, tuple); + } + + pfree(data); + } + + if (csie->is_done) + break; + } + + csie->is_valid = false; + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + } + PG_CATCH(); + { + csie->is_valid = false; + PG_RE_THROW(); + } + PG_END_TRY(); + + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + return (Datum) 0; +} diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index cd9a287..f38be3d 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -18,6 +18,7 @@ #include <unistd.h> #include "commands/async.h" +#include "commands/cmdstatus.h" #include "miscadmin.h" #include "storage/latch.h" #include "storage/ipc.h" @@ -292,6 +293,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_CMDSTATUS_INFO)) + HandleCmdStatusInfoInterrupt(); + if (set_latch_on_sigusr1 && MyProc != NULL) SetLatch(&MyProc->procLatch); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 5176ed0..af731be 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2689,6 +2689,8 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f DESCR("statistics: information about currently active backends"); DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); +DATA(insert OID = 4055 ( pg_cmdstatus PGNSP PGUID 12 1 100 0 0 f f f f f t s 2 0 25 "23 23" _null_ _null_ _null_ _null_ pg_cmdstatus _null_ _null_ _null_ )); +DESCR("returns information about another process"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); DATA(insert OID = 1937 ( pg_stat_get_backend_pid PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 23 "23" _null_ _null_ _null_ _null_ pg_stat_get_backend_pid _null_ _null_ _null_ )); diff --git a/src/include/commands/cmdstatus.h b/src/include/commands/cmdstatus.h new file mode 100644 index 0000000..6a4e5af --- /dev/null +++ b/src/include/commands/cmdstatus.h @@ -0,0 +1,19 @@ +/*------------------------------------------------------------------------- + * + * cmdstatus.h + * + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/commands/cmdstatus.h + * + *------------------------------------------------------------------------- + */ +#ifndef CMDSTATUS_H +#define CMDSTATUS_H + +extern void HandleCmdStatusInfoInterrupt(void); + +#endif /* CMDSTATUS_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index c625562..b3a60e7 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -40,6 +40,9 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + /* cmd status info */ + PROCSIG_CMDSTATUS_INFO, + NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 78cc0a0..687e77e 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1194,6 +1194,9 @@ extern Datum pg_get_multixact_members(PG_FUNCTION_ARGS); extern Datum pg_describe_object(PG_FUNCTION_ARGS); extern Datum pg_identify_object(PG_FUNCTION_ARGS); +/* commands/cmdstatus.c */ +extern Datum pg_cmdstatus(PG_FUNCTION_ARGS); + /* commands/constraint.c */ extern Datum unique_key_recheck(PG_FUNCTION_ARGS);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers