Hello,

I have implemented per-query network statistics collection for FDWs. It is done in a similar way to how buffer and WAL stats are collected, and it can be seen with a new NETWORK option for the EXPLAIN command:

explain (analyze, network) insert into itrtest values (2, 'blah');

                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Insert on itrtest  (cost=0.00..0.01 rows=0 width=0) (actual time=0.544..0.544 rows=0 loops=1)
   Network: FDW bytes sent=197 received=72, wait_time=0.689
   ->  Result  (cost=0.00..0.01 rows=1 width=36) (actual time=0.003..0.003 rows=1 loops=1)
 Planning Time: 0.025 ms
 Execution Time: 0.701 ms
(5 rows)

I have yet to add the corresponding columns to pg_stat_statements, and to write tests and documentation, but before I go ahead with that, I would like to know what the community thinks about the patch.

Regards,

Ilya Gladyshev


>From 3ffbe071480672189c2e03d7e54707c77ba58b0b Mon Sep 17 00:00:00 2001
From: Ilya Gladyshev <i.gladys...@postgrespro.ru>
Date: Mon, 23 Aug 2021 21:37:31 +0300
Subject: [PATCH] Add per-query FDW network usage stats

Add infrastructure for collecting per-query network usage statistics and
displaying them in EXPLAIN output via a new NETWORK option. Implement
network stats collection for postgres_fdw by adding a stats-collection
hook to libpq.
---
 contrib/postgres_fdw/connection.c    | 57 ++++++++++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.c  | 28 +++++++++++++
 src/backend/access/heap/vacuumlazy.c |  5 ++-
 src/backend/access/nbtree/nbtsort.c  | 14 ++++---
 src/backend/commands/explain.c       | 62 ++++++++++++++++++++++++++++
 src/backend/executor/execParallel.c  | 28 ++++++++++---
 src/backend/executor/instrument.c    | 57 +++++++++++++++++++++----
 src/backend/utils/misc/guc.c         | 10 ++++-
 src/include/commands/explain.h       |  1 +
 src/include/executor/execParallel.h  |  1 +
 src/include/executor/instrument.h    | 25 ++++++++---
 src/interfaces/libpq/exports.txt     |  1 +
 src/interfaces/libpq/fe-misc.c       |  2 +
 src/interfaces/libpq/fe-secure.c     |  4 ++
 src/interfaces/libpq/libpq-fe.h      |  5 ++-
 15 files changed, 271 insertions(+), 29 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 82aa14a65de..3f479a74ba1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -571,10 +571,11 @@
 do_sql_command(PGconn *conn, const char *sql)
 {
 	PGresult   *res;
 
 	if (!PQsendQuery(conn, sql))
 		pgfdw_report_error(ERROR, NULL, conn, false, sql);
+	/* pgfdw_get_result() accounts for the network wait time internally. */
 	res = pgfdw_get_result(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -696,7 +708,11 @@
 		pgfdw_report_error(ERROR, NULL, conn, false, query);
 
-	/* Wait for the result. */
+	/*
+	 * Wait for the result.  Time spent waiting is accumulated into
+	 * pgNetUsage.fdw_wait_time inside pgfdw_get_result() itself, so timing
+	 * this call here would count the wait twice.
+	 */
 	return pgfdw_get_result(conn, query);
 }
 
 /*
@@ -717,6 +740,10 @@ pgfdw_get_result(PGconn *conn, const char *query)
 	/* In what follows, do not leak any PGresults on an error. */
 	PG_TRY();
 	{
+		instr_time start, duration;
+		if (track_fdw_wait_timing)
+			INSTR_TIME_SET_CURRENT(start);
+
 		for (;;)
 		{
 			PGresult   *res;
@@ -750,6 +777,13 @@ pgfdw_get_result(PGconn *conn, const char *query)
 			PQclear(last_res);
 			last_res = res;
 		}
+
+		if (track_fdw_wait_timing)
+		{
+			INSTR_TIME_SET_CURRENT(duration);
+			INSTR_TIME_SUBTRACT(duration, start);
+			INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+		}
 	}
 	PG_CATCH();
 	{
@@ -893,7 +927,18 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
+						instr_time start, duration;
+						if (track_fdw_wait_timing)
+							INSTR_TIME_SET_CURRENT(start);
+
 						res = PQexec(entry->conn, "DEALLOCATE ALL");
+
+						if (track_fdw_wait_timing)
+						{
+							INSTR_TIME_SET_CURRENT(duration);
+							INSTR_TIME_SUBTRACT(duration, start);
+							INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+						}
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -1329,6 +1374,10 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
 	/* In what follows, do not leak any PGresults on an error. */
 	PG_TRY();
 	{
+		instr_time start, duration;
+		if (track_fdw_wait_timing)
+			INSTR_TIME_SET_CURRENT(start);
+
 		for (;;)
 		{
 			PGresult   *res;
@@ -1377,6 +1426,12 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
 			last_res = res;
 		}
 exit:	;
+		if (track_fdw_wait_timing)
+		{
+			INSTR_TIME_SET_CURRENT(duration);
+			INSTR_TIME_SUBTRACT(duration, start);
+			INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+		}
 	}
 	PG_CATCH();
 	{
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9d443baf02a..e511c4f8a73 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -60,6 +60,7 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+PQnetworkStats_hook_type prev_PQnetworkStats;
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -313,6 +314,14 @@ typedef struct
 	List	   *already_used;	/* expressions already dealt with */
 } ec_member_foreign_arg;
 
+void		_PG_init(void);
+void _PG_fini(void);
+
+/*
+ * Hooks
+ */
+static void PostgresFdw_PQnetworkStats(ssize_t bytesSent, ssize_t bytesReceived);
+
 /*
  * SQL functions
  */
@@ -7438,3 +7447,36 @@
 
 	return batch_size;
 }
+
+/*
+ * libpq network-stats hook: accumulate the number of bytes postgres_fdw
+ * sends to and receives from remote servers.  A negative argument is an
+ * error return from the underlying read/write and carries no byte count.
+ */
+static void
+PostgresFdw_PQnetworkStats(ssize_t bytesSent, ssize_t bytesReceived)
+{
+	if (bytesSent > 0)
+		pgNetUsage.fdw_sent_bytes += bytesSent;
+	if (bytesReceived > 0)
+		pgNetUsage.fdw_recv_bytes += bytesReceived;
+
+	/* Chain to any hook installed before us. */
+	if (prev_PQnetworkStats)
+		prev_PQnetworkStats(bytesSent, bytesReceived);
+}
+
+void
+_PG_init(void)
+{
+	prev_PQnetworkStats = PQnetworkStats_hook;
+	PQnetworkStats_hook = PostgresFdw_PQnetworkStats;
+}
+
+void
+_PG_fini(void)
+{
+	/* Restore the previous hook only if ours is still installed. */
+	if (PQnetworkStats_hook == PostgresFdw_PQnetworkStats)
+		PQnetworkStats_hook = prev_PQnetworkStats;
+}
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 334d8a2aa71..a94d61f1ec4 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2800,7 +2800,7 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
 		WaitForParallelWorkersToFinish(lps->pcxt);
 
 		for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
-			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
+			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i], NULL);
 	}
 
 	/*
@@ -4243,7 +4243,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
 	wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
-						  &wal_usage[ParallelWorkerNumber]);
+						  &wal_usage[ParallelWorkerNumber],
+						  NULL);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 54c8eb1289d..055fbd86e44 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1663,11 +1663,11 @@ _bt_end_parallel(BTLeader *btleader)
 	WaitForParallelWorkersToFinish(btleader->pcxt);
 
 	/*
-	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
-	 * or we might get incomplete data.)
+	 * Next, accumulate WAL and buffer usage.  (This must wait for the workers
+	 * to finish, or we might get incomplete data.)
 	 */
 	for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
-		InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]);
+		InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i], NULL);
 
 	/* Free last reference to MVCC snapshot, if one was used */
 	if (IsMVCCSnapshot(btleader->snapshot))
@@ -1870,11 +1870,15 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
 							   sharedsort2, sortmem, false);
 
-	/* Report WAL/buffer usage during parallel execution */
+	/*
+	 * Report WAL/buffer usage during parallel execution. No need to report
+	 * network
+	 */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
 	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
 	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
-						  &walusage[ParallelWorkerNumber]);
+						  &walusage[ParallelWorkerNumber],
+						  NULL);
 
 #ifdef BTREE_BUILD_STATS
 	if (log_btree_build_stats)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 10644dfac44..617cd28acd3 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -122,6 +122,7 @@ static const char *explain_get_index_name(Oid indexId);
 static void show_buffer_usage(ExplainState *es, const BufferUsage *usage,
 							  bool planning);
 static void show_wal_usage(ExplainState *es, const WalUsage *usage);
+static void show_net_usage(ExplainState *es, const NetworkUsage * usage);
 static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir,
 									ExplainState *es);
 static void ExplainScanTarget(Scan *plan, ExplainState *es);
@@ -188,6 +189,8 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
 			es->buffers = defGetBoolean(opt);
 		else if (strcmp(opt->defname, "wal") == 0)
 			es->wal = defGetBoolean(opt);
+		else if (strcmp(opt->defname, "network") == 0)
+			es->network = defGetBoolean(opt);
 		else if (strcmp(opt->defname, "settings") == 0)
 			es->settings = defGetBoolean(opt);
 		else if (strcmp(opt->defname, "timing") == 0)
@@ -232,6 +235,12 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("EXPLAIN option WAL requires ANALYZE")));
 
+	if (es->network && !es->analyze)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("EXPLAIN option NETWORK requires ANALYZE")));
+
+
 	/* if the timing was not set explicitly, set default value */
 	es->timing = (timing_set) ? es->timing : es->analyze;
 
@@ -538,6 +547,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 		instrument_option |= INSTRUMENT_BUFFERS;
 	if (es->wal)
 		instrument_option |= INSTRUMENT_WAL;
+	if (es->network)
+		instrument_option |= INSTRUMENT_NETWORK;
 
 	/*
 	 * We always collect timing for the entire statement, even when node-level
@@ -2048,6 +2059,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		show_buffer_usage(es, &planstate->instrument->bufusage, false);
 	if (es->wal && planstate->instrument)
 		show_wal_usage(es, &planstate->instrument->walusage);
+	if (es->network && planstate->instrument)
+		show_net_usage(es, &planstate->instrument->netusage);
 
 	/* Prepare per-worker buffer/WAL usage */
 	if (es->workers_state && (es->buffers || es->wal) && es->verbose)
@@ -3617,6 +3630,54 @@
 	}
 }
 
+/*
+ * Show network usage details.  In text format only non-zero counters are
+ * printed; structured formats always print every field.
+ */
+static void
+show_net_usage(ExplainState *es, const NetworkUsage *usage)
+{
+	bool		has_data = (usage->fdw_recv_bytes > 0) || (usage->fdw_sent_bytes > 0);
+	bool		has_timing = !INSTR_TIME_IS_ZERO(usage->fdw_wait_time);
+
+	if (es->format == EXPLAIN_FORMAT_TEXT)
+	{
+		/* Emit no line at all when there is nothing to report. */
+		if (!has_data && !has_timing)
+			return;
+
+		ExplainIndentText(es);
+		appendStringInfoString(es->str, "Network: FDW");
+		if (has_data)
+		{
+			appendStringInfoString(es->str, " bytes");
+			if (usage->fdw_sent_bytes > 0)
+				appendStringInfo(es->str, " sent=" UINT64_FORMAT,
+								 usage->fdw_sent_bytes);
+			if (usage->fdw_recv_bytes > 0)
+				appendStringInfo(es->str, " received=" UINT64_FORMAT,
+								 usage->fdw_recv_bytes);
+		}
+		/* The comma is needed only after the byte counters. */
+		if (has_timing)
+			appendStringInfo(es->str, "%s wait_time=%0.3f",
+							 has_data ? "," : "",
+							 INSTR_TIME_GET_MILLISEC(usage->fdw_wait_time));
+		appendStringInfoChar(es->str, '\n');
+	}
+	else
+	{
+		ExplainPropertyInteger("FDW Bytes Sent", NULL,
+							   usage->fdw_sent_bytes, es);
+		ExplainPropertyInteger("FDW Bytes Received", NULL,
+							   usage->fdw_recv_bytes, es);
+		if (track_fdw_wait_timing)
+			ExplainPropertyFloat("FDW Wait Time", NULL,
+								 INSTR_TIME_GET_MILLISEC(usage->fdw_wait_time),
+								 3, es);
+	}
+}
+
 /*
  * Show WAL usage details.
  */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f8a4a40e7b5..4b9603113ca 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,7 +12,7 @@
  * workers and ensuring that their state generally matches that of the
  * leader; see src/backend/access/transam/README.parallel for details.
  * However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer/WAL usage info, and
+ * any ParamListInfo associated with the query, buffer/WAL/network usage info, and
  * the actual plan to be passed down to the worker.
  *
  * IDENTIFICATION
@@ -66,6 +66,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_NETWORK_USAGE UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -599,6 +600,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	char	   *paramlistinfo_space;
 	BufferUsage *bufusage_space;
 	WalUsage   *walusage_space;
+	NetworkUsage *netusage_space;
 	SharedExecutorInstrumentation *instrumentation = NULL;
 	SharedJitInstrumentation *jit_instrumentation = NULL;
 	int			pstmt_len;
@@ -679,6 +681,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 						   mul_size(sizeof(WalUsage), pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	/* Same for network */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(NetworkUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for tuple queues. */
 	shm_toc_estimate_chunk(&pcxt->estimator,
 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -767,6 +774,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
 	pei->wal_usage = walusage_space;
 
+	/* Same for network */
+	netusage_space = shm_toc_allocate(pcxt->toc,
+									  mul_size(sizeof(NetworkUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_NETWORK_USAGE, netusage_space);
+	pei->net_usage = netusage_space;
+
 	/* Set up the tuple queues that the workers will write into. */
 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
@@ -1159,11 +1172,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	WaitForParallelWorkersToFinish(pei->pcxt);
 
 	/*
-	 * Next, accumulate buffer/WAL usage.  (This must wait for the workers to
-	 * finish, or we might get incomplete data.)
+	 * Next, accumulate buffer/WAL/network usage.  (This must wait for the
+	 * workers to finish, or we might get incomplete data.)
 	 */
 	for (i = 0; i < nworkers; i++)
-		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
+		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i], &pei->net_usage[i]);
 
 	pei->finished = true;
 }
@@ -1396,6 +1409,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
+	NetworkUsage *net_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
 	SharedExecutorInstrumentation *instrumentation;
@@ -1469,11 +1483,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	/* Shut down the executor */
 	ExecutorFinish(queryDesc);
 
-	/* Report buffer/WAL usage during parallel execution. */
+	/* Report buffer/WAL/network usage during parallel execution. */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
 	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	net_usage = shm_toc_lookup(toc, PARALLEL_KEY_NETWORK_USAGE, false);
 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
-						  &wal_usage[ParallelWorkerNumber]);
+						  &wal_usage[ParallelWorkerNumber],
+						  &net_usage[ParallelWorkerNumber]);
 
 	/* Report instrumentation data if any instrumentation options are set. */
 	if (instrumentation != NULL)
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 2b106d8473c..31e897a86fe 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -21,10 +21,14 @@ BufferUsage pgBufferUsage;
 static BufferUsage save_pgBufferUsage;
 WalUsage	pgWalUsage;
 static WalUsage save_pgWalUsage;
+NetworkUsage pgNetUsage;
+static NetworkUsage save_pgNetUsage;
+
+bool track_fdw_wait_timing;
 
 static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
 static void WalUsageAdd(WalUsage *dst, WalUsage *add);
-
+static void NetUsageAdd(NetworkUsage *dst, const NetworkUsage *add);
 
 /* Allocate new instrumentation structure(s) */
 Instrumentation *
@@ -34,11 +38,13 @@ InstrAlloc(int n, int instrument_options, bool async_mode)
 
 	/* initialize all fields to zeroes, then modify as needed */
 	instr = palloc0(n * sizeof(Instrumentation));
-	if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL))
+	if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER |
+							  INSTRUMENT_WAL | INSTRUMENT_NETWORK))
 	{
 		bool		need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0;
 		bool		need_wal = (instrument_options & INSTRUMENT_WAL) != 0;
 		bool		need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
+		bool		need_network = (instrument_options & INSTRUMENT_NETWORK) != 0;
 		int			i;
 
 		for (i = 0; i < n; i++)
@@ -46,6 +52,7 @@ InstrAlloc(int n, int instrument_options, bool async_mode)
 			instr[i].need_bufusage = need_buffers;
 			instr[i].need_walusage = need_wal;
 			instr[i].need_timer = need_timer;
+			instr[i].need_netusage = need_network;
 			instr[i].async_mode = async_mode;
 		}
 	}
@@ -61,6 +68,7 @@ InstrInit(Instrumentation *instr, int instrument_options)
 	instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
 	instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0;
 	instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
+	instr->need_netusage = (instrument_options & INSTRUMENT_NETWORK) != 0;
 }
 
 /* Entry to a plan node */
@@ -77,6 +85,9 @@ InstrStartNode(Instrumentation *instr)
 
 	if (instr->need_walusage)
 		instr->walusage_start = pgWalUsage;
+
+	if (instr->need_netusage)
+		instr->netusage_start = pgNetUsage;
 }
 
 /* Exit from a plan node */
@@ -103,12 +114,14 @@ InstrStopNode(Instrumentation *instr, double nTuples)
 
 	/* Add delta of buffer usage since entry to node's totals */
 	if (instr->need_bufusage)
-		BufferUsageAccumDiff(&instr->bufusage,
-							 &pgBufferUsage, &instr->bufusage_start);
+		BufferUsageAccumDiff(&instr->bufusage, &pgBufferUsage,
+							 &instr->bufusage_start);
 
 	if (instr->need_walusage)
-		WalUsageAccumDiff(&instr->walusage,
-						  &pgWalUsage, &instr->walusage_start);
+		WalUsageAccumDiff(&instr->walusage, &pgWalUsage, &instr->walusage_start);
+
+	if (instr->need_netusage)
+		NetUsageAccumDiff(&instr->netusage, &pgNetUsage, &instr->netusage_start);
 
 	/* Is this the first tuple of this cycle? */
 	if (!instr->running)
@@ -193,6 +206,9 @@ InstrAggNode(Instrumentation *dst, Instrumentation *add)
 
 	if (dst->need_walusage)
 		WalUsageAdd(&dst->walusage, &add->walusage);
+
+	if (dst->need_netusage)
+		NetUsageAdd(&dst->netusage, &add->netusage);
 }
 
 /* note current values during parallel executor startup */
@@ -201,24 +217,32 @@ InstrStartParallelQuery(void)
 {
 	save_pgBufferUsage = pgBufferUsage;
 	save_pgWalUsage = pgWalUsage;
+	save_pgNetUsage = pgNetUsage;
 }
 
 /* report usage after parallel executor shutdown */
 void
-InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage)
 {
 	memset(bufusage, 0, sizeof(BufferUsage));
 	BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
 	memset(walusage, 0, sizeof(WalUsage));
 	WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
+	if (netusage != NULL)
+	{
+		memset(netusage, 0, sizeof(NetworkUsage));
+		NetUsageAccumDiff(netusage, &pgNetUsage, &save_pgNetUsage);
+	}
 }
 
 /* accumulate work done by workers in leader's stats */
 void
-InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage)
 {
 	BufferUsageAdd(&pgBufferUsage, bufusage);
 	WalUsageAdd(&pgWalUsage, walusage);
+	if (netusage != NULL)
+		NetUsageAdd(&pgNetUsage, netusage);
 }
 
 /* dst += add */
@@ -277,3 +301,23 @@
 	dst->wal_records += add->wal_records - sub->wal_records;
 	dst->wal_fpi += add->wal_fpi - sub->wal_fpi;
 }
+
+/* dst += add - sub */
+void
+NetUsageAccumDiff(NetworkUsage *dst, const NetworkUsage *add,
+				  const NetworkUsage *sub)
+{
+	dst->fdw_recv_bytes += add->fdw_recv_bytes - sub->fdw_recv_bytes;
+	dst->fdw_sent_bytes += add->fdw_sent_bytes - sub->fdw_sent_bytes;
+	INSTR_TIME_ACCUM_DIFF(dst->fdw_wait_time, add->fdw_wait_time,
+						  sub->fdw_wait_time);
+}
+
+/* dst += add */
+static void
+NetUsageAdd(NetworkUsage *dst, const NetworkUsage *add)
+{
+	dst->fdw_recv_bytes += add->fdw_recv_bytes;
+	dst->fdw_sent_bytes += add->fdw_sent_bytes;
+	INSTR_TIME_ADD(dst->fdw_wait_time, add->fdw_wait_time);
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 467b0fd6fe7..75d4f574951 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1552,7 +1552,15 @@ static struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
-
+	{
+		{"track_fdw_wait_timing", PGC_SUSET, STATS_COLLECTOR,
+			gettext_noop("Collects statistics for foreign source waiting time."),
+			NULL
+		},
+		&track_fdw_wait_timing,
+		false,
+		NULL, NULL, NULL
+	},
 	{
 		{"update_process_title", PGC_SUSET, PROCESS_TITLE,
 			gettext_noop("Updates the process title to show the active SQL command."),
diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h
index e94d9e49cf6..a212d87f037 100644
--- a/src/include/commands/explain.h
+++ b/src/include/commands/explain.h
@@ -43,6 +43,7 @@ typedef struct ExplainState
 	bool		costs;			/* print estimated costs */
 	bool		buffers;		/* print buffer usage */
 	bool		wal;			/* print WAL usage */
+	bool		network;		/* print network usage */
 	bool		timing;			/* print detailed node timing */
 	bool		summary;		/* print total planning and execution timing */
 	bool		settings;		/* print modified settings */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 3888175a2f4..22118628a51 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo
 	ParallelContext *pcxt;		/* parallel context we're using */
 	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
 	WalUsage   *wal_usage;		/* walusage area in DSM */
+	NetworkUsage *net_usage;    /* netusage area in DSM */
 	SharedExecutorInstrumentation *instrumentation; /* optional */
 	struct SharedJitInstrumentation *jit_instrumentation;	/* optional */
 	dsa_area   *area;			/* points to DSA area in DSM */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index 2f9905b7c8e..29c8a7c6c39 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -51,6 +51,13 @@ typedef struct WalUsage
 	uint64		wal_bytes;		/* size of WAL records produced */
 } WalUsage;
 
+typedef struct NetworkUsage
+{
+	uint64		fdw_recv_bytes; /* # of bytes received from foreign servers */
+	uint64		fdw_sent_bytes; /* # of bytes sent to foreign servers */
+	instr_time	fdw_wait_time;	/* time spent waiting on foreign servers */
+} NetworkUsage;
+
 /* Flag bits included in InstrAlloc's instrument_options bitmask */
 typedef enum InstrumentOption
 {
@@ -58,6 +65,7 @@ typedef enum InstrumentOption
 	INSTRUMENT_BUFFERS = 1 << 1,	/* needs buffer usage */
 	INSTRUMENT_ROWS = 1 << 2,	/* needs row count */
 	INSTRUMENT_WAL = 1 << 3,	/* needs WAL usage */
+	INSTRUMENT_NETWORK = 1 << 4,	/* needs network usage */
 	INSTRUMENT_ALL = PG_INT32_MAX
 } InstrumentOption;
 
@@ -67,6 +75,7 @@ typedef struct Instrumentation
 	bool		need_timer;		/* true if we need timer data */
 	bool		need_bufusage;	/* true if we need buffer usage data */
 	bool		need_walusage;	/* true if we need WAL usage data */
+	bool		need_netusage;	/* true if we need network usage data */
 	bool		async_mode;		/* true if node is in async mode */
 	/* Info about current plan cycle: */
 	bool		running;		/* true if we've completed first tuple */
@@ -76,6 +85,7 @@ typedef struct Instrumentation
 	double		tuplecount;		/* # of tuples emitted so far this cycle */
 	BufferUsage bufusage_start; /* buffer usage at start */
 	WalUsage	walusage_start; /* WAL usage at start */
+	NetworkUsage netusage_start;	/* network usage at start */
 	/* Accumulated statistics across all completed cycles: */
 	double		startup;		/* total startup time (in seconds) */
 	double		total;			/* total time (in seconds) */
@@ -86,6 +96,7 @@ typedef struct Instrumentation
 	double		nfiltered2;		/* # of tuples removed by "other" quals */
 	BufferUsage bufusage;		/* total buffer usage */
 	WalUsage	walusage;		/* total WAL usage */
+	NetworkUsage netusage;		/* total network usage */
 } Instrumentation;
 
 typedef struct WorkerInstrumentation
@@ -96,6 +107,9 @@ typedef struct WorkerInstrumentation
 
 extern PGDLLIMPORT BufferUsage pgBufferUsage;
 extern PGDLLIMPORT WalUsage pgWalUsage;
+extern PGDLLIMPORT NetworkUsage pgNetUsage;
+
+extern PGDLLIMPORT bool track_fdw_wait_timing;
 
 extern Instrumentation *InstrAlloc(int n, int instrument_options,
 								   bool async_mode);
@@ -106,11 +120,12 @@ extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples);
 extern void InstrEndLoop(Instrumentation *instr);
 extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
 extern void InstrStartParallelQuery(void);
-extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
-extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
-extern void BufferUsageAccumDiff(BufferUsage *dst,
-								 const BufferUsage *add, const BufferUsage *sub);
+extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage);
+extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage);
+extern void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add,
+								 const BufferUsage *sub);
 extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
 							  const WalUsage *sub);
-
+extern void NetUsageAccumDiff(NetworkUsage *dst, const NetworkUsage *add,
+							  const NetworkUsage *sub);
 #endif							/* INSTRUMENT_H */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc883709..6663b28eb8d 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQnetworkStats_hook       187
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 9a2a9702934..067c1aa51d7 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,6 +53,8 @@
 #include "pg_config_paths.h"
 #include "port/pg_bswap.h"
 
+PQnetworkStats_hook_type PQnetworkStats_hook = NULL;
+
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
 static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c
index b15d8d137ce..c3653482e5e 100644
--- a/src/interfaces/libpq/fe-secure.c
+++ b/src/interfaces/libpq/fe-secure.c
@@ -224,6 +224,8 @@ pqsecure_read(PGconn *conn, void *ptr, size_t len)
 	{
 		n = pqsecure_raw_read(conn, ptr, len);
 	}
+	if (PQnetworkStats_hook)
+		PQnetworkStats_hook(0, n);
 
 	return n;
 }
@@ -307,6 +309,8 @@ pqsecure_write(PGconn *conn, const void *ptr, size_t len)
 		n = pqsecure_raw_write(conn, ptr, len);
 	}
 
+	if (PQnetworkStats_hook)
+		PQnetworkStats_hook(n, 0);
 	return n;
 }
 
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index a6fd69acebc..552a61b3c27 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -27,7 +27,9 @@
  * such as Oid.
  */
 #include "postgres_ext.h"
+/* <sys/types.h> supplies ssize_t for the network-stats hook below */
+#include <sys/types.h>
 
 /*
  * These symbols may be used in compile-time #ifdef tests for the availability
  * of newer libpq features.
@@ -646,6 +648,15 @@
 /* Get encoding id from environment variable PGCLIENTENCODING */
 extern int	PQenv2encoding(void);
 
+/*
+ * Hook called with the byte counts of every send/receive on a connection.
+ * A negative count is an error return from the underlying read/write and
+ * carries no bytes.  The symbol is exported through exports.txt, so no
+ * backend-only PGDLLEXPORT decoration is used in this client header.
+ */
+typedef void (*PQnetworkStats_hook_type) (ssize_t bytesSent, ssize_t bytesReceived);
+extern PQnetworkStats_hook_type PQnetworkStats_hook;
+
 /* === in fe-auth.c === */
 
 extern char *PQencryptPassword(const char *passwd, const char *user);
-- 
2.30.2

Reply via email to