I enabled the test again and also pushed the changes to dblink,
isolationtester and fe_utils (which AFAICS is used by pg_dump,
pg_amcheck, reindexdb and vacuumdb).  I chickened out of committing the
postgres_fdw changes though, so here they are again.  Not sure I'll find
courage to get these done by tomorrow, or whether I should just let them
for Fujita-san or Noah, who have been the last committers to touch this.


-- 
Álvaro Herrera               48°01'N 7°57'E  —  https://www.EnterpriseDB.com/
"No renuncies a nada. No te aferres a nada."
>From 737578fcdc6ed0de64838d2ad905054b18eb9ec1 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fenn...@microsoft.com>
Date: Mon, 18 Mar 2024 19:37:40 +0100
Subject: [PATCH v39] postgres_fdw: Start using new libpq cancel APIs

Commit 61461a300c1c introduced new functions to libpq for cancelling
queries.  This replaces the usage of the old ones in postgres_fdw.

Author: Jelte Fennema-Nio <postg...@jeltef.nl>
Discussion: https://postgr.es/m/cageczqt_vgowwenuqvuv9xqmbacyxjtrrayo8w07oqashk_...@mail.gmail.com
---
 contrib/postgres_fdw/connection.c             | 108 +++++++++++++++---
 .../postgres_fdw/expected/postgres_fdw.out    |  15 +++
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   7 ++
 3 files changed, 113 insertions(+), 17 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4931ebf591..0c66eaa001 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
 static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_cancel_query(PGconn *conn);
-static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
 								   bool consume_input);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@@ -1315,36 +1315,106 @@ pgfdw_cancel_query(PGconn *conn)
 	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
 										  CONNECTION_CLEANUP_TIMEOUT);
 
-	if (!pgfdw_cancel_query_begin(conn))
+	if (!pgfdw_cancel_query_begin(conn, endtime))
 		return false;
 	return pgfdw_cancel_query_end(conn, endtime, false);
 }
 
+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return true; if the timeout
+ * lapses without that, or the request fails for whatever reason, return
+ * false.
+ */
 static bool
-pgfdw_cancel_query_begin(PGconn *conn)
+pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
 {
-	PGcancel   *cancel;
-	char		errbuf[256];
+	bool		timed_out = false;
+	bool		failed = false;
+	PGcancelConn *cancel_conn = PQcancelCreate(conn);
 
-	/*
-	 * Issue cancel request.  Unfortunately, there's no good way to limit the
-	 * amount of time that we might block inside PQgetCancel().
-	 */
-	if ((cancel = PQgetCancel(conn)))
+	if (!PQcancelStart(cancel_conn))
 	{
-		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+		PG_TRY();
 		{
 			ereport(WARNING,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
 					 errmsg("could not send cancel request: %s",
-							errbuf)));
-			PQfreeCancel(cancel);
-			return false;
+							pchomp(PQcancelErrorMessage(cancel_conn)))));
 		}
-		PQfreeCancel(cancel);
+		PG_FINALLY();
+		{
+			PQcancelFinish(cancel_conn);
+		}
+		PG_END_TRY();
+		return false;
 	}
 
-	return true;
+	/* In what follows, do not leak any PGcancelConn on an error. */
+	PG_TRY();
+	{
+		while (true)
+		{
+			PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn);
+			TimestampTz now = GetCurrentTimestamp();
+			long		cur_timeout;
+			int			waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+			if (pollres == PGRES_POLLING_OK)
+				break;
+
+			/* If timeout has expired, give up, else get sleep time. */
+			cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+			if (cur_timeout <= 0)
+			{
+				timed_out = true;
+				failed = true;
+				goto exit;
+			}
+
+			switch (pollres)
+			{
+				case PGRES_POLLING_READING:
+					waitEvents |= WL_SOCKET_READABLE;
+					break;
+				case PGRES_POLLING_WRITING:
+					waitEvents |= WL_SOCKET_WRITEABLE;
+					break;
+				default:
+					failed = true;
+					goto exit;
+			}
+
+			/* Sleep until there's something to do */
+			WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
+							  cur_timeout, PG_WAIT_EXTENSION);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+exit:
+		if (failed)
+		{
+			if (timed_out)
+				ereport(WARNING,
+						(errmsg("could not cancel request due to timeout")));
+			else
+				ereport(WARNING,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("could not send cancel request: %s",
+								pchomp(PQcancelErrorMessage(cancel_conn)))));
+		}
+	}
+	PG_FINALLY();
+	{
+		PQcancelFinish(cancel_conn);
+	}
+	PG_END_TRY();
+
+	return !failed;
 }
 
 static bool
@@ -1685,7 +1755,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
 	 */
 	if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
 	{
-		if (!pgfdw_cancel_query_begin(entry->conn))
+		TimestampTz endtime;
+
+		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+											  CONNECTION_CLEANUP_TIMEOUT);
+		if (!pgfdw_cancel_query_begin(entry->conn, endtime))
 			return false;		/* Unable to cancel running query */
 		*cancel_requested = lappend(*cancel_requested, entry);
 	}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 58a603ac56..e03160bd97 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 (10 rows)
 
 ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+                                                                             QUERY PLAN                                                                              
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Foreign Scan
+   Output: (count(*))
+   Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
+   Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
+(4 rows)
+
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+ERROR:  canceling statement due to statement timeout
+RESET statement_timeout;
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index e3d147de6d..2626e68cc6 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
 ALTER VIEW v4 OWNER TO regress_view_owner;
 
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+RESET statement_timeout;
+
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
-- 
2.39.2

Reply via email to