Hello,
I have implemented per-query network statistics collection for FDWs. It
works much like the existing collection of buffer and WAL statistics, and
the results can be seen with a new NETWORK option for the EXPLAIN command:
explain (analyze, network) insert into itrtest values (2, 'blah');
QUERY PLAN
---
Insert on itrtest (cost=0.00..0.01 rows=0 width=0) (actual time=0.544..0.544 rows=0 loops=1)
  Network: FDW bytes sent=197 received=72, wait_time=0.689
  -> Result (cost=0.00..0.01 rows=1 width=36) (actual time=0.003..0.003 rows=1 loops=1)
Planning Time: 0.025 ms
Execution Time: 0.701 ms
(5 rows)
I have yet to add the corresponding columns to pg_stat_statements and to
write tests and documentation, but before I go ahead with that, I would
like to know what the community thinks about the patch.
Regards,
Ilya Gladyshev
>From 3ffbe071480672189c2e03d7e54707c77ba58b0b Mon Sep 17 00:00:00 2001
From: Ilya Gladyshev
Date: Mon, 23 Aug 2021 21:37:31 +0300
Subject: [PATCH] adds per query FDW network usage stats
Adds a means of collecting network usage stats and outputting them in
EXPLAIN with the NETWORK option. Implements network stats collection for
postgres_fdw by adding a stats-collection hook to libpq.
---
contrib/postgres_fdw/connection.c| 57 -
contrib/postgres_fdw/postgres_fdw.c | 28 +
src/backend/access/heap/vacuumlazy.c | 5 ++-
src/backend/access/nbtree/nbtsort.c | 14 ---
src/backend/commands/explain.c | 62
src/backend/executor/execParallel.c | 28 ++---
src/backend/executor/instrument.c| 57 +
src/backend/utils/misc/guc.c | 10 -
src/include/commands/explain.h | 1 +
src/include/executor/execParallel.h | 1 +
src/include/executor/instrument.h| 25 ---
src/interfaces/libpq/exports.txt | 1 +
src/interfaces/libpq/fe-misc.c | 2 +
src/interfaces/libpq/fe-secure.c | 4 ++
src/interfaces/libpq/libpq-fe.h | 5 ++-
15 files changed, 271 insertions(+), 29 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 82aa14a65de..3f479a74ba1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -571,10 +571,22 @@ void
do_sql_command(PGconn *conn, const char *sql)
{
PGresult *res;
+ instr_time start, duration;
+
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
if (!PQsendQuery(conn, sql))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
+
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -684,10 +696,14 @@ GetPrepStmtNumber(PGconn *conn)
PGresult *
pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
{
+ PGresult *res;
+ instr_time start, duration;
/* First, process a pending asynchronous request, if any. */
if (state && state->pendingAreq)
process_pending_request(state->pendingAreq);
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
@@ -696,7 +712,14 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
pgfdw_report_error(ERROR, NULL, conn, false, query);
/* Wait for the result. */
- return pgfdw_get_result(conn, query);
+ res = pgfdw_get_result(conn, query);
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
+ return res;
}
/*
@@ -717,6 +740,10 @@ pgfdw_get_result(PGconn *conn, const char *query)
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
{
+ instr_time start, duration;
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
for (;;)
{
PGresult *res;
@@ -750,6 +777,13 @@ pgfdw_get_result(PGconn *conn, const char *query)
PQclear(last_res);
last_res = res;
}
+
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
}
PG_CATCH();
{
@@ -893,7 +927,18 @@ pgfdw_xact_callback(XactEvent event, void *arg)
*/
if (entry->have_prep_stmt && entry->have_error)
{
+ instr_time start, duration;
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
res = PQexec(entry->conn, "DEALLOCATE ALL");
+
+ if (track_fdw_wait_timing)
+ {
+