On Wed, Sep 2, 2015 at 3:07 PM, Shulgin, Oleksandr < oleksandr.shul...@zalando.de> wrote:
> On Wed, Sep 2, 2015 at 3:04 PM, Pavel Stehule <pavel.steh...@gmail.com> > wrote: >> >> >>> Well, maybe I'm missing something, but shm_mq_create() will just >>> overwrite the contents of the struct, so it doesn't care about >>> sender/receiver: only shm_mq_set_sender/receiver() do. >>> >> >> If you create the shm_mq from scratch, then you can reuse the structure. >> > Please find attached a v3. It uses a shared memory queue and also has the ability to capture plans nested deeply in the call stack. I am not sure about using the executor hook, since this is not an extension... The LWLock is used around initializing/cleaning the shared struct and the message queue; the IO synchronization is handled by the message queue itself. After some testing with concurrent pgbench and intentionally deep recursive plpgsql functions (up to 700 plpgsql stack frames), I think this approach can work, unless there's some theoretical problem I'm just not aware of. :-) Comments welcome! -- Alex
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 0abde43..40db40d 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -26,6 +26,7 @@
 #include "storage/shmem.h"
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
+#include "utils/cmdstatus.h"
 
 
 /*
@@ -296,6 +297,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+	if (CheckProcSignal(PROCSIG_CMDSTATUS_INFO))
+		HandleCmdStatusInfoInterrupt();
+
 	if (set_latch_on_sigusr1)
 		SetLatch(MyLatch);
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ce4bdaf..5d5df58 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -67,6 +67,7 @@
 #include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
 #include "tcop/utility.h"
+#include "utils/cmdstatus.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -2991,6 +2992,9 @@ ProcessInterrupts(void)
 
 	if (ParallelMessagePending)
 		HandleParallelMessages();
+
+	if (CmdStatusInfoRequested)
+		ProcessCmdStatusInfoRequest();
 }
 
 
diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile
index 3ed0b44..2c8687c 100644
--- a/src/backend/utils/adt/Makefile
+++ b/src/backend/utils/adt/Makefile
@@ -18,7 +18,7 @@ endif
 # keep this list arranged alphabetically or it gets to be a mess
 OBJS = acl.o arrayfuncs.o array_expanded.o array_selfuncs.o \
 	array_typanalyze.o array_userfuncs.o arrayutils.o ascii.o \
-	bool.o cash.o char.o date.o datetime.o datum.o dbsize.o domains.o \
+	bool.o cash.o char.o cmdstatus.o date.o datetime.o datum.o dbsize.o domains.o \
 	encode.o enum.o expandeddatum.o \
 	float.o format_type.o formatting.o genfile.o \
 	geo_ops.o geo_selfuncs.o inet_cidr_ntop.o inet_net_pton.o int.o \
diff --git a/src/backend/utils/adt/cmdstatus.c b/src/backend/utils/adt/cmdstatus.c
new file mode 100644
index 0000000..5f31a2d
--- /dev/null
+++ b/src/backend/utils/adt/cmdstatus.c
@@ -0,0 +1,653 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.c
+ *	  Definitions for pg_cmdstatus function.
+ *
+ * Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/utils/adt/cmdstatus.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/htup_details.h"
+#include "commands/explain.h"
+#include "lib/stringinfo.h"
+#include "storage/latch.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shm_mq.h"
+#include "tcop/dest.h"
+#include "tcop/pquery.h"
+#include "utils/builtins.h"
+#include "utils/cmdstatus.h"
+
+
+/* Kinds of information pg_cmdstatus() can request from another backend. */
+typedef enum {
+	CMD_STATUS_REQUEST_EXPLAIN = 1,
+	CMD_STATUS_REQUEST_QUERY_TEXT = 2,
+	CMD_STATUS_REQUEST_PROGRESS_TAG = 3,
+	CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE = 4
+} CmdStatusInfoRequestType;
+
+#define CMD_STATUS_MAX_REQUEST CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE
+
+/* Outcome of a request, stored in the shared struct for the requester. */
+typedef enum {
+	CMD_STATUS_RESULT_FAILURE = -1,
+	CMD_STATUS_RESULT_SUCCESS = 0,
+	CMD_STATUS_RESULT_BACKEND_IDLE,
+	CMD_STATUS_RESULT_NO_COMMAND_TAG
+} CmdStatusInfoResultCode;
+
+/*
+ * The single request slot kept in shared memory.  target_pid == 0 means the
+ * slot is free.  The trailing buffer holds the shm_mq carrying the reply.
+ */
+typedef struct {
+	LWLock *lock;
+	pid_t target_pid;
+	pid_t sender_pid;
+	CmdStatusInfoRequestType request_type;
+	CmdStatusInfoResultCode result_code;
+	char buffer[FLEXIBLE_ARRAY_MEMBER];
+} CmdStatusInfo;
+
+/* Size of the whole shared allocation: struct header plus message queue. */
+#define BUFFER_SIZE 8192
+
+/*
+ * These structs are allocated on the program stack as local variables in the
+ * ExecutorRun hook.  The top of stack is current_query_stack, see below.
+ */
+typedef struct CmdInfoStack {
+	QueryDesc *query_desc;
+	struct CmdInfoStack *parent;
+} CmdInfoStack;
+
+
+/*
+ * Both flags are examined by HandleCmdStatusInfoInterrupt(), which runs
+ * inside the SIGUSR1 handler, hence volatile.
+ */
+volatile bool CmdStatusInfoEnabled = true;
+volatile bool CmdStatusInfoRequested = false;
+
+static CmdStatusInfo *cmd_status_info = NULL;
+static CmdInfoStack *current_query_stack = NULL;
+static int query_stack_size = 0;
+
+static ExecutorRun_hook_type prev_ExecutorRun = NULL;
+
+static void attach_shmem(void);
+static void cmdstatus_ExecutorRun(QueryDesc *queryDesc,
+								  ScanDirection direction, long count);
+
+
+/*
+ * Chain cmdstatus_ExecutorRun in front of any previously installed
+ * ExecutorRun hook.  Called from InitPostgres().
+ */
+void
+InstallCmdStatusInfoHooks(void)
+{
+	/*elog(LOG, "InstallCmdStatusInfoHooks");*/
+
+	prev_ExecutorRun = ExecutorRun_hook;
+	ExecutorRun_hook = cmdstatus_ExecutorRun;
+}
+
+
+/*
+ * Look up (creating and initializing on first use) the shared request slot
+ * and remember its address in cmd_status_info.  No-op once attached.
+ */
+static void
+attach_shmem(void)
+{
+	bool found;
+
+	Assert(sizeof(CmdStatusInfo) < BUFFER_SIZE);
+
+	if (cmd_status_info == NULL)
+	{
+		LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+
+		cmd_status_info = (CmdStatusInfo *) ShmemInitStruct("cmdstatusinfo",
+															BUFFER_SIZE, &found);
+		if (!found)
+		{
+			cmd_status_info->lock = LWLockAssign();
+
+			cmd_status_info->target_pid = 0;
+			cmd_status_info->sender_pid = 0;
+			cmd_status_info->request_type = 0;
+			cmd_status_info->result_code = 0;
+		}
+
+		LWLockRelease(AddinShmemInitLock);
+	}
+}
+
+
+/*
+ * ExecutorRun hook: push the running QueryDesc onto current_query_stack for
+ * the duration of the call, so nested queries can be reported as a backtrace.
+ * The entry is popped again on both normal and error exit.
+ */
+static void
+cmdstatus_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+{
+	CmdInfoStack current;
+
+	/*ereport(LOG,
+			(errmsg("ExecutorRun : %p\t%d", queryDesc, query_stack_size),
+			 errhidecontext(true),
+			 errhidestmt(true)));*/
+
+	current.query_desc = queryDesc;
+	current.parent = current_query_stack;
+
+	current_query_stack = &current;
+	query_stack_size++;
+
+	PG_TRY();
+	{
+		if (prev_ExecutorRun)
+			prev_ExecutorRun(queryDesc, direction, count);
+		else
+			standard_ExecutorRun(queryDesc, direction, count);
+
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;
+		current_query_stack = current.parent;
+	}
+	PG_CATCH();
+	{
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;
+		current_query_stack = current.parent;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+
+/*
+ * Build text-format EXPLAIN output (query text followed by the plan, no
+ * ANALYZE/VERBOSE/BUFFERS) for queryDesc.  Returns the ExplainState's
+ * StringInfo with the trailing newline removed.
+ */
+static StringInfo
+explain_query(QueryDesc *queryDesc)
+{
+	ExplainState *es;
+
+	es = NewExplainState();
+	es->analyze = false;
+	es->verbose = false;
+	es->buffers = false;
+	es->format = EXPLAIN_FORMAT_TEXT;
+
+	ExplainBeginOutput(es);
+	/* XXX: appendStringInfo(es->str, "#%d ", depth); ? */
+	ExplainQueryText(es, queryDesc);
+	ExplainPrintPlan(es, queryDesc);
+	ExplainEndOutput(es);
+
+	/* Remove last line break. */
+	if (es->str->len > 0 && es->str->data[es->str->len - 1] == '\n')
+		es->str->data[--es->str->len] = '\0';
+
+	return es->str;
+}
+
+/*
+ * Send str through the queue, one message per line (newlines are not sent).
+ * Stops early if a send does not succeed.
+ */
+static void
+send_explain_plan(shm_mq_handle *shmq, const char *str)
+{
+	/* Break plan into lines and send each as a separate message. */
+	while (*str)
+	{
+		const char *q = strchr(str, '\n');
+		Size len = q ? q - str : strlen(str);
+
+		/* No point in producing more output once the queue is unusable. */
+		if (shm_mq_send(shmq, len, str, false) != SHM_MQ_SUCCESS)
+			break;
+
+		if (!q)
+			break;
+
+		str += len + 1;
+	}
+}
+
+/* Explain one QueryDesc, send the text to the queue and free it. */
+static void
+explain_one_query_desc(shm_mq_handle *shmq, QueryDesc *query_desc)
+{
+	StringInfo str;
+
+	str = explain_query(query_desc);
+
+	send_explain_plan(shmq, str->data);
+
+	pfree(str->data);
+	pfree(str);
+}
+
+
+/* signal handler for PROCSIG_CMDSTATUS_INFO */
+void
+HandleCmdStatusInfoInterrupt(void)
+{
+	if (CmdStatusInfoEnabled && !CmdStatusInfoRequested)
+	{
+		InterruptPending = true;
+		CmdStatusInfoRequested = true;
+
+		SetLatch(MyLatch);
+	}
+}
+
+/*
+ * Write the reply for the request recorded in cmd_status_info to the queue.
+ * When nothing can be reported, only result_code is updated and no message
+ * is sent.
+ */
+static void
+send_cmd_status_reply(shm_mq_handle *shmq)
+{
+	if (ActivePortal == NULL)
+	{
+		cmd_status_info->result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+		return;
+	}
+
+	switch (cmd_status_info->request_type)
+	{
+		case CMD_STATUS_REQUEST_EXPLAIN:
+			/*
+			 * The portal may have no QueryDesc (the progress-tag case below
+			 * checks for that as well); there is no plan to explain then.
+			 * XXX: a dedicated result code would be more accurate.
+			 */
+			if (ActivePortal->queryDesc != NULL)
+				explain_one_query_desc(shmq, ActivePortal->queryDesc);
+			else
+				cmd_status_info->result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+			break;
+
+		case CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE:
+			{
+				CmdInfoStack *query;
+
+				if (current_query_stack != NULL)
+				{
+					/* innermost query first */
+					for (query = current_query_stack;
+						 query != NULL;
+						 query = query->parent)
+					{
+						explain_one_query_desc(shmq, query->query_desc);
+					}
+				}
+				else
+				{
+					/* XXX */
+					cmd_status_info->result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+				}
+				break;
+			}
+
+		case CMD_STATUS_REQUEST_QUERY_TEXT:
+			/* XXX: how useful is this one? we already have pg_stat_activity */
+			shm_mq_send(shmq, strlen(ActivePortal->sourceText),
+						ActivePortal->sourceText, false);
+			break;
+
+		case CMD_STATUS_REQUEST_PROGRESS_TAG:
+			if (ActivePortal->commandTag != NULL)
+			{
+				if (ActivePortal->queryDesc != NULL &&
+					ActivePortal->queryDesc->estate != NULL)
+				{
+					char completionTag[COMPLETION_TAG_BUFSIZE];
+
+					snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %u",
+							 ActivePortal->commandTag,
+							 ActivePortal->queryDesc->estate->es_processed);
+
+					shm_mq_send(shmq, strlen(completionTag),
+								completionTag, false);
+				}
+				else
+				{
+					/* no progress available, at least show the command tag */
+					shm_mq_send(shmq, strlen(ActivePortal->commandTag),
+								ActivePortal->commandTag, false);
+				}
+			}
+			else
+			{
+				cmd_status_info->result_code = CMD_STATUS_RESULT_NO_COMMAND_TAG;
+			}
+			break;
+	}
+}
+
+/*
+ * Serve a pending command status request.  Called from ProcessInterrupts()
+ * when CmdStatusInfoRequested is set.
+ *
+ * CmdStatusInfoEnabled is cleared while the request is being served so that
+ * the signal handler does not queue another one, and is set again on every
+ * exit path.
+ */
+void
+ProcessCmdStatusInfoRequest(void)
+{
+	/* volatile: assigned inside PG_TRY and examined in PG_CATCH */
+	shm_mq *volatile buffer = NULL;
+	shm_mq_handle *volatile buffer_handle = NULL;
+
+	if (!(CmdStatusInfoEnabled && CmdStatusInfoRequested))
+		return;
+
+	CmdStatusInfoEnabled = false;
+	CmdStatusInfoRequested = false;
+
+	PG_TRY();
+	{
+		pid_t target_pid;
+
+		attach_shmem();
+
+		LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE);
+
+		target_pid = cmd_status_info->target_pid;
+
+		if (target_pid != MyProcPid)
+		{
+			/*
+			 * The request is not addressed to this backend.  Do not "return"
+			 * from here: leaving a PG_TRY block that way skips PG_END_TRY(),
+			 * which restores the saved error handling state, and it would
+			 * also leave CmdStatusInfoEnabled cleared for good.
+			 */
+			LWLockRelease(cmd_status_info->lock);
+
+			if (target_pid == 0)
+				elog(LOG, "target backend PID is not set");
+			else
+				elog(LOG, "target backend PID doesn't match: expected %d, but seen %d",
+					 MyProcPid, target_pid);
+		}
+		else
+		{
+			buffer = (shm_mq *) &cmd_status_info->buffer;
+			shm_mq_set_sender(buffer, MyProc);
+
+			buffer_handle = shm_mq_attach(buffer, NULL, NULL);
+
+			/* Show some optimism. */
+			cmd_status_info->result_code = CMD_STATUS_RESULT_SUCCESS;
+
+			/*
+			 * It should be safe to release the lock now, since the IO
+			 * synchronization is performed on the shared memory queue itself.
+			 */
+			LWLockRelease(cmd_status_info->lock);
+
+			send_cmd_status_reply(buffer_handle);
+
+			shm_mq_detach(buffer);
+		}
+
+		CmdStatusInfoEnabled = true;
+	}
+	PG_CATCH();
+	{
+		if (buffer_handle)
+			shm_mq_detach(buffer);
+
+		CmdStatusInfoEnabled = true;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+/*
+ * Report a non-success result code to the caller of pg_cmdstatus().  Anything
+ * other than the two known soft results raises an ERROR.
+ */
+static void
+report_result_code(CmdStatusInfoResultCode result, pid_t target_pid)
+{
+	switch (result)
+	{
+		case CMD_STATUS_RESULT_BACKEND_IDLE:
+			elog(INFO, "no command is currently running in backend with PID %d",
+				 target_pid);
+			break;
+
+		case CMD_STATUS_RESULT_NO_COMMAND_TAG:
+			elog(WARNING, "no command tag found for the query in backend with PID %d",
+				 target_pid);
+			break;
+
+		default:
+			elog(ERROR, "general command status request failure");
+			break;
+	}
+}
+
+/*
+ * try to get status of command in another process
+ *
+ * FUNCTION pg_cmdstatus(pid, request_type)
+ * RETURNS SETOF text
+ *
+ * Claims the shared request slot, signals the target backend and collects
+ * every message it sends back as one row of the result.
+ */
+Datum
+pg_cmdstatus(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Tuplestorestate *tupstore;
+	TupleDesc tupdesc;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	pid_t target_pid = (pid_t) PG_GETARG_INT32(0);
+	int request_type = PG_GETARG_INT32(1);
+	PGPROC *proc;
+
+	/* volatile: assigned inside PG_TRY and examined in PG_CATCH */
+	shm_mq *volatile buffer = NULL;
+	shm_mq_handle *volatile buffer_handle = NULL;
+	volatile bool slot_claimed = false;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (request_type < 1 || request_type > CMD_STATUS_MAX_REQUEST)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("unknown command status request")));
+
+	/* need to build tuplestore in query context */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	tupstore = tuplestore_begin_heap(false, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		bool done;
+
+		if (target_pid == MyProcPid)
+			ereport(ERROR,
+					(errmsg("backend cannot query command status of itself")));
+
+		/* verify access to target_pid */
+		proc = BackendPidGetProc(target_pid);
+
+		if (proc == NULL)
+			ereport(ERROR,
+					(errmsg("PID %d is not a PostgreSQL server process", target_pid)));
+
+		if (!(superuser() || proc->roleId == GetUserId()))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 (errmsg("must be superuser or have the same role to query command status of other server processes"))));
+
+		attach_shmem();
+
+		/* wait before the shared status struct is free to be used */
+		for (;;)
+		{
+			LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE);
+
+			if (cmd_status_info->target_pid == 0)
+			{
+				cmd_status_info->target_pid = target_pid;
+				cmd_status_info->sender_pid = MyProcPid;
+				cmd_status_info->request_type = request_type;
+				cmd_status_info->result_code = CMD_STATUS_RESULT_FAILURE;
+				slot_claimed = true;
+
+				buffer = shm_mq_create(&cmd_status_info->buffer,
+									   BUFFER_SIZE - offsetof(CmdStatusInfo, buffer));
+
+				shm_mq_set_receiver(buffer, MyProc);
+
+				buffer_handle = shm_mq_attach(buffer, NULL, NULL);
+
+				LWLockRelease(cmd_status_info->lock);
+				break;
+			}
+			else
+			{
+				LWLockRelease(cmd_status_info->lock);
+				pg_usleep(1000L);
+			}
+		}
+
+		if (SendProcSignal(target_pid, PROCSIG_CMDSTATUS_INFO, InvalidBackendId) < 0)
+			elog(ERROR, "could not signal backend with PID %d", target_pid);
+
+		done = false;
+		while (!done)
+		{
+			char *data;
+			Size len;
+			shm_mq_result res;
+
+			res = shm_mq_receive(buffer_handle, &len, (void **) &data,
+								 /* nowait = */ false);
+			switch (res)
+			{
+				case SHM_MQ_SUCCESS:
+					{
+						Datum value;
+						HeapTuple tuple;
+						bool isnull = false;
+
+						value = PointerGetDatum(cstring_to_text_with_len(data, len));
+
+						tuple = heap_form_tuple(tupdesc, &value, &isnull);
+						tuplestore_puttuple(tupstore, tuple);
+						break;
+					}
+
+				case SHM_MQ_DETACHED:
+					if (cmd_status_info->result_code != CMD_STATUS_RESULT_SUCCESS)
+					{
+						report_result_code(cmd_status_info->result_code, target_pid);
+					}
+					done = true;
+					break;
+
+				default:
+					/* must not loop forever when assertions are disabled */
+					elog(ERROR, "unknown shm_mq_result code: %d", (int) res);
+					break;
+			}
+		}
+
+		/* clean up and return the tuplestore */
+		tuplestore_donestoring(tupstore);
+
+		shm_mq_detach(buffer);
+
+		/* clean the status struct for the next user */
+		LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE);
+
+		cmd_status_info->target_pid = 0;
+		cmd_status_info->sender_pid = 0;
+		cmd_status_info->request_type = 0;
+		cmd_status_info->result_code = 0;
+
+		LWLockRelease(cmd_status_info->lock);
+	}
+	PG_CATCH();
+	{
+		if (buffer_handle)
+			shm_mq_detach(buffer);
+
+		/*
+		 * Free the shared slot only if this call claimed it.  An error raised
+		 * earlier (bad PID, insufficient privilege) must not wipe a request
+		 * that another backend currently has in progress.
+		 */
+		if (slot_claimed)
+		{
+			LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE);
+
+			cmd_status_info->target_pid = 0;
+			cmd_status_info->sender_pid = 0;
+			cmd_status_info->request_type = 0;
+			cmd_status_info->result_code = 0;
+
+			LWLockRelease(cmd_status_info->lock);
+		}
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 7b19714..19e7009 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -50,6 +50,7 @@
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
+#include "utils/cmdstatus.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -1017,6 +1018,8 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 	/* initialize client encoding */
 	InitializeClientEncoding();
 
+	InstallCmdStatusInfoHooks();
+
 	/* report this backend in the PgBackendStatus array */
 	if (!bootstrap)
 		pgstat_bestart();
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index ddf7c67..d083aaf 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3130,6 +3130,8 @@ DESCR("get OID of current session's temp schema, if any");
 DATA(insert OID = 2855 ( pg_is_other_temp_schema PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ pg_is_other_temp_schema _null_ _null_ _null_ ));
 DESCR("is schema another session's temp schema?");
+DATA(insert OID = 4099 ( pg_cmdstatus PGNSP PGUID 12 1 100 0 0 f f f f f t s 2 0 25 "23 23" _null_ _null_ _null_ _null_ _null_ pg_cmdstatus _null_ _null_ _null_ ));
+DESCR("returns information about another process");
 
 DATA(insert OID = 2171 ( pg_cancel_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_cancel_backend _null_ _null_ _null_ ));
 DESCR("cancel a server process' current query");
 DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index af1a0cd..ab1698c 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,9 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
+	/* cmd status info */
+	PROCSIG_CMDSTATUS_INFO,
+
 	NUM_PROCSIGNALS /* Must be last! */
 } ProcSignalReason;
 
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index fc1679e..605612e 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1243,6 +1243,9 @@ extern Datum pg_identify_object_as_address(PG_FUNCTION_ARGS);
 /* catalog/objectaddress.c */
 extern Datum pg_get_object_address(PG_FUNCTION_ARGS);
 
+/* utils/adt/cmdstatus.c */
+extern Datum pg_cmdstatus(PG_FUNCTION_ARGS);
+
 /* commands/constraint.c */
 extern Datum unique_key_recheck(PG_FUNCTION_ARGS);
 
diff --git a/src/include/utils/cmdstatus.h b/src/include/utils/cmdstatus.h
new file mode 100644
index 0000000..52ddf6a
--- /dev/null
+++ b/src/include/utils/cmdstatus.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.h
+ *	  Declarations for command status interrupt handling.
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * src/include/utils/cmdstatus.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CMDSTATUS_H
+#define CMDSTATUS_H
+
+extern volatile bool CmdStatusInfoRequested;
+
+extern void InstallCmdStatusInfoHooks(void);
+
+extern void HandleCmdStatusInfoInterrupt(void);
+extern void ProcessCmdStatusInfoRequest(void);
+
+#endif /* CMDSTATUS_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers