Hello,
I have implemented per-query network statistics collection for FDWs. It is done
in a similar way to how buffer and WAL stats are collected, and it can be
seen with a new NETWORK option for the EXPLAIN command:
explain (analyze, network) insert into itrtest values (2, 'blah');
QUERY PLAN
-----------------------------------------------------------------------------------------------
Insert on itrtest (cost=0.00..0.01 rows=0 width=0) (actual
time=0.544..0.544 rows=0 loops=1)
Network: FDW bytes sent=197 received=72, wait_time=0.689
-> Result (cost=0.00..0.01 rows=1 width=36) (actual
time=0.003..0.003 rows=1 loops=1)
Planning Time: 0.025 ms
Execution Time: 0.701 ms
(5 rows)
I have yet to add corresponding columns to pg_stat_statements and to write tests
and documentation, but before I go ahead with that, I would like to know
what the community thinks about the patch.
Regards,
Ilya Gladyshev
>From 3ffbe071480672189c2e03d7e54707c77ba58b0b Mon Sep 17 00:00:00 2001
From: Ilya Gladyshev <i.gladys...@postgrespro.ru>
Date: Mon, 23 Aug 2021 21:37:31 +0300
Subject: [PATCH] adds per query FDW network usage stats
Adds means for collecting network usage stats and outputting them in
EXPLAIN with the NETWORK option. Implements network stats collection for
postgres_fdw by adding a hook for stat collection to libpq.
---
contrib/postgres_fdw/connection.c | 57 ++++++++++++++++++++++++-
contrib/postgres_fdw/postgres_fdw.c | 28 +++++++++++++
src/backend/access/heap/vacuumlazy.c | 5 ++-
src/backend/access/nbtree/nbtsort.c | 14 ++++---
src/backend/commands/explain.c | 62 ++++++++++++++++++++++++++++
src/backend/executor/execParallel.c | 28 ++++++++++---
src/backend/executor/instrument.c | 57 +++++++++++++++++++++----
src/backend/utils/misc/guc.c | 10 ++++-
src/include/commands/explain.h | 1 +
src/include/executor/execParallel.h | 1 +
src/include/executor/instrument.h | 25 ++++++++---
src/interfaces/libpq/exports.txt | 1 +
src/interfaces/libpq/fe-misc.c | 2 +
src/interfaces/libpq/fe-secure.c | 4 ++
src/interfaces/libpq/libpq-fe.h | 5 ++-
15 files changed, 271 insertions(+), 29 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 82aa14a65de..3f479a74ba1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -571,10 +571,22 @@ void
do_sql_command(PGconn *conn, const char *sql)
{
PGresult *res;
+ instr_time start, duration;
+
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
if (!PQsendQuery(conn, sql))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
+
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -684,10 +696,14 @@ GetPrepStmtNumber(PGconn *conn)
PGresult *
pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
{
+ PGresult *res;
+ instr_time start, duration;
/* First, process a pending asynchronous request, if any. */
if (state && state->pendingAreq)
process_pending_request(state->pendingAreq);
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
@@ -696,7 +712,14 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
pgfdw_report_error(ERROR, NULL, conn, false, query);
/* Wait for the result. */
- return pgfdw_get_result(conn, query);
+ res = pgfdw_get_result(conn, query);
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
+ return res;
}
/*
@@ -717,6 +740,10 @@ pgfdw_get_result(PGconn *conn, const char *query)
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
{
+ instr_time start, duration;
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
for (;;)
{
PGresult *res;
@@ -750,6 +777,13 @@ pgfdw_get_result(PGconn *conn, const char *query)
PQclear(last_res);
last_res = res;
}
+
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
}
PG_CATCH();
{
@@ -893,7 +927,18 @@ pgfdw_xact_callback(XactEvent event, void *arg)
*/
if (entry->have_prep_stmt && entry->have_error)
{
+ instr_time start, duration;
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
res = PQexec(entry->conn, "DEALLOCATE ALL");
+
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
PQclear(res);
}
entry->have_prep_stmt = false;
@@ -1329,6 +1374,10 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
{
+ instr_time start, duration;
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
for (;;)
{
PGresult *res;
@@ -1377,6 +1426,12 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
last_res = res;
}
exit: ;
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
}
PG_CATCH();
{
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9d443baf02a..e511c4f8a73 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -60,6 +60,7 @@ PG_MODULE_MAGIC;
/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
+PQnetworkStats_hook_type prev_PQnetworkStats;
/*
* Indexes of FDW-private information stored in fdw_private lists.
*
@@ -313,6 +314,14 @@ typedef struct
List *already_used; /* expressions already dealt with */
} ec_member_foreign_arg;
+void _PG_init(void);
+void _PG_fini(void);
+
+/*
+ * Hooks
+ */
+static void PostgresFdw_PQnetworkStats(ssize_t bytesReceived, ssize_t bytesSent);
+
/*
* SQL functions
*/
@@ -7438,3 +7447,22 @@ get_batch_size_option(Relation rel)
return batch_size;
}
+static void
+PostgresFdw_PQnetworkStats(ssize_t bytesSent, ssize_t bytesReceived)
+{
+ pgNetUsage.fdw_recv_bytes += bytesReceived;
+ pgNetUsage.fdw_sent_bytes += bytesSent;
+}
+
+void
+_PG_init(void)
+{
+ prev_PQnetworkStats = PQnetworkStats_hook;
+ PQnetworkStats_hook = PostgresFdw_PQnetworkStats;
+}
+
+void
+_PG_fini(void)
+{
+ PQnetworkStats_hook = prev_PQnetworkStats;
+}
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 334d8a2aa71..a94d61f1ec4 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2800,7 +2800,7 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
WaitForParallelWorkersToFinish(lps->pcxt);
for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
+ InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i], NULL);
}
/*
@@ -4243,7 +4243,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
- &wal_usage[ParallelWorkerNumber]);
+ &wal_usage[ParallelWorkerNumber],
+ NULL);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 54c8eb1289d..055fbd86e44 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1663,11 +1663,11 @@ _bt_end_parallel(BTLeader *btleader)
WaitForParallelWorkersToFinish(btleader->pcxt);
/*
- * Next, accumulate WAL usage. (This must wait for the workers to finish,
- * or we might get incomplete data.)
+ * Next, accumulate WAL and buffer usage. (This must wait for the workers
+ * to finish, or we might get incomplete data.)
*/
for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]);
+ InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i], NULL);
/* Free last reference to MVCC snapshot, if one was used */
if (IsMVCCSnapshot(btleader->snapshot))
@@ -1870,11 +1870,15 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
sharedsort2, sortmem, false);
- /* Report WAL/buffer usage during parallel execution */
+ /*
+ * Report WAL/buffer usage during parallel execution. No need to report
+ * network
+ */
bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
- &walusage[ParallelWorkerNumber]);
+ &walusage[ParallelWorkerNumber],
+ NULL);
#ifdef BTREE_BUILD_STATS
if (log_btree_build_stats)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 10644dfac44..617cd28acd3 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -122,6 +122,7 @@ static const char *explain_get_index_name(Oid indexId);
static void show_buffer_usage(ExplainState *es, const BufferUsage *usage,
bool planning);
static void show_wal_usage(ExplainState *es, const WalUsage *usage);
+static void show_net_usage(ExplainState *es, const NetworkUsage * usage);
static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir,
ExplainState *es);
static void ExplainScanTarget(Scan *plan, ExplainState *es);
@@ -188,6 +189,8 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
es->buffers = defGetBoolean(opt);
else if (strcmp(opt->defname, "wal") == 0)
es->wal = defGetBoolean(opt);
+ else if (strcmp(opt->defname, "network") == 0)
+ es->network = defGetBoolean(opt);
else if (strcmp(opt->defname, "settings") == 0)
es->settings = defGetBoolean(opt);
else if (strcmp(opt->defname, "timing") == 0)
@@ -232,6 +235,12 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("EXPLAIN option WAL requires ANALYZE")));
+ if (es->network && !es->analyze)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("EXPLAIN option NETWORK requires ANALYZE")));
+
+
/* if the timing was not set explicitly, set default value */
es->timing = (timing_set) ? es->timing : es->analyze;
@@ -538,6 +547,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
instrument_option |= INSTRUMENT_BUFFERS;
if (es->wal)
instrument_option |= INSTRUMENT_WAL;
+ if (es->network)
+ instrument_option |= INSTRUMENT_NETWORK;
/*
* We always collect timing for the entire statement, even when node-level
@@ -2048,6 +2059,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
show_buffer_usage(es, &planstate->instrument->bufusage, false);
if (es->wal && planstate->instrument)
show_wal_usage(es, &planstate->instrument->walusage);
+ if (es->network && planstate->instrument)
+ show_net_usage(es, &planstate->instrument->netusage);
/* Prepare per-worker buffer/WAL usage */
if (es->workers_state && (es->buffers || es->wal) && es->verbose)
@@ -3617,6 +3630,55 @@ show_buffer_usage(ExplainState *es, const BufferUsage *usage, bool planning)
}
}
+/*
+ * Show network usage details.
+ */
+static void
+show_net_usage(ExplainState *es, const NetworkUsage * usage)
+{
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ bool has_data = (usage->fdw_recv_bytes > 0) || (usage->fdw_sent_bytes > 0);
+ bool has_timing = !INSTR_TIME_IS_ZERO(usage->fdw_wait_time);
+
+ /* Show only positive counter values. */
+ if (has_data)
+ {
+ ExplainIndentText(es);
+ appendStringInfoString(es->str, "Network: FDW bytes");
+
+ if (usage->fdw_sent_bytes > 0)
+ appendStringInfo(es->str, " sent=" UINT64_FORMAT,
+ usage->fdw_sent_bytes);
+ if (usage->fdw_recv_bytes > 0)
+ appendStringInfo(es->str, " received=" UINT64_FORMAT,
+ usage->fdw_recv_bytes);
+ }
+ if (has_timing)
+ {
+ appendStringInfo(es->str, ", wait_time=%0.3f",
+ INSTR_TIME_GET_MILLISEC(usage->fdw_wait_time));
+ }
+ if (has_data || has_timing)
+ {
+ appendStringInfoChar(es->str, '\n');
+ }
+ }
+ else
+ {
+ ExplainPropertyInteger("FDW Bytes Sent", NULL,
+ usage->fdw_sent_bytes, es);
+ ExplainPropertyInteger("FDW Bytes Received", NULL,
+ usage->fdw_recv_bytes, es);
+ if (track_fdw_wait_timing)
+ {
+ ExplainPropertyFloat("FDW Wait Time", NULL,
+ INSTR_TIME_GET_MILLISEC(usage->fdw_wait_time),
+ 3, es);
+ }
+ }
+}
+
/*
* Show WAL usage details.
*/
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f8a4a40e7b5..4b9603113ca 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,7 +12,7 @@
* workers and ensuring that their state generally matches that of the
* leader; see src/backend/access/transam/README.parallel for details.
* However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer/WAL usage info, and
+ * any ParamListInfo associated with the query, buffer/WAL/network usage info, and
* the actual plan to be passed down to the worker.
*
* IDENTIFICATION
@@ -66,6 +66,7 @@
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_NETWORK_USAGE UINT64CONST(0xE00000000000000B)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -599,6 +600,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
char *paramlistinfo_space;
BufferUsage *bufusage_space;
WalUsage *walusage_space;
+ NetworkUsage *netusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
SharedJitInstrumentation *jit_instrumentation = NULL;
int pstmt_len;
@@ -679,6 +681,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Same for network */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(NetworkUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -767,6 +774,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
pei->wal_usage = walusage_space;
+ /* Same for network */
+ netusage_space = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(NetworkUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_NETWORK_USAGE, netusage_space);
+ pei->net_usage = netusage_space;
+
/* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
@@ -1159,11 +1172,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
WaitForParallelWorkersToFinish(pei->pcxt);
/*
- * Next, accumulate buffer/WAL usage. (This must wait for the workers to
- * finish, or we might get incomplete data.)
+ * Next, accumulate buffer/WAL/network usage. (This must wait for the
+ * workers to finish, or we might get incomplete data.)
*/
for (i = 0; i < nworkers; i++)
- InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
+ InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i], &pei->net_usage[i]);
pei->finished = true;
}
@@ -1396,6 +1409,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
FixedParallelExecutorState *fpes;
BufferUsage *buffer_usage;
WalUsage *wal_usage;
+ NetworkUsage *net_usage;
DestReceiver *receiver;
QueryDesc *queryDesc;
SharedExecutorInstrumentation *instrumentation;
@@ -1469,11 +1483,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Shut down the executor */
ExecutorFinish(queryDesc);
- /* Report buffer/WAL usage during parallel execution. */
+ /* Report buffer/WAL/network usage during parallel execution. */
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+ net_usage = shm_toc_lookup(toc, PARALLEL_KEY_NETWORK_USAGE, false);
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
- &wal_usage[ParallelWorkerNumber]);
+ &wal_usage[ParallelWorkerNumber],
+ &net_usage[ParallelWorkerNumber]);
/* Report instrumentation data if any instrumentation options are set. */
if (instrumentation != NULL)
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 2b106d8473c..31e897a86fe 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -21,10 +21,14 @@ BufferUsage pgBufferUsage;
static BufferUsage save_pgBufferUsage;
WalUsage pgWalUsage;
static WalUsage save_pgWalUsage;
+NetworkUsage pgNetUsage;
+static NetworkUsage save_pgNetUsage;
+
+bool track_fdw_wait_timing;
static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
static void WalUsageAdd(WalUsage *dst, WalUsage *add);
-
+static void NetUsageAdd(NetworkUsage *dst, const NetworkUsage *add);
/* Allocate new instrumentation structure(s) */
Instrumentation *
@@ -34,11 +38,13 @@ InstrAlloc(int n, int instrument_options, bool async_mode)
/* initialize all fields to zeroes, then modify as needed */
instr = palloc0(n * sizeof(Instrumentation));
- if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL))
+ if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER |
+ INSTRUMENT_WAL | INSTRUMENT_NETWORK))
{
bool need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0;
bool need_wal = (instrument_options & INSTRUMENT_WAL) != 0;
bool need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
+ bool need_network = (instrument_options & INSTRUMENT_NETWORK) != 0;
int i;
for (i = 0; i < n; i++)
@@ -46,6 +52,7 @@ InstrAlloc(int n, int instrument_options, bool async_mode)
instr[i].need_bufusage = need_buffers;
instr[i].need_walusage = need_wal;
instr[i].need_timer = need_timer;
+ instr[i].need_netusage = need_network;
instr[i].async_mode = async_mode;
}
}
@@ -61,6 +68,7 @@ InstrInit(Instrumentation *instr, int instrument_options)
instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0;
instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
+ instr->need_netusage = (instrument_options & INSTRUMENT_NETWORK) != 0;
}
/* Entry to a plan node */
@@ -77,6 +85,9 @@ InstrStartNode(Instrumentation *instr)
if (instr->need_walusage)
instr->walusage_start = pgWalUsage;
+
+ if (instr->need_netusage)
+ instr->netusage_start = pgNetUsage;
}
/* Exit from a plan node */
@@ -103,12 +114,14 @@ InstrStopNode(Instrumentation *instr, double nTuples)
/* Add delta of buffer usage since entry to node's totals */
if (instr->need_bufusage)
- BufferUsageAccumDiff(&instr->bufusage,
- &pgBufferUsage, &instr->bufusage_start);
+ BufferUsageAccumDiff(&instr->bufusage, &pgBufferUsage,
+ &instr->bufusage_start);
if (instr->need_walusage)
- WalUsageAccumDiff(&instr->walusage,
- &pgWalUsage, &instr->walusage_start);
+ WalUsageAccumDiff(&instr->walusage, &pgWalUsage, &instr->walusage_start);
+
+ if (instr->need_netusage)
+ NetUsageAccumDiff(&instr->netusage, &pgNetUsage, &instr->netusage_start);
/* Is this the first tuple of this cycle? */
if (!instr->running)
@@ -193,6 +206,9 @@ InstrAggNode(Instrumentation *dst, Instrumentation *add)
if (dst->need_walusage)
WalUsageAdd(&dst->walusage, &add->walusage);
+
+ if (dst->need_netusage)
+ NetUsageAdd(&dst->netusage, &add->netusage);
}
/* note current values during parallel executor startup */
@@ -201,24 +217,32 @@ InstrStartParallelQuery(void)
{
save_pgBufferUsage = pgBufferUsage;
save_pgWalUsage = pgWalUsage;
+ save_pgNetUsage = pgNetUsage;
}
/* report usage after parallel executor shutdown */
void
-InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage)
{
memset(bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
memset(walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
+ if (netusage != NULL)
+ {
+ memset(netusage, 0, sizeof(NetworkUsage));
+ NetUsageAccumDiff(netusage, &pgNetUsage, &save_pgNetUsage);
+ }
}
/* accumulate work done by workers in leader's stats */
void
-InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage)
{
BufferUsageAdd(&pgBufferUsage, bufusage);
WalUsageAdd(&pgWalUsage, walusage);
+ if (netusage != NULL)
+ NetUsageAdd(&pgNetUsage, netusage);
}
/* dst += add */
@@ -277,3 +301,20 @@ WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
dst->wal_records += add->wal_records - sub->wal_records;
dst->wal_fpi += add->wal_fpi - sub->wal_fpi;
}
+
+void
+NetUsageAccumDiff(NetworkUsage * dst, const NetworkUsage * add,
+ const NetworkUsage * sub)
+{
+ dst->fdw_recv_bytes += add->fdw_recv_bytes - sub->fdw_recv_bytes;
+ dst->fdw_sent_bytes += add->fdw_sent_bytes - sub->fdw_sent_bytes;
+ INSTR_TIME_ACCUM_DIFF(dst->fdw_wait_time, add->fdw_wait_time, sub->fdw_wait_time);
+}
+
+static void
+NetUsageAdd(NetworkUsage * dst, const NetworkUsage * add)
+{
+ dst->fdw_recv_bytes += add->fdw_recv_bytes;
+ dst->fdw_sent_bytes += add->fdw_sent_bytes;
+ INSTR_TIME_ADD(dst->fdw_wait_time, add->fdw_wait_time);
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 467b0fd6fe7..75d4f574951 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1552,7 +1552,15 @@ static struct config_bool ConfigureNamesBool[] =
false,
NULL, NULL, NULL
},
-
+ {
+ {"track_fdw_wait_timing", PGC_SUSET, STATS_COLLECTOR,
+ gettext_noop("Collects statistics for foreign source waiting time."),
+ NULL
+ },
+ &track_fdw_wait_timing,
+ false,
+ NULL, NULL, NULL
+ },
{
{"update_process_title", PGC_SUSET, PROCESS_TITLE,
gettext_noop("Updates the process title to show the active SQL command."),
diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h
index e94d9e49cf6..a212d87f037 100644
--- a/src/include/commands/explain.h
+++ b/src/include/commands/explain.h
@@ -43,6 +43,7 @@ typedef struct ExplainState
bool costs; /* print estimated costs */
bool buffers; /* print buffer usage */
bool wal; /* print WAL usage */
+ bool network; /* print network usage */
bool timing; /* print detailed node timing */
bool summary; /* print total planning and execution timing */
bool settings; /* print modified settings */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 3888175a2f4..22118628a51 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo
ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; /* points to bufusage area in DSM */
WalUsage *wal_usage; /* walusage area in DSM */
+ NetworkUsage *net_usage; /* netusage area in DSM */
SharedExecutorInstrumentation *instrumentation; /* optional */
struct SharedJitInstrumentation *jit_instrumentation; /* optional */
dsa_area *area; /* points to DSA area in DSM */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index 2f9905b7c8e..29c8a7c6c39 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -51,6 +51,13 @@ typedef struct WalUsage
uint64 wal_bytes; /* size of WAL records produced */
} WalUsage;
+typedef struct NetworkUsage
+{
+ uint64 fdw_recv_bytes; /* Bytes received from foreign source */
+ uint64 fdw_sent_bytes; /* Bytes sent to foreign targets */
+ instr_time fdw_wait_time; /* Time spent waiting for response from foreign source. */
+} NetworkUsage;
+
/* Flag bits included in InstrAlloc's instrument_options bitmask */
typedef enum InstrumentOption
{
@@ -58,6 +65,7 @@ typedef enum InstrumentOption
INSTRUMENT_BUFFERS = 1 << 1, /* needs buffer usage */
INSTRUMENT_ROWS = 1 << 2, /* needs row count */
INSTRUMENT_WAL = 1 << 3, /* needs WAL usage */
+ INSTRUMENT_NETWORK = 1 << 4, /* needs network usage */
INSTRUMENT_ALL = PG_INT32_MAX
} InstrumentOption;
@@ -67,6 +75,7 @@ typedef struct Instrumentation
bool need_timer; /* true if we need timer data */
bool need_bufusage; /* true if we need buffer usage data */
bool need_walusage; /* true if we need WAL usage data */
+ bool need_netusage; /* true if we need network usage data */
bool async_mode; /* true if node is in async mode */
/* Info about current plan cycle: */
bool running; /* true if we've completed first tuple */
@@ -76,6 +85,7 @@ typedef struct Instrumentation
double tuplecount; /* # of tuples emitted so far this cycle */
BufferUsage bufusage_start; /* buffer usage at start */
WalUsage walusage_start; /* WAL usage at start */
+ NetworkUsage netusage_start; /* network usage at start */
/* Accumulated statistics across all completed cycles: */
double startup; /* total startup time (in seconds) */
double total; /* total time (in seconds) */
@@ -86,6 +96,7 @@ typedef struct Instrumentation
double nfiltered2; /* # of tuples removed by "other" quals */
BufferUsage bufusage; /* total buffer usage */
WalUsage walusage; /* total WAL usage */
+ NetworkUsage netusage; /* total network usage */
} Instrumentation;
typedef struct WorkerInstrumentation
@@ -96,6 +107,9 @@ typedef struct WorkerInstrumentation
extern PGDLLIMPORT BufferUsage pgBufferUsage;
extern PGDLLIMPORT WalUsage pgWalUsage;
+extern PGDLLIMPORT NetworkUsage pgNetUsage;
+
+extern PGDLLIMPORT bool track_fdw_wait_timing;
extern Instrumentation *InstrAlloc(int n, int instrument_options,
bool async_mode);
@@ -106,11 +120,12 @@ extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples);
extern void InstrEndLoop(Instrumentation *instr);
extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
extern void InstrStartParallelQuery(void);
-extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
-extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
-extern void BufferUsageAccumDiff(BufferUsage *dst,
- const BufferUsage *add, const BufferUsage *sub);
+extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage);
+extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage);
+extern void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add,
+ const BufferUsage *sub);
extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
const WalUsage *sub);
-
+extern void NetUsageAccumDiff(NetworkUsage *dst, const NetworkUsage *add,
+ const NetworkUsage *sub);
#endif /* INSTRUMENT_H */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc883709..6663b28eb8d 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus 183
PQsetTraceFlags 184
PQmblenBounded 185
PQsendFlushRequest 186
+PQnetworkStats_hook 187
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 9a2a9702934..067c1aa51d7 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,6 +53,8 @@
#include "pg_config_paths.h"
#include "port/pg_bswap.h"
+PQnetworkStats_hook_type PQnetworkStats_hook = NULL;
+
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c
index b15d8d137ce..c3653482e5e 100644
--- a/src/interfaces/libpq/fe-secure.c
+++ b/src/interfaces/libpq/fe-secure.c
@@ -224,6 +224,8 @@ pqsecure_read(PGconn *conn, void *ptr, size_t len)
{
n = pqsecure_raw_read(conn, ptr, len);
}
+ if (PQnetworkStats_hook)
+ PQnetworkStats_hook(0, n);
return n;
}
@@ -307,6 +309,8 @@ pqsecure_write(PGconn *conn, const void *ptr, size_t len)
n = pqsecure_raw_write(conn, ptr, len);
}
+ if (PQnetworkStats_hook)
+ PQnetworkStats_hook(n, 0);
return n;
}
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index a6fd69acebc..552a61b3c27 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -27,7 +27,7 @@ extern "C"
* such as Oid.
*/
#include "postgres_ext.h"
-
+#include "c.h"
/*
* These symbols may be used in compile-time #ifdef tests for the availability
* of newer libpq features.
@@ -646,6 +646,9 @@ extern int PQdsplen(const char *s, int encoding);
/* Get encoding id from environment variable PGCLIENTENCODING */
extern int PQenv2encoding(void);
+typedef void (*PQnetworkStats_hook_type) (ssize_t bytesSent, ssize_t bytesReceived);
+extern PGDLLEXPORT PQnetworkStats_hook_type PQnetworkStats_hook;
+
/* === in fe-auth.c === */
extern char *PQencryptPassword(const char *passwd, const char *user);
--
2.30.2