I enabled the test again and also pushed the changes to dblink, isolationtester and fe_utils (which AFAICS is used by pg_dump, pg_amcheck, reindexdb and vacuumdb). I chickened out of committing the postgres_fdw changes though, so here they are again. Not sure I'll find courage to get these done by tomorrow, or whether I should just let them for Fujita-san or Noah, who have been the last committers to touch this.
-- Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/ "No renuncies a nada. No te aferres a nada."
>From 737578fcdc6ed0de64838d2ad905054b18eb9ec1 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio <jelte.fenn...@microsoft.com> Date: Mon, 18 Mar 2024 19:37:40 +0100 Subject: [PATCH v39] postgres_fdw: Start using new libpq cancel APIs Commit 61461a300c1c introduced new functions to libpq for cancelling queries. This replaces the usage of the old ones in postgres_fdw. Author: Jelte Fennema-Nio <postg...@jeltef.nl> Discussion: https://postgr.es/m/cageczqt_vgowwenuqvuv9xqmbacyxjtrrayo8w07oqashk_...@mail.gmail.com --- contrib/postgres_fdw/connection.c | 108 +++++++++++++++--- .../postgres_fdw/expected/postgres_fdw.out | 15 +++ contrib/postgres_fdw/sql/postgres_fdw.sql | 7 ++ 3 files changed, 113 insertions(+), 17 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4931ebf591..0c66eaa001 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_cancel_query(PGconn *conn); -static bool pgfdw_cancel_query_begin(PGconn *conn); +static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime); static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, @@ -1315,36 +1315,106 @@ pgfdw_cancel_query(PGconn *conn) endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), CONNECTION_CLEANUP_TIMEOUT); - if (!pgfdw_cancel_query_begin(conn)) + if (!pgfdw_cancel_query_begin(conn, endtime)) return false; return pgfdw_cancel_query_end(conn, endtime, false); } +/* + * Submit a cancel request to the given connection, waiting only until + * the given time. + * + * We sleep interruptibly until we receive confirmation that the cancel + * request has been accepted, and if it is, return true; if the timeout + * lapses without that, or the request fails for whatever reason, return + * false. + */ static bool -pgfdw_cancel_query_begin(PGconn *conn) +pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime) { - PGcancel *cancel; - char errbuf[256]; + bool timed_out = false; + bool failed = false; + PGcancelConn *cancel_conn = PQcancelCreate(conn); - /* - * Issue cancel request. Unfortunately, there's no good way to limit the - * amount of time that we might block inside PQgetCancel(). - */ - if ((cancel = PQgetCancel(conn))) + if (!PQcancelStart(cancel_conn)) { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) + PG_TRY(); { ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - return false; + pchomp(PQcancelErrorMessage(cancel_conn))))); } - PQfreeCancel(cancel); + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + return false; } - return true; + /* In what follows, do not leak any PGcancelConn on an error. */ + PG_TRY(); + { + while (true) + { + PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn); + TimestampTz now = GetCurrentTimestamp(); + long cur_timeout; + int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + if (pollres == PGRES_POLLING_OK) + break; + + /* If timeout has expired, give up, else get sleep time. */ + cur_timeout = TimestampDifferenceMilliseconds(now, endtime); + if (cur_timeout <= 0) + { + timed_out = true; + failed = true; + goto exit; + } + + switch (pollres) + { + case PGRES_POLLING_READING: + waitEvents |= WL_SOCKET_READABLE; + break; + case PGRES_POLLING_WRITING: + waitEvents |= WL_SOCKET_WRITEABLE; + break; + default: + failed = true; + goto exit; + } + + /* Sleep until there's something to do */ + WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } +exit: + if (failed) + { + if (timed_out) + ereport(WARNING, + (errmsg("could not cancel request due to timeout"))); + else + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", + pchomp(PQcancelErrorMessage(cancel_conn))))); + } + } + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + + return !failed; } static bool @@ -1685,7 +1755,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, */ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) { - if (!pgfdw_cancel_query_begin(entry->conn)) + TimestampTz endtime; + + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + if (!pgfdw_cancel_query_begin(entry->conn, endtime)) return false; /* Unable to cancel running query */ *cancel_requested = lappend(*cancel_requested, entry); } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 58a603ac56..e03160bd97 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c (10 rows) ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(*)) + Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5)) + Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE)) +(4 rows) + +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +ERROR: canceling statement due to statement timeout +RESET statement_timeout; -- ==================================================================== -- Check that userid to use when querying the remote table is correctly -- propagated into foreign rels present in subqueries under an UNION ALL diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index e3d147de6d..2626e68cc6 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +RESET statement_timeout; + -- ==================================================================== -- Check that userid to use when querying the remote table is correctly -- propagated into foreign rels present in subqueries under an UNION ALL -- 2.39.2