On Sat Mar 7, 2026 at 1:01 AM CET, Jelte Fennema-Nio wrote:
I took a look at this, and attached a fixup patch that does this.

Now with the actual attachments...
From 0e37f6acca869ea471a840f9511adf1de856b5d2 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sat, 13 Dec 2025 13:05:50 +0100
Subject: [PATCH v0] POC: Use non-blocking APIs for frontend tools

POC by Claude Code, to see what would be needed to change all
fe_utils/cancel.c users to use non-blocking APIs. This code is an
extremely rough draft, only meant as a way to gauge feasibility of this
approach.
---
 src/bin/pgbench/pgbench.c     |  38 +-
 src/bin/psql/command.c        |   5 +-
 src/bin/psql/common.c         |  47 +-
 src/bin/psql/copy.c           |  54 +-
 src/bin/psql/large_obj.c      |  12 +-
 src/fe_utils/Makefile         |   2 +-
 src/fe_utils/cancel.c         | 914 ++++++++++++++++++++++++++++++----
 src/fe_utils/parallel_slot.c  |  45 +-
 src/fe_utils/query_utils.c    |   8 +-
 src/include/fe_utils/cancel.h |  34 +-
 10 files changed, 953 insertions(+), 206 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc09..8ebefceeebf 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -867,7 +867,7 @@ get_table_relkind(PGconn *con, const char *table)
 	const char *sql =
 		"SELECT relkind FROM pg_catalog.pg_class WHERE oid=$1::pg_catalog.regclass";
 
-	res = PQexecParams(con, sql, 1, NULL, params, NULL, NULL, 0);
+	res = cancellable_exec_params(con, sql, 1, NULL, params, NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		pg_log_error("query failed: %s", PQerrorMessage(con));
@@ -1490,13 +1490,13 @@ accumStats(StatsData *stats, bool skipped, double lat, double lag,
 	}
 }
 
-/* call PQexec() and exit() on failure */
+/* call cancellable_exec() and exit() on failure */
 static void
 executeStatement(PGconn *con, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(con, sql);
+	res = cancellable_exec(con, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		pg_log_error("query failed: %s", PQerrorMessage(con));
@@ -1506,13 +1506,13 @@ executeStatement(PGconn *con, const char *sql)
 	PQclear(res);
 }
 
-/* call PQexec() and complain, but without exiting, on failure */
+/* call cancellable_exec() and complain, but without exiting, on failure */
 static void
 tryExecuteStatement(PGconn *con, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(con, sql);
+	res = cancellable_exec(con, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		pg_log_error("%s", PQerrorMessage(con));
@@ -5059,7 +5059,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	else if (n == -1)
 		pg_fatal("invalid format string");
 
-	res = PQexec(con, copy_statement);
+	res = cancellable_exec(con, copy_statement);
 
 	if (PQresultStatus(res) != PGRES_COPY_IN)
 		pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
@@ -5338,7 +5338,6 @@ runInitSteps(const char *initialize_steps)
 		pg_fatal("could not create connection for initialization");
 
 	setup_cancel_handler(NULL);
-	SetCancelConn(con);
 
 	for (step = initialize_steps; *step != '\0'; step++)
 	{
@@ -5399,7 +5398,6 @@ runInitSteps(const char *initialize_steps)
 	}
 
 	fprintf(stderr, "done in %.2f s (%s).\n", run_time, stats.data);
-	ResetCancelConn();
 	PQfinish(con);
 	termPQExpBuffer(&stats);
 }
@@ -5417,7 +5415,7 @@ GetTableInfo(PGconn *con, bool scale_given)
 	 * get the scaling factor that should be same as count(*) from
 	 * pgbench_branches if this is not a custom query
 	 */
-	res = PQexec(con, "select count(*) from pgbench_branches");
+	res = cancellable_exec(con, "select count(*) from pgbench_branches");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		char	   *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
@@ -5456,17 +5454,17 @@ GetTableInfo(PGconn *con, bool scale_given)
 	 * We assume no partitioning on any failure, so as to avoid failing on an
 	 * old version without "pg_partitioned_table".
 	 */
-	res = PQexec(con,
-				 "select o.n, p.partstrat, pg_catalog.count(i.inhparent) "
-				 "from pg_catalog.pg_class as c "
-				 "join pg_catalog.pg_namespace as n on (n.oid = c.relnamespace) "
-				 "cross join lateral (select pg_catalog.array_position(pg_catalog.current_schemas(true), n.nspname)) as o(n) "
-				 "left join pg_catalog.pg_partitioned_table as p on (p.partrelid = c.oid) "
-				 "left join pg_catalog.pg_inherits as i on (c.oid = i.inhparent) "
-				 "where c.relname = 'pgbench_accounts' and o.n is not null "
-				 "group by 1, 2 "
-				 "order by 1 asc "
-				 "limit 1");
+	res = cancellable_exec(con,
+						   "select o.n, p.partstrat, pg_catalog.count(i.inhparent) "
+						   "from pg_catalog.pg_class as c "
+						   "join pg_catalog.pg_namespace as n on (n.oid = c.relnamespace) "
+						   "cross join lateral (select pg_catalog.array_position(pg_catalog.current_schemas(true), n.nspname)) as o(n) "
+						   "left join pg_catalog.pg_partitioned_table as p on (p.partrelid = c.oid) "
+						   "left join pg_catalog.pg_inherits as i on (c.oid = i.inhparent) "
+						   "where c.relname = 'pgbench_accounts' and o.n is not null "
+						   "group by 1, 2 "
+						   "order by 1 asc "
+						   "limit 1");
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index e6365d823ce..238d62dc86c 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -4299,7 +4299,6 @@ do_connect(enum trivalue reuse_previous_specification,
 				 */
 				PQfinish(o_conn);
 				pset.db = NULL;
-				ResetCancelConn();
 				UnsyncVariables();
 			}
 
@@ -6225,7 +6224,7 @@ lookup_object_oid(EditableObjectType obj_type, const char *desc,
 		destroyPQExpBuffer(query);
 		return false;
 	}
-	res = PQexec(pset.db, query->data);
+	res = cancellable_exec(pset.db, query->data);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
 		*obj_oid = atooid(PQgetvalue(res, 0, 0));
 	else
@@ -6307,7 +6306,7 @@ get_create_object_cmd(EditableObjectType obj_type, Oid oid,
 		destroyPQExpBuffer(query);
 		return false;
 	}
-	res = PQexec(pset.db, query->data);
+	res = cancellable_exec(pset.db, query->data);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
 	{
 		resetPQExpBuffer(buf);
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 2eadd391a9c..119ca99b418 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -382,7 +382,6 @@ CheckConnection(void)
 				PQfinish(pset.dead_conn);
 			pset.dead_conn = pset.db;
 			pset.db = NULL;
-			ResetCancelConn();
 			UnsyncVariables();
 		}
 		else
@@ -583,7 +582,7 @@ ClearOrSaveAllResults(void)
 {
 	PGresult   *result;
 
-	while ((result = PQgetResult(pset.db)) != NULL)
+	while ((result = cancellable_getresult(pset.db)) != NULL)
 		ClearOrSaveResult(result);
 }
 
@@ -681,11 +680,7 @@ PSQLexec(const char *query)
 			return NULL;
 	}
 
-	SetCancelConn(pset.db);
-
-	res = PQexec(pset.db, query);
-
-	ResetCancelConn();
+	res = cancellable_exec(pset.db, query);
 
 	if (!AcceptResult(res, true))
 	{
@@ -719,12 +714,8 @@ PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout,
 		return 0;
 	}
 
-	SetCancelConn(pset.db);
-
 	res = ExecQueryAndProcessResults(query, &elapsed_msec, NULL, true, min_rows, opt, printQueryFout);
 
-	ResetCancelConn();
-
 	/* Possible microtiming output */
 	if (timing)
 		PrintTiming(elapsed_msec);
@@ -943,8 +934,6 @@ HandleCopyResult(PGresult **resultp, FILE *copystream)
 	Assert(result_status == PGRES_COPY_OUT ||
 		   result_status == PGRES_COPY_IN);
 
-	SetCancelConn(pset.db);
-
 	if (result_status == PGRES_COPY_OUT)
 	{
 		success = handleCopyOut(pset.db,
@@ -973,7 +962,6 @@ HandleCopyResult(PGresult **resultp, FILE *copystream)
 							   PQbinaryTuples(*resultp),
 							   &copy_result);
 	}
-	ResetCancelConn();
 
 	/*
 	 * Replace the PGRES_COPY_OUT/IN result with COPY command's exit status,
@@ -1162,8 +1150,6 @@ SendQuery(const char *query)
 		fflush(pset.logfile);
 	}
 
-	SetCancelConn(pset.db);
-
 	transaction_status = PQtransactionStatus(pset.db);
 
 	if (transaction_status == PQTRANS_IDLE &&
@@ -1172,7 +1158,7 @@ SendQuery(const char *query)
 	{
 		PGresult   *result;
 
-		result = PQexec(pset.db, "BEGIN");
+		result = cancellable_exec(pset.db, "BEGIN");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
 			pg_log_info("%s", PQerrorMessage(pset.db));
@@ -1190,7 +1176,7 @@ SendQuery(const char *query)
 	{
 		PGresult   *result;
 
-		result = PQexec(pset.db, "SAVEPOINT pg_psql_temporary_savepoint");
+		result = cancellable_exec(pset.db, "SAVEPOINT pg_psql_temporary_savepoint");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
 			pg_log_info("%s", PQerrorMessage(pset.db));
@@ -1258,7 +1244,7 @@ SendQuery(const char *query)
 		{
 			PGresult   *svptres;
 
-			svptres = PQexec(pset.db, svptcmd);
+			svptres = cancellable_exec(pset.db, svptcmd);
 			if (PQresultStatus(svptres) != PGRES_COMMAND_OK)
 			{
 				pg_log_info("%s", PQerrorMessage(pset.db));
@@ -1293,9 +1279,6 @@ SendQuery(const char *query)
 
 sendquery_cleanup:
 
-	/* global cancellation reset */
-	ResetCancelConn();
-
 	/* reset \g's output-to-filename trigger */
 	if (pset.gfname)
 	{
@@ -1370,7 +1353,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
 	 * anyway.  (So there's no great need to clear it when done, which is a
 	 * good thing because libpq provides no easy way to do that.)
 	 */
-	result = PQprepare(pset.db, "", query, 0, NULL);
+	result = cancellable_prepare(pset.db, "", query, 0, NULL);
 	if (PQresultStatus(result) != PGRES_COMMAND_OK)
 	{
 		pg_log_info("%s", PQerrorMessage(pset.db));
@@ -1380,7 +1363,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
 	}
 	PQclear(result);
 
-	result = PQdescribePrepared(pset.db, "");
+	result = cancellable_describe_prepared(pset.db, "");
 	OK = AcceptResult(result, true) &&
 		(PQresultStatus(result) == PGRES_COMMAND_OK);
 	if (OK && result)
@@ -1428,7 +1411,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
 			appendPQExpBufferStr(&buf, ") s(name, tp, tpm)");
 			PQclear(result);
 
-			result = PQexec(pset.db, buf.data);
+			result = cancellable_exec(pset.db, buf.data);
 			OK = AcceptResult(result, true);
 
 			if (timing)
@@ -1467,7 +1450,7 @@ discardAbortedPipelineResults(void)
 {
 	for (;;)
 	{
-		PGresult   *res = PQgetResult(pset.db);
+		PGresult   *res = cancellable_getresult(pset.db);
 		ExecStatusType result_status = PQresultStatus(res);
 
 		if (result_status == PGRES_PIPELINE_SYNC)
@@ -1491,7 +1474,7 @@ discardAbortedPipelineResults(void)
 			 * Fetch result to consume the end of the current query being
 			 * processed.
 			 */
-			fatal_res = PQgetResult(pset.db);
+			fatal_res = cancellable_getresult(pset.db);
 			Assert(fatal_res == NULL);
 			return res;
 		}
@@ -1759,8 +1742,8 @@ ExecQueryAndProcessResults(const char *query,
 		return 0;
 	}
 
-	/* first result */
-	result = PQgetResult(pset.db);
+	/* first result -- use cancellable wait for the potentially long wait */
+	result = cancellable_getresult(pset.db);
 	if (min_rows > 0 && PQntuples(result) < min_rows)
 	{
 		return_early = true;
@@ -1828,7 +1811,7 @@ ExecQueryAndProcessResults(const char *query,
 				result = discardAbortedPipelineResults();
 			}
 			else
-				result = PQgetResult(pset.db);
+				result = cancellable_getresult(pset.db);
 
 			/*
 			 * Get current timing measure in case an error occurs
@@ -1987,7 +1970,7 @@ ExecQueryAndProcessResults(const char *query,
 				ClearOrSaveResult(result);
 
 				/* get the next result, loop if it's PGRES_TUPLES_CHUNK */
-				result = PQgetResult(pset.db);
+				result = cancellable_getresult(pset.db);
 			} while (PQresultStatus(result) == PGRES_TUPLES_CHUNK);
 
 			/* We expect an empty PGRES_TUPLES_OK, else there's a problem */
@@ -2081,7 +2064,7 @@ ExecQueryAndProcessResults(const char *query,
 		 * to process.  We need to do that to check whether this is the last.
 		 */
 		if (PQpipelineStatus(pset.db) == PQ_PIPELINE_OFF)
-			next_result = PQgetResult(pset.db);
+			next_result = cancellable_getresult(pset.db);
 		else
 		{
 			/*
diff --git a/src/bin/psql/copy.c b/src/bin/psql/copy.c
index 6a8a9792e7d..fc28537acc1 100644
--- a/src/bin/psql/copy.c
+++ b/src/bin/psql/copy.c
@@ -18,6 +18,7 @@
 #include "common.h"
 #include "common/logging.h"
 #include "copy.h"
+#include "fe_utils/cancel.h"
 #include "libpq-fe.h"
 #include "pqexpbuffer.h"
 #include "prompt.h"
@@ -436,10 +437,24 @@ handleCopyOut(PGconn *conn, FILE *copystream, PGresult **res)
 	bool		OK = true;
 	char	   *buf;
 	int			ret;
+	bool		cancel_sent = false;
 
 	for (;;)
 	{
-		ret = PQgetCopyData(conn, &buf, 0);
+		/* Use async mode so we can watch for cancel interrupts */
+		ret = PQgetCopyData(conn, &buf, 1);
+
+		if (ret == 0)
+		{
+			/* No data available yet, wait for socket or cancel */
+			if (!cancellable_socket_wait(conn, &cancel_sent, false))
+			{
+				OK = false;
+				break;
+			}
+			PQconsumeInput(conn);
+			continue;
+		}
 
 		if (ret < 0)
 			break;				/* done or server/connection error */
@@ -480,7 +495,7 @@ handleCopyOut(PGconn *conn, FILE *copystream, PGresult **res)
 	 * but hasn't exited COPY_OUT state internally.  So we ignore the
 	 * possibility here.
 	 */
-	*res = PQgetResult(conn);
+	*res = cancellable_getresult(conn);
 	if (PQresultStatus(*res) != PGRES_COMMAND_OK)
 	{
 		pg_log_info("%s", PQerrorMessage(conn));
@@ -513,6 +528,10 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 	bool		OK;
 	char		buf[COPYBUFSIZ];
 	bool		showprompt;
+	bool		cancel_sent = false;
+
+	/* Set non-blocking mode so PQputCopyData/End won't block internally */
+	PQsetnonblocking(conn, 1);
 
 	/*
 	 * Establish longjmp destination for exiting from wait-for-input. (This is
@@ -523,9 +542,10 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 		/* got here with longjmp */
 
 		/* Terminate data transfer */
-		PQputCopyEnd(conn,
-					 (PQprotocolVersion(conn) < 3) ? NULL :
-					 _("canceled by user"));
+		cancellable_put_copy_end(conn,
+								 (PQprotocolVersion(conn) < 3) ? NULL :
+								 _("canceled by user"),
+								 &cancel_sent);
 
 		OK = false;
 		goto copyin_cleanup;
@@ -569,7 +589,7 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 			if (buflen <= 0)
 				break;
 
-			if (PQputCopyData(conn, buf, buflen) <= 0)
+			if (cancellable_put_copy_data(conn, buf, buflen, &cancel_sent) <= 0)
 			{
 				OK = false;
 				break;
@@ -667,7 +687,7 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 			 */
 			if (buflen >= COPYBUFSIZ - 5 || (copydone && buflen > 0))
 			{
-				if (PQputCopyData(conn, buf, buflen) <= 0)
+				if (cancellable_put_copy_data(conn, buf, buflen, &cancel_sent) <= 0)
 				{
 					OK = false;
 					break;
@@ -688,9 +708,10 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 	 * keep the version checks just in case you're using a pre-v14 libpq.so at
 	 * runtime)
 	 */
-	if (PQputCopyEnd(conn,
-					 (OK || PQprotocolVersion(conn) < 3) ? NULL :
-					 _("aborted because of read failure")) <= 0)
+	if (cancellable_put_copy_end(conn,
+								 (OK || PQprotocolVersion(conn) < 3) ? NULL :
+								 _("aborted because of read failure"),
+								 &cancel_sent) <= 0)
 		OK = false;
 
 copyin_cleanup:
@@ -717,15 +738,20 @@ copyin_cleanup:
 	 * connection is lost.  But that's fine; it will get us out of COPY_IN
 	 * state, which is what we need.)
 	 */
-	while (*res = PQgetResult(conn), PQresultStatus(*res) == PGRES_COPY_IN)
+	while (*res = cancellable_getresult(conn), PQresultStatus(*res) == PGRES_COPY_IN)
 	{
 		OK = false;
 		PQclear(*res);
 		/* We can't send an error message if we're using protocol version 2 */
-		PQputCopyEnd(conn,
-					 (PQprotocolVersion(conn) < 3) ? NULL :
-					 _("trying to exit copy mode"));
+		cancellable_put_copy_end(conn,
+								 (PQprotocolVersion(conn) < 3) ? NULL :
+								 _("trying to exit copy mode"),
+								 &cancel_sent);
 	}
+
+	/* Restore blocking mode */
+	PQsetnonblocking(conn, 0);
+
 	if (PQresultStatus(*res) != PGRES_COMMAND_OK)
 	{
 		pg_log_info("%s", PQerrorMessage(conn));
diff --git a/src/bin/psql/large_obj.c b/src/bin/psql/large_obj.c
index 021f78e0f78..949f984ff58 100644
--- a/src/bin/psql/large_obj.c
+++ b/src/bin/psql/large_obj.c
@@ -147,9 +147,7 @@ do_lo_export(const char *loid_arg, const char *filename_arg)
 	if (!start_lo_xact("\\lo_export", &own_transaction))
 		return false;
 
-	SetCancelConn(NULL);
-	status = lo_export(pset.db, atooid(loid_arg), filename_arg);
-	ResetCancelConn();
+	status = cancellable_lo_export(pset.db, atooid(loid_arg), filename_arg);
 
 	/* of course this status is documented nowhere :( */
 	if (status != 1)
@@ -183,9 +181,7 @@ do_lo_import(const char *filename_arg, const char *comment_arg)
 	if (!start_lo_xact("\\lo_import", &own_transaction))
 		return false;
 
-	SetCancelConn(NULL);
-	loid = lo_import(pset.db, filename_arg);
-	ResetCancelConn();
+	loid = cancellable_lo_import(pset.db, filename_arg);
 
 	if (loid == InvalidOid)
 	{
@@ -245,9 +241,7 @@ do_lo_unlink(const char *loid_arg)
 	if (!start_lo_xact("\\lo_unlink", &own_transaction))
 		return false;
 
-	SetCancelConn(NULL);
-	status = lo_unlink(pset.db, loid);
-	ResetCancelConn();
+	status = cancellable_lo_unlink(pset.db, loid);
 
 	if (status == -1)
 	{
diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile
index cbfbf93ac69..809ab21cc0c 100644
--- a/src/fe_utils/Makefile
+++ b/src/fe_utils/Makefile
@@ -17,7 +17,7 @@ subdir = src/fe_utils
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
+override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) -I$(top_srcdir)/src/port $(CPPFLAGS)
 
 OBJS = \
 	archive.o \
diff --git a/src/fe_utils/cancel.c b/src/fe_utils/cancel.c
index e6b75439f56..514a8f54c82 100644
--- a/src/fe_utils/cancel.c
+++ b/src/fe_utils/cancel.c
@@ -2,9 +2,47 @@
  *
  * Query cancellation support for frontend code
  *
- * Assorted utility functions to control query cancellation with signal
- * handler for SIGINT.
+ * This module provides SIGINT/Ctrl-C handling for frontend tools that need
+ * to cancel queries running on the server.  It combines three completely
+ * independent mechanisms, any combination of which can be used by a caller:
  *
+ * 1. Server cancel request -- Often what applications need. When a query is
+ *    running, and the main thread is waiting for the result of that query in a
+ *    blocking manner, we want SIGINT/Ctrl-C to cancel that query. This can be
+ *    done by waiting for the query using cancellable_getresult() or
+ *    cancellable_exec() instead of PQgetResult() or PQexec(). These functions
+ *    wait on both the server connection and a cancel interrupt simultaneously.
+ *    When SIGINT/Ctrl-C is received the cancel request is sent to the server
+ *    from the main thread, which avoids race conditions that would occur if
+ *    the cancel were sent from a different thread.
+ *
+ * 2. CancelRequested flag -- A more involved but also much more flexible way
+ *    of cancelling. A volatile sig_atomic_t CancelRequested flag is set to
+ *    true whenever SIGINT is received. This means that the application code
+ *    can fully control what it does with this flag. The primary use case for
+ *    this is when the application code is not blocked (indefinitely), but
+ *    needs to take an action when Ctrl-C is pressed, such as breaking out of
+ *    a long-running loop.
+ *
+ * 3. Cancel callback -- The most complex way of handling a SIGINT. An optional
+ *    function pointer registered via setup_cancel_handler().  If set, it is
+ *    called directly from the signal handler, so it must be async-signal-safe.
+ *    Writing async-signal-safe code is not easy, so this is only recommended
+ *    as a last resort. psql uses this to longjmp back to the main loop when no
+ *    query is active.
+ *
+ * On Unix, the SIGINT signal handler cannot call PQcancelBlocking() directly
+ * because it is not async-signal-safe.  Instead, we use a pipe (the "self-pipe
+ * trick") to interrupt the main thread's select()/poll() call.  The signal
+ * handler writes a byte to the pipe, which makes the main thread's wait
+ * return.  The main thread then notices the cancel request and sends it to the
+ * server synchronously.
+ *
+ * On Windows, the console control handler runs in a separate OS-provided
+ * thread.  We use a Windows event object as the equivalent of the self-pipe:
+ * the console handler signals the event, and the main thread uses
+ * WaitForMultipleObjects to wait on both the socket and the cancel event
+ * simultaneously, so cancellation is noticed instantly.
  *
  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -16,11 +54,18 @@
 
 #include "postgres_fe.h"
 
+#include <fcntl.h>
+#include <signal.h>
 #include <unistd.h>
 
-#include "common/connect.h"
+#ifndef WIN32
+#include <sys/select.h>
+#endif
+
+#include "common/logging.h"
 #include "fe_utils/cancel.h"
 #include "fe_utils/string_utils.h"
+#include "libpq/libpq-fs.h"
 
 
 /*
@@ -36,11 +81,6 @@
 		(void) rc_; \
 	} while (0)
 
-/*
- * Contains all the information needed to cancel a query issued from
- * a database connection to the backend.
- */
-static PGcancel *volatile cancelConn = NULL;
 
 /*
  * Predetermined localized error strings --- needed to avoid trying
@@ -58,169 +98,770 @@ static const char *cancel_not_sent_msg = NULL;
  */
 volatile sig_atomic_t CancelRequested = false;
 
-#ifdef WIN32
-static CRITICAL_SECTION cancelConnLock;
-#endif
-
 /*
  * Additional callback for cancellations.
  */
 static void (*cancel_callback) (void) = NULL;
 
+#ifndef WIN32
+/*
+ * On Unix, the SIGINT signal handler cannot call PQcancelBlocking() directly
+ * because it is not async-signal-safe.  Instead, we use a pipe to interrupt
+ * the main thread: the signal handler writes a byte to the pipe, and the main
+ * thread's select() call returns because the pipe's read end becomes readable.
+ * The main thread then sends the cancel request to the server.
+ */
+static int	cancel_pipe[2] = {-1, -1};
+#else
+/*
+ * On Windows, we use an event object to wake the main thread's
+ * WaitForMultipleObjects() call when Ctrl-C is pressed.
+ */
+static HANDLE cancel_event = NULL;
+#endif
+
 
 /*
- * SetCancelConn
- *
- * Set cancelConn to point to the current database connection.
+ * Send a cancel request to the given connection.
  */
 void
-SetCancelConn(PGconn *conn)
+send_cancel(PGconn *conn)
 {
-	PGcancel   *oldCancelConn;
+	PGcancelConn *cancelConn = PQcancelCreate(conn);
 
-#ifdef WIN32
-	EnterCriticalSection(&cancelConnLock);
-#endif
+	if (cancel_sent_msg)
+		write_stderr(cancel_sent_msg);
 
-	/* Free the old one if we have one */
-	oldCancelConn = cancelConn;
+	if (!PQcancelBlocking(cancelConn))
+	{
+		if (cancel_not_sent_msg)
+			write_stderr(cancel_not_sent_msg);
+		write_stderr(PQcancelErrorMessage(cancelConn));
+	}
 
-	/* be sure handle_sigint doesn't use pointer while freeing */
-	cancelConn = NULL;
+	PQcancelFinish(cancelConn);
+}
 
-	if (oldCancelConn != NULL)
-		PQfreeCancel(oldCancelConn);
 
-	cancelConn = PQgetCancel(conn);
+#ifndef WIN32
 
-#ifdef WIN32
-	LeaveCriticalSection(&cancelConnLock);
-#endif
+/*
+ * Drain any pending bytes from the cancel pipe.  This should be called
+ * before entering a cancellable wait, so that a stale signal from a
+ * previous query doesn't immediately trigger a cancel.
+ */
+static void
+drain_cancel_pipe(void)
+{
+	if (cancel_pipe[0] >= 0)
+	{
+		char		buf[16];
+		int			save_errno = errno;
+
+		while (read(cancel_pipe[0], buf, sizeof(buf)) > 0)
+			 /* loop */ ;
+
+		errno = save_errno;
+	}
 }
 
+#endif							/* !WIN32 */
+
+
 /*
- * ResetCancelConn
+ * cancellable_socket_wait
+ *
+ * Wait for the given connection's socket to become readable (or writable if
+ * for_write is true), while also watching for a cancel interrupt
+ * (SIGINT/Ctrl-C).
  *
- * Free the current cancel connection, if any, and set to NULL.
+ * If a cancel interrupt arrives during the wait, a cancel request is sent to
+ * the server, but only if *cancel_sent is false; the flag is then set to true.
+ * A caller that initializes it to false and reuses it across calls therefore
+ * sends at most one cancel.  After sending the cancel, the wait continues: the
+ * server is expected to respond with an error the caller processes normally.
+ *
+ * Returns true once the socket is ready; false on error or an invalid socket.
  */
-void
-ResetCancelConn(void)
+bool
+cancellable_socket_wait(PGconn *conn, bool *cancel_sent, bool for_write)
 {
-	PGcancel   *oldCancelConn;
+	int			sock = PQsocket(conn);
 
-#ifdef WIN32
-	EnterCriticalSection(&cancelConnLock);
-#endif
+	if (sock < 0)
+		return false;
+
+	for (;;)
+	{
+#ifndef WIN32
+		fd_set		sock_mask;
+		fd_set		cancel_mask;
+		fd_set	   *read_set;
+		fd_set	   *write_set;
+		int			maxFd;
+		int			rc;
+
+		FD_ZERO(&sock_mask);
+		FD_SET(sock, &sock_mask);
+		maxFd = sock;
+
+		FD_ZERO(&cancel_mask);
+		if (cancel_pipe[0] >= 0)
+		{
+			FD_SET(cancel_pipe[0], &cancel_mask);
+			if (cancel_pipe[0] > maxFd)
+				maxFd = cancel_pipe[0];
+		}
+
+		if (for_write)
+		{
+			read_set = &cancel_mask;
+			write_set = &sock_mask;
+		}
+		else
+		{
+			/*
+			 * Watch both the socket and the cancel pipe for readability.
+			 * Merge them into one fd_set.
+			 */
+			if (cancel_pipe[0] >= 0)
+				FD_SET(cancel_pipe[0], &sock_mask);
+			read_set = &sock_mask;
+			write_set = NULL;
+		}
 
-	oldCancelConn = cancelConn;
+		rc = select(maxFd + 1, read_set, write_set, NULL, NULL);
 
-	/* be sure handle_sigint doesn't use pointer while freeing */
-	cancelConn = NULL;
+		if (rc < 0)
+		{
+			if (errno == EINTR)
+				continue;
+			return false;
+		}
 
-	if (oldCancelConn != NULL)
-		PQfreeCancel(oldCancelConn);
+		/* Check cancel pipe (always in read_set for write, merged for read) */
+		if (cancel_pipe[0] >= 0 &&
+			((for_write && FD_ISSET(cancel_pipe[0], &cancel_mask)) ||
+			 (!for_write && FD_ISSET(cancel_pipe[0], &sock_mask))))
+		{
+			drain_cancel_pipe();
 
-#ifdef WIN32
-	LeaveCriticalSection(&cancelConnLock);
+			if (!*cancel_sent)
+			{
+				send_cancel(conn);
+				*cancel_sent = true;
+			}
+
+			/* Check if the socket is also ready */
+			if (for_write)
+			{
+				if (!FD_ISSET(sock, &sock_mask))
+					continue;
+			}
+			else
+			{
+				if (!FD_ISSET(sock, &sock_mask))
+					continue;
+			}
+		}
+
+		if (FD_ISSET(sock, &sock_mask))
+			return true;
+#else							/* WIN32 */
+		HANDLE		events[2];
+		WSAEVENT	sock_event;
+		DWORD		ret;
+		long		net_events;
+
+		net_events = for_write ? FD_WRITE : (FD_READ | FD_CLOSE);
+
+		sock_event = WSACreateEvent();
+		if (sock_event == WSA_INVALID_EVENT)
+			return false;
+
+		if (WSAEventSelect(sock, sock_event, net_events) != 0)
+		{
+			WSACloseEvent(sock_event);
+			return false;
+		}
+
+		events[0] = sock_event;
+		events[1] = cancel_event;
+
+		ret = WaitForMultipleObjects(cancel_event ? 2 : 1,
+									 events, FALSE, INFINITE);
+
+		WSAEventSelect(sock, sock_event, 0);
+		WSACloseEvent(sock_event);
+
+		if (ret == WAIT_OBJECT_0)
+		{
+			/* Socket is ready */
+			return true;
+		}
+		else if (ret == WAIT_OBJECT_0 + 1)
+		{
+			/* Cancel event signaled */
+			ResetEvent(cancel_event);
+
+			if (!*cancel_sent)
+			{
+				send_cancel(conn);
+				*cancel_sent = true;
+			}
+
+			/* Loop back to wait for the socket */
+			continue;
+		}
+		else
+		{
+			/* WAIT_FAILED or unexpected return */
+			return false;
+		}
+#endif							/* WIN32 */
+	}
+}
+
+
+/*
+ * cancellable_getresult
+ *
+ * Like PQgetResult, but cancellable via SIGINT.  Any cancel notification left
+ * over from before the call is discarded first.  If a cancel interrupt then
+ * arrives while waiting for the server, a cancel request is sent; the server
+ * is expected to respond with an error, which is returned as the PGresult.
+ */
+PGresult *
+cancellable_getresult(PGconn *conn)
+{
+	bool		cancel_sent = false;
+
+#ifndef WIN32
+	drain_cancel_pipe();
+#else
+	if (cancel_event)
+		ResetEvent(cancel_event);
 #endif
+
+	while (PQisBusy(conn))
+	{
+		if (!cancellable_socket_wait(conn, &cancel_sent, false))
+			break;
+
+		if (!PQconsumeInput(conn))
+			break;
+	}
+
+	return PQgetResult(conn);
+}
+
+/*
+ * cancellable_exec
+ *
+ * Like PQexec, but cancellable via SIGINT.
+ */
+PGresult *
+cancellable_exec(PGconn *conn, const char *query)
+{
+	PGresult   *lastResult = NULL;
+	PGresult   *result;
+
+	if (!PQsendQuery(conn, query))
+		return PQgetResult(conn);
+
+	while ((result = cancellable_getresult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+
+	return lastResult;
 }
 
 
 /*
- * Code to support query cancellation
+ * cancellable_exec_params
+ *
+ * Like PQexecParams, but cancellable via SIGINT.
+ */
+PGresult *
+cancellable_exec_params(PGconn *conn, const char *query,
+						int nParams, const Oid *paramTypes,
+						const char *const *paramValues,
+						const int *paramLengths, const int *paramFormats,
+						int resultFormat)
+{
+	PGresult   *lastResult = NULL;
+	PGresult   *result;
+
+	if (!PQsendQueryParams(conn, query, nParams, paramTypes,
+						   paramValues, paramLengths, paramFormats,
+						   resultFormat))
+		return PQgetResult(conn);
+
+	while ((result = cancellable_getresult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+
+	return lastResult;
+}
+
+/*
+ * cancellable_prepare
  *
- * Note that sending the cancel directly from the signal handler is safe
- * because PQcancel() is written to make it so.  We use write() to report
- * to stderr because it's better to use simple facilities in a signal
- * handler.
+ * Like PQprepare, but cancellable via SIGINT.
+ */
+PGresult *
+cancellable_prepare(PGconn *conn, const char *stmtName,
+					const char *query, int nParams,
+					const Oid *paramTypes)
+{
+	PGresult   *lastResult = NULL;
+	PGresult   *result;
+
+	if (!PQsendPrepare(conn, stmtName, query, nParams, paramTypes))
+		return PQgetResult(conn);
+
+	while ((result = cancellable_getresult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+
+	return lastResult;
+}
+
+/*
+ * cancellable_describe_prepared
  *
- * On Windows, the signal canceling happens on a separate thread, because
- * that's how SetConsoleCtrlHandler works.  The PQcancel function is safe
- * for this (unlike PQrequestCancel).  However, a CRITICAL_SECTION is required
- * to protect the PGcancel structure against being changed while the signal
- * thread is using it.
+ * Like PQdescribePrepared, but cancellable via SIGINT.
  */
+PGresult *
+cancellable_describe_prepared(PGconn *conn, const char *stmtName)
+{
+	PGresult   *lastResult = NULL;
+	PGresult   *result;
 
-#ifndef WIN32
+	if (!PQsendDescribePrepared(conn, stmtName))
+		return PQgetResult(conn);
+
+	while ((result = cancellable_getresult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+
+	return lastResult;
+}
 
 /*
- * handle_sigint
+ * cancellable_flush
  *
- * Handle interrupt signals by canceling the current command, if cancelConn
- * is set.
+ * Flush pending output on the connection, waiting for the socket to become
+ * writable if needed.  Cancellable via SIGINT.
+ *
+ * Returns 0 on success, -1 on error; a cancel by itself does not end the wait.
  */
-static void
-handle_sigint(SIGNAL_ARGS)
+static int
+cancellable_flush(PGconn *conn, bool *cancel_sent)
 {
-	char		errbuf[256];
+	int			rc;
 
-	CancelRequested = true;
+	while ((rc = PQflush(conn)) > 0)
+	{
+		if (!cancellable_socket_wait(conn, cancel_sent, true))
+			return -1;
+	}
 
-	if (cancel_callback != NULL)
-		cancel_callback();
+	return rc;
+}
+
+/*
+ * cancellable_put_copy_data
+ *
+ * Like PQputCopyData, but cancellable via SIGINT.  The connection must be in
+ * non-blocking mode.
+ *
+ * Returns 1 on success, -1 on error or cancel.
+ */
+int
+cancellable_put_copy_data(PGconn *conn, const char *buffer, int nbytes,
+						  bool *cancel_sent)
+{
+	int			rc;
 
-	/* Send QueryCancel if we are processing a database query */
-	if (cancelConn != NULL)
+	for (;;)
 	{
-		if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+		rc = PQputCopyData(conn, buffer, nbytes);
+
+		if (rc < 0)
+			return -1;
+
+		if (rc > 0)
 		{
-			write_stderr(cancel_sent_msg);
+			/* Data queued, flush it */
+			if (cancellable_flush(conn, cancel_sent) < 0)
+				return -1;
+			return 1;
 		}
-		else
+
+		/* rc == 0: would block, flush and retry */
+		if (cancellable_flush(conn, cancel_sent) < 0)
+			return -1;
+	}
+}
+
+/*
+ * cancellable_put_copy_end
+ *
+ * Like PQputCopyEnd, but cancellable via SIGINT.  The connection must be in
+ * non-blocking mode.
+ *
+ * Returns 1 on success, -1 on error or cancel.
+ */
+int
+cancellable_put_copy_end(PGconn *conn, const char *errormsg,
+						 bool *cancel_sent)
+{
+	int			rc;
+
+	for (;;)
+	{
+		rc = PQputCopyEnd(conn, errormsg);
+
+		if (rc < 0)
+			return -1;
+
+		if (rc > 0)
 		{
-			write_stderr(cancel_not_sent_msg);
-			write_stderr(errbuf);
+			/* End message queued, flush it */
+			if (cancellable_flush(conn, cancel_sent) < 0)
+				return -1;
+			return 1;
 		}
+
+		/* rc == 0: would block, flush and retry */
+		if (cancellable_flush(conn, cancel_sent) < 0)
+			return -1;
 	}
 }
 
 /*
- * setup_cancel_handler
+ * Helper to execute a lo_* SQL function via cancellable_exec_params and
+ * return a single integer column result.  Returns true on success, with
+ * the integer result in *result.  Returns false on failure.
+ */
+static bool
+lo_exec_int(PGconn *conn, const char *query, int nParams,
+			const char *const *paramValues, int *result)
+{
+	PGresult   *res;
+
+	res = cancellable_exec_params(conn, query, nParams, NULL,
+								  paramValues, NULL, NULL, 0);
+	/* Anything other than exactly one row and one column is a failure. */
+	if (PQresultStatus(res) != PGRES_TUPLES_OK ||
+		PQntuples(res) != 1 || PQnfields(res) != 1)
+	{
+		PQclear(res);
+		return false;
+	}
+	/* Parse as 64-bit and narrow: atoi() is undefined for OIDs > INT_MAX. */
+	*result = (int) strtoll(PQgetvalue(res, 0, 0), NULL, 10);
+	PQclear(res);
+	return true;
+}
+
+#define LO_BUFSIZE 8192
+
+/*
+ * cancellable_lo_export
  *
- * Register query cancellation callback for SIGINT.
+ * Like lo_export, but each server round-trip is cancellable via SIGINT.
+ * Returns 1 on success, -1 on failure.
  */
-void
-setup_cancel_handler(void (*query_cancel_callback) (void))
+int
+cancellable_lo_export(PGconn *conn, Oid lobjId, const char *filename)
 {
-	cancel_callback = query_cancel_callback;
-	cancel_sent_msg = _("Cancel request sent\n");
-	cancel_not_sent_msg = _("Could not send cancel request: ");
+	char		oidbuf[32];
+	char		fdbuf[32];
+	char		lenbuf[32];
+	const char *params[2];
+	int			lobj;
+	int			fd;
+	int			result = 1;
+
+	/* lo_open: obtain a server-side descriptor for reading the object */
+	sprintf(oidbuf, "%u", lobjId);
+	sprintf(lenbuf, "%d", INV_READ);
+	params[0] = oidbuf;
+	params[1] = lenbuf;
+	if (!lo_exec_int(conn, "SELECT lo_open($1::oid, $2::int4)",
+					 2, params, &lobj))
+		return -1;
+	if (lobj == -1)
+		return -1;
+
+	sprintf(fdbuf, "%d", lobj);
+
+	/* Open local file; on failure close the descriptor (best effort) */
+	fd = open(filename, O_CREAT | O_WRONLY | O_TRUNC | PG_BINARY, 0666);
+	if (fd < 0)
+	{
+		pg_log_error("could not open file \"%s\": %m", filename);
+		params[0] = fdbuf;
+		PQclear(cancellable_exec_params(conn, "SELECT lo_close($1::int4)",
+										1, NULL, params, NULL, NULL, 0));
+		return -1;
+	}
 
-	pqsignal(SIGINT, handle_sigint);
+	/* Read loop: fetch LO_BUFSIZE-byte chunks (binary result) until empty */
+	sprintf(lenbuf, "%d", LO_BUFSIZE);
+	params[0] = fdbuf;
+	params[1] = lenbuf;
+	for (;;)
+	{
+		PGresult   *res;
+		char	   *data;
+		int			nbytes;
+		int			written;
+
+		res = cancellable_exec_params(conn,
+									  "SELECT loread($1::int4, $2::int4)",
+									  2, NULL, params, NULL, NULL, 1);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			PQclear(res);
+			result = -1;
+			break;
+		}
+
+		nbytes = PQgetlength(res, 0, 0);
+		if (nbytes == 0)
+		{
+			PQclear(res);
+			break;
+		}
+
+		data = PQgetvalue(res, 0, 0);
+		written = write(fd, data, nbytes);
+		PQclear(res);
+
+		if (written != nbytes)
+		{
+			pg_log_error("could not write to file \"%s\": %m", filename);
+			result = -1;
+			break;
+		}
+	}
+
+	close(fd);
+
+	/*
+	 * If the read loop failed, we're likely in an aborted transaction, so
+	 * skip lo_close to avoid overwriting the useful error message.
+	 */
+	if (result == 1)
+	{
+		int			close_result;
+
+		params[0] = fdbuf;
+		if (!lo_exec_int(conn, "SELECT lo_close($1::int4)",
+						 1, params, &close_result) ||
+			close_result != 0)
+			result = -1;
+	}
+
+	return result;
+}
+
+/*
+ * cancellable_lo_import
+ *
+ * Like lo_import, but each server round-trip is cancellable via SIGINT.
+ * Returns the OID of the new large object, or InvalidOid on failure.
+ */
+Oid
+cancellable_lo_import(PGconn *conn, const char *filename)
+{
+	char		oidbuf[32];
+	char		fdbuf[32];
+	char		modebuf[32];
+	const char *params[2];
+	int			paramLengths[2];
+	int			paramFormats[2];
+	int			fd;
+	Oid			lobjOid;
+	int			lobj;
+	int			tmp;
+	char		buf[LO_BUFSIZE];
+	int			nbytes;
+
+	/* Open local file */
+	fd = open(filename, O_RDONLY | PG_BINARY, 0666);
+	if (fd < 0)
+	{
+		pg_log_error("could not open file \"%s\": %m", filename);
+		return InvalidOid;
+	}
+
+	/* lo_creat */
+	sprintf(modebuf, "%d", INV_READ | INV_WRITE);
+	params[0] = modebuf;
+	if (!lo_exec_int(conn, "SELECT lo_creat($1::int4)",
+					 1, params, &tmp))
+	{
+		close(fd);
+		return InvalidOid;
+	}
+	lobjOid = (Oid) tmp;
+	if (lobjOid == InvalidOid)
+	{
+		close(fd);
+		return InvalidOid;
+	}
+
+	/* lo_open */
+	sprintf(oidbuf, "%u", lobjOid);
+	sprintf(modebuf, "%d", INV_WRITE);
+	params[0] = oidbuf;
+	params[1] = modebuf;
+	if (!lo_exec_int(conn, "SELECT lo_open($1::oid, $2::int4)",
+					 2, params, &lobj))
+	{
+		close(fd);
+		return InvalidOid;
+	}
+	if (lobj == -1)
+	{
+		close(fd);
+		return InvalidOid;
+	}
+
+	sprintf(fdbuf, "%d", lobj);
+
+	/* Write loop: read from file, write to large object */
+	while ((nbytes = read(fd, buf, LO_BUFSIZE)) > 0)
+	{
+		PGresult   *res;
+
+		paramFormats[0] = 0;	/* text for fd */
+		paramFormats[1] = 1;	/* binary for bytea data */
+		paramLengths[0] = 0;
+		paramLengths[1] = nbytes;
+		params[0] = fdbuf;
+		params[1] = buf;
+
+		res = cancellable_exec_params(conn,
+									  "SELECT lowrite($1::int4, $2::bytea)",
+									  2, NULL, params,
+									  paramLengths, paramFormats, 0);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			PQclear(res);
+			close(fd);
+			return InvalidOid;
+		}
+
+		tmp = atoi(PQgetvalue(res, 0, 0));
+		PQclear(res);
+
+		if (tmp != nbytes)
+		{
+			close(fd);
+			return InvalidOid;
+		}
+	}
+
+	if (nbytes < 0)
+	{
+		pg_log_error("could not read from file \"%s\": %m", filename);
+		params[0] = fdbuf;
+		/* Best-effort close; the result is fetched only so it can be freed. */
+		PQclear(cancellable_exec_params(conn, "SELECT lo_close($1::int4)",
+										1, NULL, params, NULL, NULL, 0));
+		close(fd);
+		return InvalidOid;
+	}
+
+	close(fd);
+
+	/* lo_close */
+	params[0] = fdbuf;
+	if (!lo_exec_int(conn, "SELECT lo_close($1::int4)",
+					 1, params, &tmp) ||
+		tmp != 0)
+		return InvalidOid;
+
+	return lobjOid;
+}
+
+/*
+ * cancellable_lo_unlink
+ *
+ * Like lo_unlink, but cancellable via SIGINT.
+ * Returns 1 on success, -1 on failure.
+ */
+int
+cancellable_lo_unlink(PGconn *conn, Oid lobjId)
+{
+	char		oid_text[32];
+	const char *args[1] = {oid_text};
+	int			unlink_rc;
+	bool		ok;
+
+	/* The OID travels as a text parameter and is cast server-side. */
+	sprintf(oid_text, "%u", lobjId);
+	ok = lo_exec_int(conn, "SELECT lo_unlink($1::oid)",
+					 1, args, &unlink_rc);
+
+	/* Query-level failure maps to -1; otherwise pass the SQL result on. */
+	return ok ? unlink_rc : -1;
+}
+
+/*
+ * cancel_pipe_fd
+ *
+ * Expose the read end of the cancel pipe so that callers running their own
+ * select()/poll() loop (parallel_slot.c does) can watch it.  Always -1 on
+ * Windows; elsewhere this is whatever cancel_pipe[0] currently holds.
+ */
+int
+cancel_pipe_fd(void)
+{
+#ifdef WIN32
+	return -1;
+#else
+	return cancel_pipe[0];
+#endif
 }
 
-#else							/* WIN32 */
 
+#ifdef WIN32
+/*
+ * Console control handler for Windows.
+ *
+ * This runs in a separate thread created by the OS.  It sets
+ * CancelRequested and invokes the callback, but does not send a cancel
+ * request itself -- that is done by the main thread in the cancellable
+ * wait functions when it notices CancelRequested.
+ */
 static BOOL WINAPI
 consoleHandler(DWORD dwCtrlType)
 {
-	char		errbuf[256];
-
 	if (dwCtrlType == CTRL_C_EVENT ||
 		dwCtrlType == CTRL_BREAK_EVENT)
 	{
 		CancelRequested = true;
 
+		if (cancel_event)
+			SetEvent(cancel_event);
+
 		if (cancel_callback != NULL)
 			cancel_callback();
 
-		/* Send QueryCancel if we are processing a database query */
-		EnterCriticalSection(&cancelConnLock);
-		if (cancelConn != NULL)
-		{
-			if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
-			{
-				write_stderr(cancel_sent_msg);
-			}
-			else
-			{
-				write_stderr(cancel_not_sent_msg);
-				write_stderr(errbuf);
-			}
-		}
-
-		LeaveCriticalSection(&cancelConnLock);
-
 		return TRUE;
 	}
 	else
@@ -228,16 +869,77 @@ consoleHandler(DWORD dwCtrlType)
 		return FALSE;
 }
 
+#else							/* !WIN32 */
+
+/*
+ * Signal handler for SIGINT.  Sets CancelRequested and wakes up the main
+ * thread by writing to the pipe.
+ */
+static void
+handle_sigint(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+	char		c = 1;
+
+	CancelRequested = true;
+
+	if (cancel_callback != NULL)
+		cancel_callback();
+
+	/* Wake up the main thread's select() call */
+	if (cancel_pipe[1] >= 0)
+		(void) write(cancel_pipe[1], &c, 1);
+
+	errno = save_errno;
+}
+
+#endif							/* WIN32 */
+
+
+/*
+ * setup_cancel_handler
+ *
+ * Set up handler for SIGINT (Unix) or console events (Windows) to send a
+ * cancel request to the server.
+ *
+ * The optional callback is invoked directly from the signal handler context
+ * on every SIGINT (on Unix), so it must be async-signal-safe.
+ */
 void
-setup_cancel_handler(void (*callback) (void))
+setup_cancel_handler(void (*query_cancel_callback) (void))
 {
-	cancel_callback = callback;
+	cancel_callback = query_cancel_callback;
 	cancel_sent_msg = _("Cancel request sent\n");
 	cancel_not_sent_msg = _("Could not send cancel request: ");
 
-	InitializeCriticalSection(&cancelConnLock);
-
+#ifdef WIN32
+	cancel_event = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (cancel_event == NULL)
+	{
+		pg_log_error("could not create event for cancel: error code %lu",
+					 GetLastError());
+		exit(1);
+	}
 	SetConsoleCtrlHandler(consoleHandler, TRUE);
-}
+#else
 
-#endif							/* WIN32 */
+	/*
+	 * Create the pipe that the signal handler uses to wake the main thread.
+	 * See comment on cancel_pipe above.
+	 */
+	if (pipe(cancel_pipe) < 0)
+	{
+		pg_log_error("could not create pipe for cancel: %m");
+		exit(1);
+	}
+
+	/*
+	 * Make both ends non-blocking: the write end so that the signal handler
+	 * won't block, and the read end so that drain_cancel_pipe() won't block.
+	 */
+	fcntl(cancel_pipe[0], F_SETFL, O_NONBLOCK);
+	fcntl(cancel_pipe[1], F_SETFL, O_NONBLOCK);
+
+	pqsignal(SIGINT, handle_sigint);
+#endif
+}
diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c
index fb9e6cc4ec1..dbf958c5656 100644
--- a/src/fe_utils/parallel_slot.c
+++ b/src/fe_utils/parallel_slot.c
@@ -60,13 +60,11 @@ consumeQueryResult(ParallelSlot *slot)
 	bool		ok = true;
 	PGresult   *result;
 
-	SetCancelConn(slot->connection);
-	while ((result = PQgetResult(slot->connection)) != NULL)
+	while ((result = cancellable_getresult(slot->connection)) != NULL)
 	{
 		if (!processQueryResult(slot, result))
 			ok = false;
 	}
-	ResetCancelConn();
 	return ok;
 }
 
@@ -81,6 +79,7 @@ select_loop(int maxFd, fd_set *workerset)
 {
 	int			i;
 	fd_set		saveSet = *workerset;
+	int			cancel_fd = cancel_pipe_fd();
 
 	if (CancelRequested)
 		return -1;
@@ -89,7 +88,8 @@ select_loop(int maxFd, fd_set *workerset)
 	{
 		/*
 		 * On Windows, we need to check once in a while for cancel requests;
-		 * on other platforms we rely on select() returning when interrupted.
+		 * on other platforms the cancel pipe makes select() return
+		 * immediately when SIGINT arrives.
 		 */
 		struct timeval *tvp;
 #ifdef WIN32
@@ -101,6 +101,16 @@ select_loop(int maxFd, fd_set *workerset)
 #endif
 
 		*workerset = saveSet;
+
+#ifndef WIN32
+		if (cancel_fd >= 0)
+		{
+			FD_SET(cancel_fd, workerset);
+			if (cancel_fd > maxFd)
+				maxFd = cancel_fd;
+		}
+#endif
+
 		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
 
 #ifdef WIN32
@@ -115,10 +125,25 @@ select_loop(int maxFd, fd_set *workerset)
 
 		if (i < 0 && errno == EINTR)
 			continue;			/* ignore this */
-		if (i < 0 || CancelRequested)
-			return -1;			/* but not this */
+		if (i < 0)
+			return -1;
+
+#ifndef WIN32
+		if (cancel_fd >= 0 && FD_ISSET(cancel_fd, workerset))
+			return -1;			/* cancel requested */
+#endif
+
+		if (CancelRequested)
+			return -1;
 		if (i == 0)
 			continue;			/* timeout (Win32 only) */
+
+#ifndef WIN32
+		/* Remove the cancel pipe from the returned set */
+		if (cancel_fd >= 0)
+			FD_CLR(cancel_fd, workerset);
+#endif
+
 		break;
 	}
 
@@ -235,13 +260,15 @@ wait_on_slots(ParallelSlotArray *sa)
 	if (cancelconn == NULL)
 		return false;
 
-	SetCancelConn(cancelconn);
 	i = select_loop(maxFd, &slotset);
-	ResetCancelConn();
 
-	/* failure? */
+	/* failure or cancel? */
 	if (i < 0)
+	{
+		if (CancelRequested && cancelconn != NULL)
+			send_cancel(cancelconn);
 		return false;
+	}
 
 	for (i = 0; i < sa->numslots; i++)
 	{
diff --git a/src/fe_utils/query_utils.c b/src/fe_utils/query_utils.c
index c05fd9c21df..7e5b6676102 100644
--- a/src/fe_utils/query_utils.c
+++ b/src/fe_utils/query_utils.c
@@ -26,7 +26,7 @@ executeQuery(PGconn *conn, const char *query, bool echo)
 	if (echo)
 		printf("%s\n", query);
 
-	res = PQexec(conn, query);
+	res = cancellable_exec(conn, query);
 	if (!res ||
 		PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
@@ -51,7 +51,7 @@ executeCommand(PGconn *conn, const char *query, bool echo)
 	if (echo)
 		printf("%s\n", query);
 
-	res = PQexec(conn, query);
+	res = cancellable_exec(conn, query);
 	if (!res ||
 		PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
@@ -79,9 +79,7 @@ executeMaintenanceCommand(PGconn *conn, const char *query, bool echo)
 	if (echo)
 		printf("%s\n", query);
 
-	SetCancelConn(conn);
-	res = PQexec(conn, query);
-	ResetCancelConn();
+	res = cancellable_exec(conn, query);
 
 	r = (res && PQresultStatus(res) == PGRES_COMMAND_OK);
 
diff --git a/src/include/fe_utils/cancel.h b/src/include/fe_utils/cancel.h
index e174fb83b92..1664c278a97 100644
--- a/src/include/fe_utils/cancel.h
+++ b/src/include/fe_utils/cancel.h
@@ -2,6 +2,7 @@
  *
  * Query cancellation support for frontend code
  *
+ * See cancel.c for an overview of the three cancellation mechanisms.
  *
  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -20,13 +21,32 @@
 
 extern PGDLLIMPORT volatile sig_atomic_t CancelRequested;
 
-extern void SetCancelConn(PGconn *conn);
-extern void ResetCancelConn(void);
-
-/*
- * A callback can be optionally set up to be called at cancellation
- * time.
- */
 extern void setup_cancel_handler(void (*query_cancel_callback) (void));
 
+extern void send_cancel(PGconn *conn);
+extern PGresult *cancellable_exec(PGconn *conn, const char *query);
+extern PGresult *cancellable_exec_params(PGconn *conn, const char *query,
+										 int nParams, const Oid *paramTypes,
+										 const char *const *paramValues,
+										 const int *paramLengths,
+										 const int *paramFormats,
+										 int resultFormat);
+extern PGresult *cancellable_prepare(PGconn *conn, const char *stmtName,
+									 const char *query, int nParams,
+									 const Oid *paramTypes);
+extern PGresult *cancellable_describe_prepared(PGconn *conn,
+											   const char *stmtName);
+extern PGresult *cancellable_getresult(PGconn *conn);
+extern bool cancellable_socket_wait(PGconn *conn, bool *cancel_sent,
+									bool for_write);
+extern int	cancellable_put_copy_data(PGconn *conn, const char *buffer,
+									  int nbytes, bool *cancel_sent);
+extern int	cancellable_put_copy_end(PGconn *conn, const char *errormsg,
+									 bool *cancel_sent);
+extern int	cancellable_lo_export(PGconn *conn, Oid lobjId,
+								  const char *filename);
+extern Oid	cancellable_lo_import(PGconn *conn, const char *filename);
+extern int	cancellable_lo_unlink(PGconn *conn, Oid lobjId);
+extern int	cancel_pipe_fd(void);
+
 #endif							/* CANCEL_H */

base-commit: f95d73ed433207c4323802dc96e52f3e5553a86c
-- 
2.53.0

From 6d436ade8ad7f6fb77502669e14b71dda4618b1f Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sat, 13 Dec 2025 18:18:13 +0100
Subject: [PATCH v5 1/5] Move Windows pthread compatibility functions to
 src/port

This is in preparation for a follow-up commit that will start to use
these functions in more places than just libpq.
---
 configure.ac                                   | 1 +
 src/interfaces/libpq/Makefile                  | 1 -
 src/interfaces/libpq/meson.build               | 2 +-
 src/port/meson.build                           | 1 +
 src/{interfaces/libpq => port}/pthread-win32.c | 4 ++--
 5 files changed, 5 insertions(+), 4 deletions(-)
 rename src/{interfaces/libpq => port}/pthread-win32.c (94%)

diff --git a/configure.ac b/configure.ac
index f4e3bd307c8..9284193771a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1949,6 +1949,7 @@ if test "$PORTNAME" = "win32"; then
   AC_LIBOBJ(dirmod)
   AC_LIBOBJ(kill)
   AC_LIBOBJ(open)
+  AC_LIBOBJ(pthread-win32)
   AC_LIBOBJ(system)
   AC_LIBOBJ(win32common)
   AC_LIBOBJ(win32dlopen)
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 0963995eed4..d6cfb00d655 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -75,7 +75,6 @@ endif
 
 ifeq ($(PORTNAME), win32)
 OBJS += \
-	pthread-win32.o \
 	win32.o
 endif
 
diff --git a/src/interfaces/libpq/meson.build b/src/interfaces/libpq/meson.build
index b0ae72167a1..b949780b85b 100644
--- a/src/interfaces/libpq/meson.build
+++ b/src/interfaces/libpq/meson.build
@@ -20,7 +20,7 @@ libpq_sources = files(
 libpq_so_sources = [] # for shared lib, in addition to the above
 
 if host_system == 'windows'
-  libpq_sources += files('pthread-win32.c', 'win32.c')
+  libpq_sources += files('win32.c')
   libpq_so_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
     '--NAME', 'libpq',
     '--FILEDESC', 'PostgreSQL Access Library',])
diff --git a/src/port/meson.build b/src/port/meson.build
index 7296f8e3c03..a0fc13a5e62 100644
--- a/src/port/meson.build
+++ b/src/port/meson.build
@@ -32,6 +32,7 @@ if host_system == 'windows'
     'dirmod.c',
     'kill.c',
     'open.c',
+    'pthread-win32.c',
     'system.c',
     'win32common.c',
     'win32dlopen.c',
diff --git a/src/interfaces/libpq/pthread-win32.c b/src/port/pthread-win32.c
similarity index 94%
rename from src/interfaces/libpq/pthread-win32.c
rename to src/port/pthread-win32.c
index cf66284f007..48d68b693a7 100644
--- a/src/interfaces/libpq/pthread-win32.c
+++ b/src/port/pthread-win32.c
@@ -5,12 +5,12 @@
 *
 * Copyright (c) 2004-2026, PostgreSQL Global Development Group
 * IDENTIFICATION
-*	src/interfaces/libpq/pthread-win32.c
+*	src/port/pthread-win32.c
 *
 *-------------------------------------------------------------------------
 */
 
-#include "postgres_fe.h"
+#include "c.h"
 
 #include "pthread-win32.h"
 

base-commit: f95d73ed433207c4323802dc96e52f3e5553a86c
-- 
2.53.0

From b8cf36fd5c9cfd2b3f022810565491e1780421c2 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sat, 13 Dec 2025 13:05:50 +0100
Subject: [PATCH v5 2/5] Don't use deprecated and insecure PQcancel in psql
 and other tools anymore

All of our frontend tools that used our fe_utils to cancel queries,
including psql, still used PQcancel to send cancel requests to the
server. That function is insecure, because it does not use encryption to
send the cancel request. This starts using the new cancellation APIs
(introduced in 61461a300) for all these frontend tools. These APIs use
the same encryption settings as the connection that's being cancelled.
Since these APIs are not signal-safe, this required a refactor: the cancel
request is no longer sent from the signal handler, but from a dedicated
thread. Similar logic was already used for Windows anyway, so this also has
the benefit of making the cancel logic more uniform across our supported
platforms.

The calls to PQcancel in pg_dump are kept for now and will be removed in
a later commit, because that code does not use the fe_utils helpers to
cancel queries and instead implements its own logic.
---
 meson.build                   |   2 +-
 src/fe_utils/Makefile         |   2 +-
 src/fe_utils/cancel.c         | 368 +++++++++++++++++++++++-----------
 src/include/fe_utils/cancel.h |   4 -
 4 files changed, 252 insertions(+), 124 deletions(-)

diff --git a/meson.build b/meson.build
index ddf5172982f..e72d91500bb 100644
--- a/meson.build
+++ b/meson.build
@@ -3482,7 +3482,7 @@ frontend_code = declare_dependency(
   include_directories: [postgres_inc],
   link_with: [fe_utils, common_static, pgport_static],
   sources: generated_headers_stamp,
-  dependencies: [os_deps, libintl],
+  dependencies: [os_deps, libintl, thread_dep],
 )
 
 backend_both_deps += [
diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile
index cbfbf93ac69..809ab21cc0c 100644
--- a/src/fe_utils/Makefile
+++ b/src/fe_utils/Makefile
@@ -17,7 +17,7 @@ subdir = src/fe_utils
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
+override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) -I$(top_srcdir)/src/port $(CPPFLAGS)
 
 OBJS = \
 	archive.o \
diff --git a/src/fe_utils/cancel.c b/src/fe_utils/cancel.c
index e6b75439f56..5d645c554ab 100644
--- a/src/fe_utils/cancel.c
+++ b/src/fe_utils/cancel.c
@@ -2,9 +2,38 @@
  *
  * Query cancellation support for frontend code
  *
- * Assorted utility functions to control query cancellation with signal
- * handler for SIGINT.
+ * This module provides SIGINT/Ctrl-C handling for frontend tools that need
+ * to cancel queries running on the server.  It combines three completely
+ * independent mechanisms, any combination of which can be used by a caller:
  *
+ * 1. Server cancel request -- Often what applications need. When a query is
+ *    running, and the main thread is waiting for the result of that query in a
+ *    blocking manner, we want SIGINT/Ctrl-C to cancel that query. This can be
+ *    done by having the application call SetCancelConn() to register the
+ *    connection that is running the query, prior to waiting for the result.
+ *    When SIGINT/Ctrl-C is received a cancel request for this connection will
+ *    then be sent to the server from a separate thread. That in turn will then
+ *    (assuming a co-operating server) cause the server to cancel the query and
+ *    send an error to the waiting client on the main thread. The cancel
+ *    connection is a process-wide global, so only one connection can be the
+ *    cancel target at a time. ResetCancelConn() can be used to unregister the
+ *    connection again, so that no cancel request is sent if SIGINT/Ctrl-C
+ *    arrives after the blocking wait has already completed.
+ *
+ * 2. CancelRequested flag -- A more involved but also much more flexible way
+ *    of cancelling. A volatile sig_atomic_t CancelRequested flag is set to
+ *    true whenever SIGINT is received. This means that the application code
+ *    can fully control what it does with this flag. The primary use case for
+ *    this is when the application code is not blocked (indefinitely), but
+ *    needs to take an action when Ctrl-C is pressed, such as break out of a
+ *    long running loop.
+ *
+ * 3. Cancel callback -- The most complex way of handling a SIGINT. An optional
+ *    function pointer registered via setup_cancel_handler().  If set, it is
+ *    called directly from the signal handler, so it must be async-signal-safe.
+ *    Writing async-signal-safe code is not easy, so this is only recommended
+ *    as a last resort. psql uses this to longjmp back to the main loop when no
+ *    query is active.
  *
  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -16,9 +45,21 @@
 
 #include "postgres_fe.h"
 
+#include <signal.h>
 #include <unistd.h>
 
+#ifndef WIN32
+#include <fcntl.h>
+#endif
+
+#ifdef WIN32
+#include "pthread-win32.h"
+#else
+#include <pthread.h>
+#endif
+
 #include "common/connect.h"
+#include "common/logging.h"
 #include "fe_utils/cancel.h"
 #include "fe_utils/string_utils.h"
 
@@ -36,11 +77,22 @@
 		(void) rc_; \
 	} while (0)
 
+
 /*
- * Contains all the information needed to cancel a query issued from
- * a database connection to the backend.
+ * Cancel connection that should be used to send cancel requests.
  */
-static PGcancel *volatile cancelConn = NULL;
+static PGcancelConn *cancelConn = NULL;
+
+/*
+ * Generation counter for cancelConn. Incremented each time cancelConn is
+ * changed. Used to detect if cancelConn was replaced while we were using it.
+ */
+static uint64 cancelConnGeneration = 0;
+
+/*
+ * Mutex protecting cancelConn and cancelConnGeneration.
+ */
+static pthread_mutex_t cancelConnLock = PTHREAD_MUTEX_INITIALIZER;
 
 /*
  * Predetermined localized error strings --- needed to avoid trying
@@ -58,186 +110,266 @@ static const char *cancel_not_sent_msg = NULL;
  */
 volatile sig_atomic_t CancelRequested = false;
 
-#ifdef WIN32
-static CRITICAL_SECTION cancelConnLock;
-#endif
-
 /*
  * Additional callback for cancellations.
  */
 static void (*cancel_callback) (void) = NULL;
 
+#ifndef WIN32
+/*
+ * On Unix, the SIGINT signal handler cannot call PQcancelBlocking() directly
+ * because it is not async-signal-safe.  Instead, we use a pipe to wake a
+ * dedicated cancel thread: the signal handler writes a byte to the pipe, and
+ * the cancel thread's blocking read() returns, triggering the actual cancel
+ * request.
+ */
+static int	cancel_pipe[2] = {-1, -1};
+static pthread_t cancel_thread;
+#endif
+
 
 /*
- * SetCancelConn
+ * Send a cancel request to the connection, if one is set.
+ *
+ * Called from the cancel thread (Unix) or the console handler thread
+ * (Windows), never from the signal handler itself.
  *
- * Set cancelConn to point to the current database connection.
+ * To avoid the cancel connection being freed by a concurrent
+ * SetCancelConn()/ResetCancelConn() call on the main thread while we are
+ * using it, we temporarily take it out of the global variable while sending
+ * the request.  A generation counter lets us detect whether the main thread
+ * replaced it in the meantime, in which case we free the old one instead of
+ * putting it back.
+ *
+ * Note: there is an inherent race where, if this thread is slow to process
+ * the wakeup (e.g. due to a network delay sending a previous cancel), the
+ * main thread may have already moved on to a different query by the time we
+ * send the cancel.  This is unavoidable with the server's cancel protocol,
+ * which identifies the session but not individual queries.
  */
-void
-SetCancelConn(PGconn *conn)
+static void
+SendCancelRequest(void)
 {
-	PGcancel   *oldCancelConn;
+	PGcancelConn *cc;
+	uint64		generation;
+	bool		putConnectionBack = false;
+
+	/*
+	 * Borrow the cancel connection from the global, setting it to NULL so
+	 * that SetCancelConn/ResetCancelConn won't free it while we're using it.
+	 */
+	pthread_mutex_lock(&cancelConnLock);
+	cc = cancelConn;
+	generation = cancelConnGeneration;
+	cancelConn = NULL;
+	pthread_mutex_unlock(&cancelConnLock);
 
-#ifdef WIN32
-	EnterCriticalSection(&cancelConnLock);
-#endif
+	if (cc == NULL)
+		return;
 
-	/* Free the old one if we have one */
-	oldCancelConn = cancelConn;
+	write_stderr(cancel_sent_msg);
 
-	/* be sure handle_sigint doesn't use pointer while freeing */
-	cancelConn = NULL;
+	if (!PQcancelBlocking(cc))
+	{
+		char	   *errmsg = PQcancelErrorMessage(cc);
 
-	if (oldCancelConn != NULL)
-		PQfreeCancel(oldCancelConn);
+		write_stderr(cancel_not_sent_msg);
+		if (errmsg)
+			write_stderr(errmsg);
+	}
+	/* Reset for possible reuse */
+	PQcancelReset(cc);
+
+	/*
+	 * Put the cancel connection back if it wasn't replaced while we were
+	 * using it.
+	 */
+	pthread_mutex_lock(&cancelConnLock);
+	if (cancelConnGeneration == generation)
+	{
+		/* Generation unchanged, put it back for reuse */
+		cancelConn = cc;
+		putConnectionBack = true;
+	}
+	pthread_mutex_unlock(&cancelConnLock);
 
-	cancelConn = PQgetCancel(conn);
+	/* If it was replaced, we free it, because we were the last user */
+	if (!putConnectionBack)
+		PQcancelFinish(cc);
+}
 
-#ifdef WIN32
-	LeaveCriticalSection(&cancelConnLock);
-#endif
+
+/*
+ * Swap newCancelConn (may be NULL) into cancelConn and free the old value.
+ */
+static void
+SetCancelConnInternal(PGcancelConn *newCancelConn)
+{
+	PGcancelConn *oldCancelConn;
+
+	pthread_mutex_lock(&cancelConnLock);
+	oldCancelConn = cancelConn;
+	cancelConn = newCancelConn;
+	cancelConnGeneration++;		/* lets SendCancelRequest() detect the swap */
+	pthread_mutex_unlock(&cancelConnLock);
+	/* Freed after unlocking; may be NULL, e.g. while SendCancelRequest() has it */
+	if (oldCancelConn != NULL)
+		PQcancelFinish(oldCancelConn);
+}
+
+/*
+ * SetCancelConn
+ *
+ * Set cancelConn to point to a cancel connection for the given database
+ * connection. This creates a new PGcancelConn that can be used to send
+ * cancel requests.
+ */
+void
+SetCancelConn(PGconn *conn)
+{
+	SetCancelConnInternal(PQcancelCreate(conn));
 }
 
 /*
  * ResetCancelConn
  *
- * Free the current cancel connection, if any, and set to NULL.
+ * Clear cancelConn, preventing any pending cancel from being sent.
  */
 void
 ResetCancelConn(void)
 {
-	PGcancel   *oldCancelConn;
+	SetCancelConnInternal(NULL);
+}
 
-#ifdef WIN32
-	EnterCriticalSection(&cancelConnLock);
-#endif
 
-	oldCancelConn = cancelConn;
+#ifdef WIN32
+/*
+ * Console control handler for Windows.
+ *
+ * This runs in a separate thread created by the OS, so we can safely call
+ * the blocking cancel API directly.
+ */
+static BOOL WINAPI
+consoleHandler(DWORD dwCtrlType)
+{
+	if (dwCtrlType == CTRL_C_EVENT ||
+		dwCtrlType == CTRL_BREAK_EVENT)
+	{
+		CancelRequested = true;
 
-	/* be sure handle_sigint doesn't use pointer while freeing */
-	cancelConn = NULL;
+		if (cancel_callback != NULL)
+			cancel_callback();
 
-	if (oldCancelConn != NULL)
-		PQfreeCancel(oldCancelConn);
+		SendCancelRequest();
 
-#ifdef WIN32
-	LeaveCriticalSection(&cancelConnLock);
-#endif
+		return TRUE;
+	}
+	else
+		/* Return FALSE for any signals not being handled */
+		return FALSE;
 }
 
+#else							/* !WIN32 */
 
 /*
- * Code to support query cancellation
- *
- * Note that sending the cancel directly from the signal handler is safe
- * because PQcancel() is written to make it so.  We use write() to report
- * to stderr because it's better to use simple facilities in a signal
- * handler.
- *
- * On Windows, the signal canceling happens on a separate thread, because
- * that's how SetConsoleCtrlHandler works.  The PQcancel function is safe
- * for this (unlike PQrequestCancel).  However, a CRITICAL_SECTION is required
- * to protect the PGcancel structure against being changed while the signal
- * thread is using it.
+ * Cancel thread main function. Waits for the signal handler to write to the
+ * pipe, then sends a cancel request.
  */
+static void *
+cancel_thread_main(void *arg)
+{
+	for (;;)
+	{
+		char		buf[16];
+		ssize_t		rc;
 
-#ifndef WIN32
+		/* Block until the signal handler writes a wakeup byte to the pipe */
+		rc = read(cancel_pipe[0], buf, sizeof(buf));
+		if (rc <= 0)
+		{
+			if (rc < 0 && errno == EINTR)
+				continue;
+			/* EOF (rc == 0, errno is stale) or a real error - exit thread */
+			break;
+		}
+
+		SendCancelRequest();
+	}
+
+	return NULL;
+}
 
 /*
- * handle_sigint
- *
- * Handle interrupt signals by canceling the current command, if cancelConn
- * is set.
+ * Signal handler for SIGINT. Sets CancelRequested and wakes up the cancel
+ * thread by writing to the pipe.
  */
 static void
 handle_sigint(SIGNAL_ARGS)
 {
-	char		errbuf[256];
+	int			save_errno = errno;
+	char		c = 1;
 
 	CancelRequested = true;
 
 	if (cancel_callback != NULL)
 		cancel_callback();
 
-	/* Send QueryCancel if we are processing a database query */
-	if (cancelConn != NULL)
-	{
-		if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
-		{
-			write_stderr(cancel_sent_msg);
-		}
-		else
-		{
-			write_stderr(cancel_not_sent_msg);
-			write_stderr(errbuf);
-		}
-	}
+	/* Wake up the cancel thread - write() is async-signal-safe */
+	if (cancel_pipe[1] >= 0)
+		(void) write(cancel_pipe[1], &c, 1);
+
+	errno = save_errno;
 }
 
+#endif							/* WIN32 */
+
+
 /*
  * setup_cancel_handler
  *
- * Register query cancellation callback for SIGINT.
+ * Set up handler for SIGINT (Unix) or console events (Windows) to send a
+ * cancel request to the server.
+ *
+ * The optional callback is invoked directly from the signal handler context
+ * on every SIGINT (on Unix), so it must be async-signal-safe.
  */
 void
 setup_cancel_handler(void (*query_cancel_callback) (void))
 {
 	cancel_callback = query_cancel_callback;
-	cancel_sent_msg = _("Cancel request sent\n");
+	cancel_sent_msg = _("Sending cancel request\n");
 	cancel_not_sent_msg = _("Could not send cancel request: ");
 
-	pqsignal(SIGINT, handle_sigint);
-}
+#ifdef WIN32
+	SetConsoleCtrlHandler(consoleHandler, TRUE);
+#else
 
-#else							/* WIN32 */
+	/*
+	 * Create the pipe and cancel thread (see comment on cancel_pipe above).
+	 */
+	if (pipe(cancel_pipe) < 0)
+	{
+		pg_log_error("could not create pipe for cancel: %m");
+		exit(1);
+	}
 
-static BOOL WINAPI
-consoleHandler(DWORD dwCtrlType)
-{
-	char		errbuf[256];
+	/*
+	 * Make the write end non-blocking, so that the signal handler won't block
+	 * if the pipe buffer is full (which is very unlikely in practice but
+	 * possible in theory).
+	 */
+	fcntl(cancel_pipe[1], F_SETFL, O_NONBLOCK);
 
-	if (dwCtrlType == CTRL_C_EVENT ||
-		dwCtrlType == CTRL_BREAK_EVENT)
 	{
-		CancelRequested = true;
-
-		if (cancel_callback != NULL)
-			cancel_callback();
+		int			rc = pthread_create(&cancel_thread, NULL, cancel_thread_main, NULL);
 
-		/* Send QueryCancel if we are processing a database query */
-		EnterCriticalSection(&cancelConnLock);
-		if (cancelConn != NULL)
+		if (rc != 0)
 		{
-			if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
-			{
-				write_stderr(cancel_sent_msg);
-			}
-			else
-			{
-				write_stderr(cancel_not_sent_msg);
-				write_stderr(errbuf);
-			}
+			pg_log_error("could not create cancel thread: %s", strerror(rc));
+			exit(1);
 		}
-
-		LeaveCriticalSection(&cancelConnLock);
-
-		return TRUE;
 	}
-	else
-		/* Return FALSE for any signals not being handled */
-		return FALSE;
-}
 
-void
-setup_cancel_handler(void (*callback) (void))
-{
-	cancel_callback = callback;
-	cancel_sent_msg = _("Cancel request sent\n");
-	cancel_not_sent_msg = _("Could not send cancel request: ");
-
-	InitializeCriticalSection(&cancelConnLock);
-
-	SetConsoleCtrlHandler(consoleHandler, TRUE);
+	pqsignal(SIGINT, handle_sigint);
+#endif
 }
-
-#endif							/* WIN32 */
diff --git a/src/include/fe_utils/cancel.h b/src/include/fe_utils/cancel.h
index e174fb83b92..8afb2d778bf 100644
--- a/src/include/fe_utils/cancel.h
+++ b/src/include/fe_utils/cancel.h
@@ -23,10 +23,6 @@ extern PGDLLIMPORT volatile sig_atomic_t CancelRequested;
 extern void SetCancelConn(PGconn *conn);
 extern void ResetCancelConn(void);
 
-/*
- * A callback can be optionally set up to be called at cancellation
- * time.
- */
 extern void setup_cancel_handler(void (*query_cancel_callback) (void));
 
 #endif							/* CANCEL_H */
-- 
2.53.0

From b8f3b361a8cab9cbfc5ff8f84a79f807ef8ddfba Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sun, 8 Feb 2026 19:00:12 +0100
Subject: [PATCH v5 3/5] pg_dump: Don't use the deprecated and insecure
 PQcancel

pg_dump still used PQcancel to send cancel requests to the server when
the dump was cancelled. That libpq function is insecure, because it does
not use encryption to send the cancel request. This commit starts using the new
cancellation APIs (introduced in 61461a300) in pg_dump. These APIs use
the same encryption settings as the connection that's being cancelled.
Since these APIs are not signal-safe, this required a refactor so that
the cancel request is no longer sent from a signal handler, but from a
dedicated thread instead. Windows was already doing that, so now the paths
can share some code. There's still quite a bit of behavioural difference
though, because pg_dump uses threads for parallelism on Windows, but
processes on Unix.
---
 src/bin/pg_dump/Makefile             |   2 +-
 src/bin/pg_dump/meson.build          |   2 +
 src/bin/pg_dump/parallel.c           | 404 ++++++++++++++-------------
 src/bin/pg_dump/pg_backup_archiver.c |   2 +-
 src/bin/pg_dump/pg_backup_archiver.h |   8 +-
 src/bin/pg_dump/pg_backup_db.c       |   7 +-
 6 files changed, 222 insertions(+), 203 deletions(-)

diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index 79073b0a0ea..f76346c4f6c 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -21,7 +21,7 @@ export LZ4
 export ZSTD
 export with_icu
 
-override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
+override CPPFLAGS := -I$(libpq_srcdir) -I$(top_srcdir)/src/port $(CPPFLAGS)
 LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
 
 OBJS = \
diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build
index 7c9a475963b..c772cd0e2c0 100644
--- a/src/bin/pg_dump/meson.build
+++ b/src/bin/pg_dump/meson.build
@@ -22,6 +22,8 @@ pg_dump_common_sources = files(
 pg_dump_common = static_library('libpgdump_common',
   pg_dump_common_sources,
   c_pch: pch_postgres_fe_h,
+  # port needs to be in include path due to pthread-win32.h
+  include_directories: ['../../port'],
   dependencies: [frontend_code, libpq, lz4, zlib, zstd],
   kwargs: internal_lib_args,
 )
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index a28561fbd84..cc2fd7eecf7 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -58,8 +58,12 @@
 #include <signal.h>
 #include <unistd.h>
 #include <fcntl.h>
+#include <pthread.h>
+#else
+#include "pthread-win32.h"
 #endif
 
+#include "common/logging.h"
 #include "fe_utils/string_utils.h"
 #include "parallel.h"
 #include "pg_backup_utils.h"
@@ -167,6 +171,7 @@ typedef struct DumpSignalInformation
 	ArchiveHandle *myAH;		/* database connection to issue cancel for */
 	ParallelState *pstate;		/* parallel state, if any */
 	bool		handler_set;	/* signal handler set up in this process? */
+	bool		cancel_requested;	/* cancel requested via signal? */
 #ifndef WIN32
 	bool		am_worker;		/* am I a worker process? */
 #endif
@@ -174,8 +179,20 @@ typedef struct DumpSignalInformation
 
 static volatile DumpSignalInformation signal_info;
 
-#ifdef WIN32
-static CRITICAL_SECTION signal_info_lock;
+/*
+ * Mutex protecting signal_info during cancel operations.
+ */
+static pthread_mutex_t signal_info_lock;
+
+#ifndef WIN32
+/*
+ * On Unix, the signal handler cannot call PQcancelBlocking() directly because
+ * it is not async-signal-safe.  Instead, we use a pipe to wake a dedicated
+ * cancel thread: the signal handler writes a byte to the pipe, and the cancel
+ * thread's blocking read() returns, triggering the actual cancel requests.
+ */
+static int	cancel_pipe[2] = {-1, -1};
+static pthread_t cancel_thread;
 #endif
 
 /*
@@ -209,6 +226,7 @@ static void WaitForTerminatingWorkers(ParallelState *pstate);
 static void set_cancel_handler(void);
 static void set_cancel_pstate(ParallelState *pstate);
 static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
+static void StopWorkers(void);
 static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
 static int	GetIdleWorker(ParallelState *pstate);
 static bool HasEveryWorkerTerminated(ParallelState *pstate);
@@ -424,32 +442,9 @@ ShutdownWorkersHard(ParallelState *pstate)
 	/*
 	 * Force early termination of any commands currently in progress.
 	 */
-#ifndef WIN32
-	/* On non-Windows, send SIGTERM to each worker process. */
-	for (i = 0; i < pstate->numWorkers; i++)
-	{
-		pid_t		pid = pstate->parallelSlot[i].pid;
-
-		if (pid != 0)
-			kill(pid, SIGTERM);
-	}
-#else
-
-	/*
-	 * On Windows, send query cancels directly to the workers' backends.  Use
-	 * a critical section to ensure worker threads don't change state.
-	 */
-	EnterCriticalSection(&signal_info_lock);
-	for (i = 0; i < pstate->numWorkers; i++)
-	{
-		ArchiveHandle *AH = pstate->parallelSlot[i].AH;
-		char		errbuf[1];
-
-		if (AH != NULL && AH->connCancel != NULL)
-			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
-	}
-	LeaveCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_lock(&signal_info_lock);
+	StopWorkers();
+	pthread_mutex_unlock(&signal_info_lock);
 
 	/* Now wait for them to terminate. */
 	WaitForTerminatingWorkers(pstate);
@@ -533,74 +528,54 @@ WaitForTerminatingWorkers(ParallelState *pstate)
  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
  * for a long time.  Instead, we try to send a cancel request and then die.
  * pg_dump probably doesn't really need this, but we might as well use it
- * there too.  Note that sending the cancel directly from the signal handler
- * is safe because PQcancel() is written to make it so.
+ * there too.
  *
- * In parallel operation on Unix, each process is responsible for canceling
- * its own connection (this must be so because nobody else has access to it).
- * Furthermore, the leader process should attempt to forward its signal to
- * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
- * needed because typing control-C at the console would deliver SIGINT to
- * every member of the terminal process group --- but in other scenarios it
- * might be that only the leader gets signaled.
+ * On Unix, the signal handler wakes up a dedicated cancel thread via a
+ * self-pipe, which then sends the cancel and calls _exit().  This thread also
+ * forwards the signal to each child so they can also cancel their queries. In
+ * simple manual use of pg_dump/pg_restore, forwarding isn't needed because
+ * typing control-C at the console would deliver SIGINT to every member of the
+ * terminal process group --- but in other scenarios it might be that only the
+ * leader gets signaled.
  *
  * On Windows, the cancel handler runs in a separate thread, because that's
  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
  * cancels on all active connections, and then return FALSE, which will allow
  * the process to die.  For safety's sake, we use a critical section to
- * protect the PGcancel structures against being changed while the signal
+ * protect the PGcancelConn structures against being changed while the signal
  * thread runs.
  */
 
-#ifndef WIN32
-
 /*
- * Signal handler (Unix only)
+ * Cancel all active queries and print termination message.
  */
 static void
-sigTermHandler(SIGNAL_ARGS)
+CancelBackends(void)
 {
-	int			i;
-	char		errbuf[1];
+	pthread_mutex_lock(&signal_info_lock);
 
-	/*
-	 * Some platforms allow delivery of new signals to interrupt an active
-	 * signal handler.  That could muck up our attempt to send PQcancel, so
-	 * disable the signals that set_cancel_handler enabled.
-	 */
-	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SIG_IGN);
-	pqsignal(SIGQUIT, SIG_IGN);
+	signal_info.cancel_requested = true;
 
 	/*
-	 * If we're in the leader, forward signal to all workers.  (It seems best
-	 * to do this before PQcancel; killing the leader transaction will result
-	 * in invalid-snapshot errors from active workers, which maybe we can
-	 * quiet by killing workers first.)  Ignore any errors.
+	 * Stop workers first to avoid invalid-snapshot errors if the leader
+	 * cancels before workers.
 	 */
-	if (signal_info.pstate != NULL)
-	{
-		for (i = 0; i < signal_info.pstate->numWorkers; i++)
-		{
-			pid_t		pid = signal_info.pstate->parallelSlot[i].pid;
+	StopWorkers();
 
-			if (pid != 0)
-				kill(pid, SIGTERM);
-		}
-	}
+	if (signal_info.myAH != NULL && signal_info.myAH->cancelConn != NULL)
+		(void) PQcancelBlocking(signal_info.myAH->cancelConn);
 
-	/*
-	 * Send QueryCancel if we have a connection to send to.  Ignore errors,
-	 * there's not much we can do about them anyway.
-	 */
-	if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
-		(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
+	pthread_mutex_unlock(&signal_info_lock);
 
 	/*
-	 * Report we're quitting, using nothing more complicated than write(2).
-	 * When in parallel operation, only the leader process should do this.
+	 * Print termination message. In parallel operation, only the leader
+	 * should print this. On Windows, workers are threads in the same process
+	 * and the console handler only runs in the leader context, so we can
+	 * always print it.
 	 */
+#ifndef WIN32
 	if (!signal_info.am_worker)
+#endif
 	{
 		if (progname)
 		{
@@ -609,172 +584,204 @@ sigTermHandler(SIGNAL_ARGS)
 		}
 		write_stderr("terminated by user\n");
 	}
-
-	/*
-	 * And die, using _exit() not exit() because the latter will invoke atexit
-	 * handlers that can fail if we interrupted related code.
-	 */
-	_exit(1);
 }
 
 /*
- * Enable cancel interrupt handler, if not already done.
+ * Stop all worker processes/threads.
+ *
+ * On Unix, send SIGTERM to each worker process; their signal handlers will
+ * send cancel requests to their backends.
+ *
+ * On Windows, workers are threads in the same process, so we send cancel
+ * requests directly to their backends.
+ *
+ * Caller must hold signal_info_lock.
  */
 static void
-set_cancel_handler(void)
+StopWorkers(void)
 {
-	/*
-	 * When forking, signal_info.handler_set will propagate into the new
-	 * process, but that's fine because the signal handler state does too.
-	 */
-	if (!signal_info.handler_set)
+	int			i;
+
+	if (signal_info.pstate == NULL)
+		return;
+
+	for (i = 0; i < signal_info.pstate->numWorkers; i++)
 	{
-		signal_info.handler_set = true;
+#ifndef WIN32
+		pid_t		pid = signal_info.pstate->parallelSlot[i].pid;
+
+		if (pid != 0)
+			kill(pid, SIGTERM);
+#else
+		ArchiveHandle *AH = signal_info.pstate->parallelSlot[i].AH;
 
-		pqsignal(SIGINT, sigTermHandler);
-		pqsignal(SIGTERM, sigTermHandler);
-		pqsignal(SIGQUIT, sigTermHandler);
+		if (AH != NULL && AH->cancelConn != NULL)
+			(void) PQcancelBlocking(AH->cancelConn);
+#endif
 	}
 }
 
-#else							/* WIN32 */
+#ifdef WIN32
 
 /*
  * Console interrupt handler --- runs in a newly-started thread.
  *
- * After stopping other threads and sending cancel requests on all open
- * connections, we return FALSE which will allow the default ExitProcess()
- * action to be taken.
+ * Send cancel requests on all open connections and return FALSE to allow
+ * the default ExitProcess() action to terminate the process.
  */
 static BOOL WINAPI
 consoleHandler(DWORD dwCtrlType)
 {
-	int			i;
-	char		errbuf[1];
-
 	if (dwCtrlType == CTRL_C_EVENT ||
 		dwCtrlType == CTRL_BREAK_EVENT)
 	{
-		/* Critical section prevents changing data we look at here */
-		EnterCriticalSection(&signal_info_lock);
+		CancelBackends();
+	}
 
-		/*
-		 * If in parallel mode, stop worker threads and send QueryCancel to
-		 * their connected backends.  The main point of stopping the worker
-		 * threads is to keep them from reporting the query cancels as errors,
-		 * which would clutter the user's screen.  We needn't stop the leader
-		 * thread since it won't be doing much anyway.  Do this before
-		 * canceling the main transaction, else we might get invalid-snapshot
-		 * errors reported before we can stop the workers.  Ignore errors,
-		 * there's not much we can do about them anyway.
-		 */
-		if (signal_info.pstate != NULL)
+	/* Always return FALSE to allow signal handling to continue */
+	return FALSE;
+}
+
+#else							/* !WIN32 */
+
+/*
+ * Cancel thread main function. Waits for the signal handler to write to the
+ * pipe, then cancels backends and calls _exit().
+ */
+static void *
+cancel_thread_main(void *arg)
+{
+	for (;;)
+	{
+		char		buf[16];
+		ssize_t		rc;
+
+		/* Wait for signal handler to wake us up */
+		rc = read(cancel_pipe[0], buf, sizeof(buf));
+		if (rc <= 0)
 		{
-			for (i = 0; i < signal_info.pstate->numWorkers; i++)
-			{
-				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
-				ArchiveHandle *AH = slot->AH;
-				HANDLE		hThread = (HANDLE) slot->hThread;
-
-				/*
-				 * Using TerminateThread here may leave some resources leaked,
-				 * but it doesn't matter since we're about to end the whole
-				 * process.
-				 */
-				if (hThread != INVALID_HANDLE_VALUE)
-					TerminateThread(hThread, 0);
-
-				if (AH != NULL && AH->connCancel != NULL)
-					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
-			}
+			if (errno == EINTR)
+				continue;
+			/* Pipe closed or error - exit thread */
+			break;
 		}
 
-		/*
-		 * Send QueryCancel to leader connection, if enabled.  Ignore errors,
-		 * there's not much we can do about them anyway.
-		 */
-		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
-			(void) PQcancel(signal_info.myAH->connCancel,
-							errbuf, sizeof(errbuf));
-
-		LeaveCriticalSection(&signal_info_lock);
+		CancelBackends();
 
 		/*
-		 * Report we're quitting, using nothing more complicated than
-		 * write(2).  (We might be able to get away with using pg_log_*()
-		 * here, but since we terminated other threads uncleanly above, it
-		 * seems better to assume as little as possible.)
+		 * And die, using _exit() not exit() because the latter will invoke
+		 * atexit handlers that can fail if we interrupted related code.
 		 */
-		if (progname)
-		{
-			write_stderr(progname);
-			write_stderr(": ");
-		}
-		write_stderr("terminated by user\n");
+		_exit(1);
 	}
 
-	/* Always return FALSE to allow signal handling to continue */
-	return FALSE;
+	return NULL;
 }
 
+/*
+ * Signal handler (Unix only).  Wakes up the cancel thread by writing to the
+ * pipe.
+ */
+static void
+sigTermHandler(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+	char		c = 1;
+
+	/* Wake up the cancel thread - write() is async-signal-safe */
+	if (cancel_pipe[1] >= 0)
+		(void) write(cancel_pipe[1], &c, 1);
+
+	errno = save_errno;
+}
+
+#endif							/* WIN32 */
+
 /*
  * Enable cancel interrupt handler, if not already done.
  */
 static void
 set_cancel_handler(void)
 {
-	if (!signal_info.handler_set)
+	if (signal_info.handler_set)
+		return;
+
+	signal_info.handler_set = true;
+
+	pthread_mutex_init(&signal_info_lock, NULL);
+
+#ifdef WIN32
+	SetConsoleCtrlHandler(consoleHandler, TRUE);
+#else
+
+	/*
+	 * Create the pipe and cancel thread (see comment on cancel_pipe above).
+	 */
+	if (pipe(cancel_pipe) < 0)
 	{
-		signal_info.handler_set = true;
+		pg_log_error("could not create pipe for cancel: %m");
+		exit(1);
+	}
 
-		InitializeCriticalSection(&signal_info_lock);
+	/*
+	 * Make the write end non-blocking, so that the signal handler won't block
+	 * if the pipe buffer is full (which is very unlikely in practice but
+	 * possible in theory).
+	 */
+	fcntl(cancel_pipe[1], F_SETFL, O_NONBLOCK);
 
-		SetConsoleCtrlHandler(consoleHandler, TRUE);
+	{
+		int			rc = pthread_create(&cancel_thread, NULL, cancel_thread_main, NULL);
+
+		if (rc != 0)
+		{
+			pg_log_error("could not create cancel thread: %s", strerror(rc));
+			exit(1);
+		}
 	}
-}
 
-#endif							/* WIN32 */
+	pqsignal(SIGINT, sigTermHandler);
+	pqsignal(SIGTERM, sigTermHandler);
+	pqsignal(SIGQUIT, sigTermHandler);
+#endif
+}
 
 
 /*
  * set_archive_cancel_info
  *
- * Fill AH->connCancel with cancellation info for the specified database
+ * Fill AH->cancelConn with cancellation info for the specified database
  * connection; or clear it if conn is NULL.
  */
 void
 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
 {
-	PGcancel   *oldConnCancel;
+	PGcancelConn *oldCancelConn;
 
 	/*
-	 * Activate the interrupt handler if we didn't yet in this process.  On
-	 * Windows, this also initializes signal_info_lock; therefore it's
-	 * important that this happen at least once before we fork off any
-	 * threads.
+	 * Activate the interrupt handler if we didn't yet in this process.  This
+	 * also initializes signal_info_lock; therefore it's important that this
+	 * happen at least once before we fork off any threads.
 	 */
 	set_cancel_handler();
 
 	/*
-	 * On Unix, we assume that storing a pointer value is atomic with respect
-	 * to any possible signal interrupt.  On Windows, use a critical section.
+	 * Use mutex to prevent the cancel handler from using the pointer while
+	 * we're changing it.
 	 */
-
-#ifdef WIN32
-	EnterCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_lock(&signal_info_lock);
 
 	/* Free the old one if we have one */
-	oldConnCancel = AH->connCancel;
+	oldCancelConn = AH->cancelConn;
 	/* be sure interrupt handler doesn't use pointer while freeing */
-	AH->connCancel = NULL;
+	AH->cancelConn = NULL;
 
-	if (oldConnCancel != NULL)
-		PQfreeCancel(oldConnCancel);
+	if (oldCancelConn != NULL)
+		PQcancelFinish(oldCancelConn);
 
 	/* Set the new one if specified */
 	if (conn)
-		AH->connCancel = PQgetCancel(conn);
+		AH->cancelConn = PQcancelCreate(conn);
 
 	/*
 	 * On Unix, there's only ever one active ArchiveHandle per process, so we
@@ -790,49 +797,35 @@ set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
 		signal_info.myAH = AH;
 #endif
 
-#ifdef WIN32
-	LeaveCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_unlock(&signal_info_lock);
 }
 
 /*
  * set_cancel_pstate
  *
  * Set signal_info.pstate to point to the specified ParallelState, if any.
- * We need this mainly to have an interlock against Windows signal thread.
+ * We need this mainly to have an interlock against the cancel handler thread.
  */
 static void
 set_cancel_pstate(ParallelState *pstate)
 {
-#ifdef WIN32
-	EnterCriticalSection(&signal_info_lock);
-#endif
-
+	pthread_mutex_lock(&signal_info_lock);
 	signal_info.pstate = pstate;
-
-#ifdef WIN32
-	LeaveCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_unlock(&signal_info_lock);
 }
 
 /*
  * set_cancel_slot_archive
  *
  * Set ParallelSlot's AH field to point to the specified archive, if any.
- * We need this mainly to have an interlock against Windows signal thread.
+ * We need this mainly to have an interlock against the cancel handler thread.
  */
 static void
 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
 {
-#ifdef WIN32
-	EnterCriticalSection(&signal_info_lock);
-#endif
-
+	pthread_mutex_lock(&signal_info_lock);
 	slot->AH = AH;
-
-#ifdef WIN32
-	LeaveCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_unlock(&signal_info_lock);
 }
 
 
@@ -947,7 +940,7 @@ ParallelBackupStart(ArchiveHandle *AH)
 
 	/*
 	 * Temporarily disable query cancellation on the leader connection.  This
-	 * ensures that child processes won't inherit valid AH->connCancel
+	 * ensures that child processes won't inherit valid AH->cancelConn
 	 * settings and thus won't try to issue cancels against the leader's
 	 * connection.  No harm is done if we fail while it's disabled, because
 	 * the leader connection is idle at this point anyway.
@@ -1005,6 +998,17 @@ ParallelBackupStart(ArchiveHandle *AH)
 			/* instruct signal handler that we're in a worker now */
 			signal_info.am_worker = true;
 
+			/*
+			 * Reset cancel handler state so that the worker will set up its
+			 * own cancel thread when it calls set_archive_cancel_info().
+			 * Threads don't survive fork(), so we can't use the leader's.
+			 * Also close the inherited pipe fds.
+			 */
+			signal_info.handler_set = false;
+			close(cancel_pipe[0]);
+			close(cancel_pipe[1]);
+			cancel_pipe[0] = cancel_pipe[1] = -1;
+
 			/* close read end of Worker -> Leader */
 			closesocket(pipeWM[PIPE_READ]);
 			/* close write end of Leader -> Worker */
@@ -1421,8 +1425,18 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
 
 	if (!msg)
 	{
-		/* If do_wait is true, we must have detected EOF on some socket */
-		if (do_wait)
+		/*
+		 * If do_wait is true, we must have detected EOF on some socket. If
+		 * it's due to a cancel request, that's expected, otherwise it's a
+		 * problem.
+		 */
+		bool		cancel_requested;
+
+		pthread_mutex_lock(&signal_info_lock);
+		cancel_requested = signal_info.cancel_requested;
+		pthread_mutex_unlock(&signal_info_lock);
+
+		if (do_wait && !cancel_requested)
 			pg_fatal("a worker process died unexpectedly");
 		return false;
 	}
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index df8a69d3b79..ae037e70d55 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -5208,7 +5208,7 @@ CloneArchive(ArchiveHandle *AH)
 
 	/* The clone will have its own connection, so disregard connection state */
 	clone->connection = NULL;
-	clone->connCancel = NULL;
+	clone->cancelConn = NULL;
 	clone->currUser = NULL;
 	clone->currSchema = NULL;
 	clone->currTableAm = NULL;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 365073b3eae..54e4099be53 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -288,8 +288,12 @@ struct _archiveHandle
 	char	   *savedPassword;	/* password for ropt->username, if known */
 	char	   *use_role;
 	PGconn	   *connection;
-	/* If connCancel isn't NULL, SIGINT handler will send a cancel */
-	PGcancel   *volatile connCancel;
+
+	/*
+	 * If cancelConn isn't NULL, SIGINT handler will trigger the cancel thread
+	 * to send a cancel.
+	 */
+	PGcancelConn *cancelConn;
 
 	int			connectToDB;	/* Flag to indicate if direct DB connection is
 								 * required */
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index 5c349279beb..0cc29a8aa70 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -84,7 +84,7 @@ ReconnectToServer(ArchiveHandle *AH, const char *dbname)
 
 	/*
 	 * Note: we want to establish the new connection, and in particular update
-	 * ArchiveHandle's connCancel, before closing old connection.  Otherwise
+	 * ArchiveHandle's cancelConn, before closing old connection.  Otherwise
 	 * an ill-timed SIGINT could try to access a dead connection.
 	 */
 	AH->connection = NULL;		/* dodge error check in ConnectDatabaseAhx */
@@ -164,12 +164,11 @@ void
 DisconnectDatabase(Archive *AHX)
 {
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
-	char		errbuf[1];
 
 	if (!AH->connection)
 		return;
 
-	if (AH->connCancel)
+	if (AH->cancelConn)
 	{
 		/*
 		 * If we have an active query, send a cancel before closing, ignoring
@@ -177,7 +176,7 @@ DisconnectDatabase(Archive *AHX)
 		 * helpful during pg_fatal().
 		 */
 		if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
-			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
+			(void) PQcancelBlocking(AH->cancelConn);
 
 		/*
 		 * Prevent signal handler from sending a cancel after this.
-- 
2.53.0

From 3000ad8e1afcca05479ed2d5903214b96dc17227 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sat, 7 Mar 2026 00:00:38 +0100
Subject: [PATCH v5 4/5] fixup! Don't use deprecated and insecure PQcancel psql
 and other tools anymore

Keep track of the cancelConn generation that was current when the signal
was received, so that a stale signal does not cancel a subsequent query.
---
 src/fe_utils/cancel.c | 46 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 37 insertions(+), 9 deletions(-)

diff --git a/src/fe_utils/cancel.c b/src/fe_utils/cancel.c
index 5d645c554ab..caaf3e7c675 100644
--- a/src/fe_utils/cancel.c
+++ b/src/fe_utils/cancel.c
@@ -46,6 +46,7 @@
 #include "postgres_fe.h"
 
 #include <signal.h>
+#include <stdatomic.h>
 #include <unistd.h>
 
 #ifndef WIN32
@@ -85,9 +86,22 @@ static PGcancelConn *cancelConn = NULL;
 
 /*
  * Generation counter for cancelConn. Incremented each time cancelConn is
- * changed. Used to detect if cancelConn was replaced while we were using it.
+ * changed (under cancelConnLock). Read by the signal handler (which can run
+ * on any thread) and by the cancel thread, so it needs to be atomic.
  */
-static uint64 cancelConnGeneration = 0;
+static _Atomic sig_atomic_t cancelConnGeneration = 0;
+
+/*
+ * The generation that was current when SIGINT was received.  The cancel
+ * thread compares this against cancelConnGeneration before sending a cancel
+ * request, so that a stale signal (from a query that already finished) does
+ * not cancel a subsequent query.
+ *
+ * Written by the signal handler (or Windows console handler) and read by
+ * the cancel thread, so it must be an atomic type for cross-thread safety.
+ * This assumes _Atomic sig_atomic_t is lock-free; C11 does not guarantee it.
+ */
+static _Atomic sig_atomic_t cancelConnGenerationDuringSignal = 0;
 
 /*
  * Mutex protecting cancelConn and cancelConnGeneration.
@@ -151,22 +165,29 @@ static void
 SendCancelRequest(void)
 {
 	PGcancelConn *cc;
-	uint64		generation;
+	sig_atomic_t generation;
 	bool		putConnectionBack = false;
 
 	/*
 	 * Borrow the cancel connection from the global, setting it to NULL so
 	 * that SetCancelConn/ResetCancelConn won't free it while we're using it.
+	 *
+	 * Also check that the generation matches what the signal handler
+	 * captured. If it doesn't, the main thread already moved on to a
+	 * different query (or no query at all), so sending a cancel would be
+	 * wrong.
 	 */
 	pthread_mutex_lock(&cancelConnLock);
 	cc = cancelConn;
-	generation = cancelConnGeneration;
+	generation = atomic_load(&cancelConnGeneration);
+	if (cc == NULL || generation != atomic_load(&cancelConnGenerationDuringSignal))
+	{
+		pthread_mutex_unlock(&cancelConnLock);
+		return;
+	}
 	cancelConn = NULL;
 	pthread_mutex_unlock(&cancelConnLock);
 
-	if (cc == NULL)
-		return;
-
 	write_stderr(cancel_sent_msg);
 
 	if (!PQcancelBlocking(cc))
@@ -185,7 +206,7 @@ SendCancelRequest(void)
 	 * using it.
 	 */
 	pthread_mutex_lock(&cancelConnLock);
-	if (cancelConnGeneration == generation)
+	if (atomic_load(&cancelConnGeneration) == generation)
 	{
 		/* Generation unchanged, put it back for reuse */
 		cancelConn = cc;
@@ -210,7 +231,7 @@ SetCancelConnInternal(PGcancelConn *newCancelConn)
 	pthread_mutex_lock(&cancelConnLock);
 	oldCancelConn = cancelConn;
 	cancelConn = newCancelConn;
-	cancelConnGeneration++;
+	atomic_fetch_add(&cancelConnGeneration, 1);
 	pthread_mutex_unlock(&cancelConnLock);
 
 	if (oldCancelConn != NULL)
@@ -256,6 +277,7 @@ consoleHandler(DWORD dwCtrlType)
 		dwCtrlType == CTRL_BREAK_EVENT)
 	{
 		CancelRequested = true;
+		atomic_store(&cancelConnGenerationDuringSignal, atomic_load(&cancelConnGeneration));
 
 		if (cancel_callback != NULL)
 			cancel_callback();
@@ -311,6 +333,12 @@ handle_sigint(SIGNAL_ARGS)
 
 	CancelRequested = true;
 
+	/*
+	 * Capture the current generation so the cancel thread can verify it's
+	 * still sending the cancel for the right query.
+	 */
+	atomic_store(&cancelConnGenerationDuringSignal, atomic_load(&cancelConnGeneration));
+
 	if (cancel_callback != NULL)
 		cancel_callback();
 
-- 
2.53.0

From a290e1635b8b0a5d4c60b0bdc280684b0b394cd6 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sat, 7 Mar 2026 00:05:05 +0100
Subject: [PATCH v5 5/5] fixup! Don't use deprecated and insecure PQcancel psql
 and other tools anymore

Drain the pipe in the cancel thread.
---
 src/fe_utils/cancel.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/fe_utils/cancel.c b/src/fe_utils/cancel.c
index caaf3e7c675..a5607b204cc 100644
--- a/src/fe_utils/cancel.c
+++ b/src/fe_utils/cancel.c
@@ -305,7 +305,7 @@ cancel_thread_main(void *arg)
 		char		buf[16];
 		ssize_t		rc;
 
-		/* Wait for signal handler to wake us up */
+		/* Wait for signal handler to wake us up (blocking read) */
 		rc = read(cancel_pipe[0], buf, sizeof(buf));
 		if (rc <= 0)
 		{
@@ -315,6 +315,14 @@ cancel_thread_main(void *arg)
 			break;
 		}
 
+		/* Drain the pipe so multiple SIGINTs don't queue up extra wakeups */
+		fcntl(cancel_pipe[0], F_SETFL, O_NONBLOCK);
+		while (read(cancel_pipe[0], buf, sizeof(buf)) > 0)
+		{
+			/* loop until pipe is drained */
+		}
+		fcntl(cancel_pipe[0], F_SETFL, 0);
+
 		SendCancelRequest();
 	}
 
-- 
2.53.0

Reply via email to