---
 src/backend/commands/explain.c          |   4 +
 src/backend/executor/execMain.c         |   1 +
 src/backend/executor/execParallel.c     | 129 ++++++++++++++++++++++++
 src/backend/executor/execUtils.c        |   1 +
 src/backend/utils/activity/wait_event.c |  22 +++-
 src/include/executor/execParallel.h     |   2 +
 src/include/nodes/execnodes.h           |   2 +
 src/include/utils/wait_event.h          |   3 +
 src/test/regress/expected/explain.out   |  17 ++++
 src/test/regress/sql/explain.sql        |  12 +++
 10 files changed, 190 insertions(+), 3 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 0b7cc5c15c6..9d7372f5415 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -594,6 +594,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, 
ExplainState *es,
                        waitEventUsagePtr = &waitEventUsage;
                        pgstat_begin_wait_event_usage(waitEventUsagePtr,
                                                                                
  queryDesc->estate->es_query_cxt);
+                       queryDesc->estate->es_wait_event_usage = 
waitEventUsagePtr;
                }
 
                /* run the plan */
@@ -607,7 +608,10 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, 
ExplainState *es,
                PG_FINALLY();
                {
                        if (waitEventUsagePtr)
+                       {
                                pgstat_end_wait_event_usage(waitEventUsagePtr);
+                               queryDesc->estate->es_wait_event_usage = NULL;
+                       }
                }
                PG_END_TRY();
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4b30f768680..86ab124c1c0 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -3066,6 +3066,7 @@ EvalPlanQualStart(EPQState *epqstate, Plan *planTree)
        /* es_trig_target_relations must NOT be copied */
        rcestate->es_top_eflags = parentestate->es_top_eflags;
        rcestate->es_instrument = parentestate->es_instrument;
+       rcestate->es_wait_event_usage = parentestate->es_wait_event_usage;
        /* es_auxmodifytables must NOT be copied */
 
        /*
diff --git a/src/backend/executor/execParallel.c 
b/src/backend/executor/execParallel.c
index 81b87d82fab..8213565a708 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -51,6 +51,7 @@
 #include "utils/dsa.h"
 #include "utils/lsyscache.h"
 #include "utils/snapmgr.h"
+#include "utils/wait_event.h"
 
 /*
  * Magic numbers for parallel executor communication.  We use constants
@@ -67,6 +68,7 @@
 #define PARALLEL_KEY_QUERY_TEXT                UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE                 UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_WAIT_EVENT_USAGE  UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE              65536
 
@@ -114,6 +116,18 @@ struct SharedExecutorInstrumentation
        (StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation 
*), \
         (NodeInstrumentation *) (((char *) sei) + sei->instrument_offset))
 
+/*
+ * Per-worker slot through which a parallel worker hands its wait event
+ * usage entries to the leader.  The slot starts out empty (nentries = 0,
+ * entries = InvalidDsaPointer) and is filled in by the worker.
+ */
+typedef struct SharedWaitEventUsageWorker
+{
+       int                     nentries;       /* number of elements in "entries" */
+       dsa_pointer entries;    /* DSA chunk holding the entry array, if any */
+} SharedWaitEventUsageWorker;
+
+/*
+ * Header stored in the parallel query's shm_toc under
+ * PARALLEL_KEY_WAIT_EVENT_USAGE.  It carries one SharedWaitEventUsageWorker
+ * slot per worker; a worker writes the slot at index ParallelWorkerNumber.
+ */
+struct SharedWaitEventUsage
+{
+       int                     num_workers;    /* length of worker_usage[] */
+       SharedWaitEventUsageWorker worker_usage[FLEXIBLE_ARRAY_MEMBER];
+};
+
 /* Context object for ExecParallelEstimate. */
 typedef struct ExecParallelEstimateContext
 {
@@ -141,6 +155,10 @@ static bool ExecParallelReInitializeDSM(PlanState 
*planstate,
                                                                                
ParallelContext *pcxt);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
                                                                                
                SharedExecutorInstrumentation *instrumentation);
+static void ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei);
+static void ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared,
+                                                                               
         dsa_area *area,
+                                                                               
         const WaitEventUsage *usage);
 
 /* Helper function that runs in the parallel worker. */
 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
@@ -664,10 +682,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
        char       *paramlistinfo_space;
        BufferUsage *bufusage_space;
        WalUsage   *walusage_space;
+       SharedWaitEventUsage *wait_event_usage = NULL;
        SharedExecutorInstrumentation *instrumentation = NULL;
        SharedJitInstrumentation *jit_instrumentation = NULL;
        int                     pstmt_len;
        int                     paramlistinfo_len;
+       int                     wait_event_usage_len = 0;
        int                     instrumentation_len = 0;
        int                     jit_instrumentation_len = 0;
        int                     instrument_offset = 0;
@@ -744,6 +764,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
                                                   mul_size(sizeof(WalUsage), 
pcxt->nworkers));
        shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+       /* Estimate space for per-worker wait event usage metadata. */
+       if (estate->es_wait_event_usage != NULL)
+       {
+               wait_event_usage_len =
+                       offsetof(SharedWaitEventUsage, worker_usage) +
+                       mul_size(sizeof(SharedWaitEventUsageWorker), 
pcxt->nworkers);
+               shm_toc_estimate_chunk(&pcxt->estimator, wait_event_usage_len);
+               shm_toc_estimate_keys(&pcxt->estimator, 1);
+       }
+
        /* Estimate space for tuple queues. */
        shm_toc_estimate_chunk(&pcxt->estimator,
                                                   
mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -839,6 +869,21 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
        pei->wal_usage = walusage_space;
 
+       /*
+        * Allocate metadata for each worker's wait event usage, if requested.
+        *
+        * The chunk was sized from pcxt->nworkers, so initialize exactly that
+        * many slots.  Looping over the caller's nworkers instead could write
+        * past the allocation if the parallel context ended up with fewer
+        * workers than were requested.
+        */
+       if (estate->es_wait_event_usage != NULL)
+       {
+               wait_event_usage = shm_toc_allocate(pcxt->toc, wait_event_usage_len);
+               wait_event_usage->num_workers = pcxt->nworkers;
+               for (int i = 0; i < pcxt->nworkers; i++)
+               {
+                       wait_event_usage->worker_usage[i].nentries = 0;
+                       wait_event_usage->worker_usage[i].entries = InvalidDsaPointer;
+               }
+               shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAIT_EVENT_USAGE,
+                                          wait_event_usage);
+               pei->wait_event_usage = wait_event_usage;
+       }
+
        /* Set up the tuple queues that the workers will write into. */
        pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
@@ -1213,6 +1258,68 @@ ExecParallelRetrieveJitInstrumentation(PlanState 
*planstate,
        memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
 }
 
+/*
+ * Fold the wait event usage published by parallel workers into the leader's
+ * accumulator (the EState's es_wait_event_usage), freeing each worker's DSA
+ * chunk once it has been merged and resetting the slot to empty.
+ *
+ * Does nothing if no shared wait event usage area was set up for this
+ * parallel plan, or if the EState has no accumulator to merge into.
+ */
+static void
+ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei)
+{
+       SharedWaitEventUsage *shared = pei->wait_event_usage;
+       WaitEventUsage *leader_usage;
+       int                     slotno;
+
+       if (shared == NULL)
+               return;
+
+       leader_usage = pei->planstate->state->es_wait_event_usage;
+       if (leader_usage == NULL)
+               return;
+
+       for (slotno = 0; slotno < shared->num_workers; slotno++)
+       {
+               SharedWaitEventUsageWorker *slot = &shared->worker_usage[slotno];
+
+               /* Only slots that a worker actually filled in need processing. */
+               if (slot->nentries > 0 && DsaPointerIsValid(slot->entries))
+               {
+                       WaitEventUsageEntry *worker_entries;
+
+                       worker_entries = dsa_get_address(pei->area, slot->entries);
+                       pgstat_accumulate_wait_event_usage(leader_usage,
+                                                                                          worker_entries,
+                                                                                          slot->nentries);
+
+                       /* Release the chunk and mark the slot as empty again. */
+                       dsa_free(pei->area, slot->entries);
+                       slot->nentries = 0;
+                       slot->entries = InvalidDsaPointer;
+               }
+       }
+}
+
+/*
+ * Publish this parallel worker's wait event usage for the leader.
+ *
+ * The worker's entry array is copied into a newly allocated DSA chunk, and
+ * the chunk is recorded in the slot indexed by ParallelWorkerNumber.  Any
+ * chunk the slot still referenced is freed first.  If the worker recorded
+ * no entries, the slot is left untouched.
+ */
+static void
+ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared,
+                                                                dsa_area *area,
+                                                                const WaitEventUsage *usage)
+{
+       SharedWaitEventUsageWorker *slot;
+       dsa_pointer chunk;
+       Size            nbytes;
+
+       Assert(shared != NULL);
+       Assert(area != NULL);
+       Assert(usage != NULL);
+       Assert(IsParallelWorker());
+       Assert(ParallelWorkerNumber < shared->num_workers);
+
+       /* Nothing was recorded, so there is nothing to publish. */
+       if (usage->nentries <= 0)
+               return;
+
+       /* Copy the local entry array into a fresh DSA chunk. */
+       nbytes = mul_size(sizeof(WaitEventUsageEntry), usage->nentries);
+       chunk = dsa_allocate(area, nbytes);
+       memcpy(dsa_get_address(area, chunk), usage->entries, nbytes);
+
+       /* Swap the new chunk into our slot, dropping any previous one. */
+       slot = &shared->worker_usage[ParallelWorkerNumber];
+       if (DsaPointerIsValid(slot->entries))
+               dsa_free(area, slot->entries);
+       slot->nentries = usage->nentries;
+       slot->entries = chunk;
+}
+
 /*
  * Finish parallel execution.  We wait for parallel workers to finish, and
  * accumulate their buffer/WAL usage.
@@ -1261,6 +1368,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
        for (i = 0; i < nworkers; i++)
                InstrAccumParallelQuery(&pei->buffer_usage[i], 
&pei->wal_usage[i]);
 
+       /* Accumulate wait event usage, if requested. */
+       ExecParallelRetrieveWaitEventUsage(pei);
+
        pei->finished = true;
 }
 
@@ -1516,10 +1626,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        FixedParallelExecutorState *fpes;
        BufferUsage *buffer_usage;
        WalUsage   *wal_usage;
+       SharedWaitEventUsage *wait_event_usage;
        DestReceiver *receiver;
        QueryDesc  *queryDesc;
        SharedExecutorInstrumentation *instrumentation;
        SharedJitInstrumentation *jit_instrumentation;
+       WaitEventUsage waitEventUsage;
+       WaitEventUsage *waitEventUsagePtr = NULL;
        int                     instrument_options = 0;
        void       *area_space;
        dsa_area   *area;
@@ -1535,6 +1648,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
                instrument_options = instrumentation->instrument_options;
        jit_instrumentation = shm_toc_lookup(toc, 
PARALLEL_KEY_JIT_INSTRUMENTATION,
                                                                                
 true);
+       wait_event_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAIT_EVENT_USAGE, 
true);
        queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
 
        /* Setting debug_query_string for individual workers */
@@ -1576,6 +1690,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
         */
        InstrStartParallelQuery();
 
+       if (wait_event_usage != NULL)
+       {
+               waitEventUsagePtr = &waitEventUsage;
+               pgstat_begin_wait_event_usage(waitEventUsagePtr,
+                                                                         
queryDesc->estate->es_query_cxt);
+       }
+
        /*
         * Run the plan.  If we specified a tuple bound, be careful not to 
demand
         * more tuples than that.
@@ -1587,6 +1708,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        /* Shut down the executor */
        ExecutorFinish(queryDesc);
 
+       if (waitEventUsagePtr != NULL)
+       {
+               pgstat_end_wait_event_usage(waitEventUsagePtr);
+               ExecParallelReportWaitEventUsage(wait_event_usage,
+                                                                               
 area,
+                                                                               
 waitEventUsagePtr);
+       }
+
        /* Report buffer/WAL usage during parallel execution. */
        buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
        wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 1eb6b9f1f40..80ea777632b 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -151,6 +151,7 @@ CreateExecutorState(void)
 
        estate->es_top_eflags = 0;
        estate->es_instrument = 0;
+       estate->es_wait_event_usage = NULL;
        estate->es_finished = false;
 
        estate->es_exprcontexts = NIL;
diff --git a/src/backend/utils/activity/wait_event.c 
b/src/backend/utils/activity/wait_event.c
index 60d37ccbb73..eb01bc3d88c 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -37,7 +37,7 @@ static const char *pgstat_get_wait_ipc(WaitEventIPC w);
 static const char *pgstat_get_wait_timeout(WaitEventTimeout w);
 static const char *pgstat_get_wait_io(WaitEventIO w);
 static void WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
-                                                         const instr_time 
*elapsed);
+                                                         uint64 calls, const 
instr_time *elapsed);
 
 
 static uint32 local_my_wait_event_info;
@@ -442,15 +442,31 @@ pgstat_count_wait_event_end(void)
 
        WaitEventUsageAdd(pgstat_wait_event_usage,
                                          pgstat_wait_event_usage_current,
+                                         1,
                                          &elapsed);
 
        pgstat_wait_event_usage_current = 0;
        INSTR_TIME_SET_ZERO(pgstat_wait_event_usage_start);
 }
 
+/*
+ * Merge an array of wait event usage entries into "usage".
+ *
+ * Each entry's wait_event_info, call count and elapsed time are passed to
+ * WaitEventUsageAdd() in array order.  "entries" may be NULL only when
+ * "nentries" is zero.
+ */
+void
+pgstat_accumulate_wait_event_usage(WaitEventUsage *usage,
+                                                                  const WaitEventUsageEntry *entries,
+                                                                  int nentries)
+{
+       int                     idx = 0;
+
+       Assert(usage != NULL);
+       Assert(nentries == 0 || entries != NULL);
+
+       while (idx < nentries)
+       {
+               const WaitEventUsageEntry *src = &entries[idx];
+
+               WaitEventUsageAdd(usage,
+                                                 src->wait_event_info,
+                                                 src->calls,
+                                                 &src->time);
+               idx++;
+       }
+}
+
 static void
 WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
-                                 const instr_time *elapsed)
+                                 uint64 calls, const instr_time *elapsed)
 {
        WaitEventUsageEntry *entry = NULL;
 
@@ -494,7 +510,7 @@ WaitEventUsageAdd(WaitEventUsage *usage, uint32 
wait_event_info,
                INSTR_TIME_SET_ZERO(entry->time);
        }
 
-       entry->calls++;
+       entry->calls += calls;
        INSTR_TIME_ADD(entry->time, *elapsed);
 }
 
diff --git a/src/include/executor/execParallel.h 
b/src/include/executor/execParallel.h
index 5a2034811d5..71df2c2511c 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -20,6 +20,7 @@
 #include "utils/dsa.h"
 
 typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
+typedef struct SharedWaitEventUsage SharedWaitEventUsage;
 
 typedef struct ParallelExecutorInfo
 {
@@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo
        ParallelContext *pcxt;          /* parallel context we're using */
        BufferUsage *buffer_usage;      /* points to bufusage area in DSM */
        WalUsage   *wal_usage;          /* walusage area in DSM */
+       SharedWaitEventUsage *wait_event_usage; /* optional */
        SharedExecutorInstrumentation *instrumentation; /* optional */
        struct SharedJitInstrumentation *jit_instrumentation;   /* optional */
        dsa_area   *area;                       /* points to DSA area in DSM */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 13359180d25..781c8c79132 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -69,6 +69,7 @@ typedef struct Tuplestorestate Tuplestorestate;
 typedef struct TupleTableSlot TupleTableSlot;
 typedef struct TupleTableSlotOps TupleTableSlotOps;
 typedef struct WalUsage WalUsage;
+typedef struct WaitEventUsage WaitEventUsage;
 typedef struct WorkerNodeInstrumentation WorkerNodeInstrumentation;
 
 
@@ -754,6 +755,7 @@ typedef struct EState
 
        int                     es_top_eflags;  /* eflags passed to 
ExecutorStart */
        int                     es_instrument;  /* OR of InstrumentOption flags 
*/
+       WaitEventUsage *es_wait_event_usage;    /* EXPLAIN WAITS accumulator */
        bool            es_finished;    /* true when ExecutorFinish is done */
 
        List       *es_exprcontexts;    /* List of ExprContexts within EState */
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index f7fab5736bb..63992137ee7 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -39,6 +39,9 @@ extern void pgstat_reset_wait_event_storage(void);
 extern void pgstat_begin_wait_event_usage(WaitEventUsage *usage,
                                                                                
  MemoryContext memcontext);
 extern void pgstat_end_wait_event_usage(WaitEventUsage *usage);
+extern void pgstat_accumulate_wait_event_usage(WaitEventUsage *usage,
+                                                                               
           const WaitEventUsageEntry *entries,
+                                                                               
           int nentries);
 extern void pgstat_count_wait_event_start(uint32 wait_event_info);
 extern void pgstat_count_wait_event_end(void);
 
diff --git a/src/test/regress/expected/explain.out 
b/src/test/regress/expected/explain.out
index 2c7a7e1d4c6..e3847e222be 100644
--- a/src/test/regress/expected/explain.out
+++ b/src/test/regress/expected/explain.out
@@ -114,6 +114,23 @@ select explain_filter_to_json('explain (analyze, waits, 
costs off, summary off,
  {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": 
"Timeout"}
 (1 row)
 
+begin;
+create function pg_temp.parallel_pg_sleep(float8) returns void
+  language internal volatile parallel safe as 'pg_sleep';
+set local debug_parallel_query = on;
+set local max_parallel_workers_per_gather = 1;
+select jsonb_path_query_first(
+  explain_filter_to_json('explain (analyze, waits, costs off, summary off, 
timing off, buffers off, format json)
+                         select pg_temp.parallel_pg_sleep(0.01)
+                         from tenk1 where unique1 = 1') #> '{0,Wait Events}',
+  '$[*] ? (@."Wait Event" == "PgSleep")'
+);
+                              jsonb_path_query_first                           
   
+----------------------------------------------------------------------------------
+ {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": 
"Timeout"}
+(1 row)
+
+rollback;
 explain (waits) select 1;
 ERROR:  EXPLAIN option WAITS requires ANALYZE
 \a
diff --git a/src/test/regress/sql/explain.sql b/src/test/regress/sql/explain.sql
index fe025ddeac5..8821250bcef 100644
--- a/src/test/regress/sql/explain.sql
+++ b/src/test/regress/sql/explain.sql
@@ -71,6 +71,18 @@ select explain_filter('explain (buffers, format text) select 
* from int8_tbl i8'
 -- WAITS option
 select explain_filter('explain (analyze, waits, costs off, summary off, timing 
off, buffers off) select pg_sleep(0.01)');
 select explain_filter_to_json('explain (analyze, waits, costs off, summary 
off, timing off, buffers off, format json) select pg_sleep(0.01)') #> '{0,Wait 
Events,0}';
+begin;
+create function pg_temp.parallel_pg_sleep(float8) returns void
+  language internal volatile parallel safe as 'pg_sleep';
+set local debug_parallel_query = on;
+set local max_parallel_workers_per_gather = 1;
+select jsonb_path_query_first(
+  explain_filter_to_json('explain (analyze, waits, costs off, summary off, 
timing off, buffers off, format json)
+                         select pg_temp.parallel_pg_sleep(0.01)
+                         from tenk1 where unique1 = 1') #> '{0,Wait Events}',
+  '$[*] ? (@."Wait Event" == "PgSleep")'
+);
+rollback;
 explain (waits) select 1;
 
 \a
-- 
2.52.0



Reply via email to