On Thu Mar 5, 2026 at 7:30 PM CET, Heikki Linnakangas wrote:
> It took me a while to get the big picture of how this works. cancel.c could use some high-level comments explaining how to use the facility; it's a real mixed bag right now.
Attached is a version with a bunch more comments. I agree this cancel logic is hard to understand without them; it took me quite a while to understand it myself. (I don't think the code got any harder to understand with these changes, though: the exact same complexity was already there for Windows. But I agree more comments are good.)
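For readers new to this code, the Unix half of the design can be sketched on its own with plain POSIX calls: the SIGINT handler only `write()`s a byte to a non-blocking pipe, and a dedicated thread wakes up from `read()` and does the blocking, non-signal-safe work. This is a minimal sketch, not the patch itself: `send_cancel_request()` is a hypothetical stand-in for the patch's `SendCancelRequest()`, and `start_cancel_thread()`/`stop_cancel_thread()` are illustrative names (the stop function exists only so the sketch can be tested).

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

static int	cancel_pipe[2] = {-1, -1};
static pthread_t cancel_thread;

/*
 * Hypothetical stand-in for SendCancelRequest()/PQcancelBlocking().  Only
 * the cancel thread touches the counter; pthread_join() publishes it.
 */
static int	cancels_sent = 0;

static void
send_cancel_request(void)
{
	cancels_sent++;
}

/* SIGINT handler: the only real work is an async-signal-safe write(). */
static void
handle_sigint(int signo)
{
	int			save_errno = errno;
	char		c = 1;

	(void) signo;
	if (cancel_pipe[1] >= 0)
	{
		ssize_t		n = write(cancel_pipe[1], &c, 1);

		(void) n;				/* if the pipe is full, a wakeup is pending */
	}
	errno = save_errno;
}

/* Runs on the dedicated thread, where blocking calls are fine. */
static void *
cancel_thread_main(void *arg)
{
	(void) arg;
	for (;;)
	{
		char		buf[16];
		ssize_t		rc = read(cancel_pipe[0], buf, sizeof(buf));

		if (rc < 0 && errno == EINTR)
			continue;
		if (rc <= 0)
			break;				/* EOF (write end closed) or a real error */
		send_cancel_request();	/* one request per read(), however many bytes */
	}
	return NULL;
}

/* Returns 0 on success, -1 on failure. */
static int
start_cancel_thread(void)
{
	struct sigaction sa;

	if (pipe(cancel_pipe) < 0)
		return -1;
	/* Never let the signal handler block on a full pipe. */
	if (fcntl(cancel_pipe[1], F_SETFL, O_NONBLOCK) < 0)
		return -1;
	if (pthread_create(&cancel_thread, NULL, cancel_thread_main, NULL) != 0)
		return -1;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = handle_sigint;
	sigemptyset(&sa.sa_mask);
	return sigaction(SIGINT, &sa, NULL);
}

/* Test-only teardown: closing the write end makes read() return 0. */
static void
stop_cancel_thread(void)
{
	int			fd = cancel_pipe[1];

	assert(fd >= 0);
	cancel_pipe[1] = -1;
	close(fd);
	pthread_join(cancel_thread, NULL);
	close(cancel_pipe[0]);
	cancel_pipe[0] = -1;
}
```

One detail in this sketch: `errno` is only meaningful when `read()` returns -1, so it tests `rc < 0` before looking for `EINTR`. Consulting `errno` after a 0 return (EOF) could pick up a stale `EINTR` and spin.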
> The cancelConn mechanism is a global variable, which means that it can only be used with one connection in the process. That's OK with the current callers, but seems short-sighted. What if we wanted to use it for pgbench, for example, which uses multiple threads and connections? Or if we changed pg_dump to use multiple threads, like you also suggested as a possible follow-up.
Allowing for multiple callers seems like scope creep to me, and it is also hard to do in a generic way. If we do want that, I'd first want to see a version that solves the problem for pgbench/pg_dump specifically, before generalizing it into cancel.c.
> This is racy if the cancellation thread doesn't immediately process the wakeup, for example because it's still busy processing a previous wakeup, or because there's a network hiccup. By the time the cancellation thread runs, the main thread might already be running a different query than the one it was running when the user hit CTRL-C.
I've now noted this in one of the new comments. I don't think there's a way to avoid this race condition entirely; it's simply a limitation of our cancel protocol, which has no way to specify which query on a connection should be cancelled. In theory we could narrow the window by having all frontend tools use async connections, with the main thread waiting for either the query result or a wakeup on the self-pipe. That would behave more like the previous signal-handler code. It's a much bigger lift, though: all PQexec and PQgetResult calls would need to be modified, whereas my proposed change doesn't require changing the call sites at all. In interactive use of psql, it seems pretty unlikely that people will hit this race condition. In non-interactive use it doesn't matter, because you just want Ctrl-C to cancel the application (whichever query it is currently running). So I'd say the less racy option is currently not worth the complexity.
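The less racy alternative mentioned above, where the main thread itself waits on both the connection socket and the self-pipe, could look roughly like the following POSIX sketch. It is not part of the patch: `wait_for_result_or_cancel()` and `WaitOutcome` are hypothetical names, and in real code `sock_fd` would presumably come from libpq's `PQsocket()`.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <unistd.h>

typedef enum
{
	WAIT_RESULT_READY,			/* the connection socket is readable */
	WAIT_CANCEL_REQUESTED,		/* the SIGINT handler wrote to the self-pipe */
	WAIT_TIMEOUT,
	WAIT_ERROR
} WaitOutcome;

/*
 * Block until sock_fd or wake_fd (the read end of the self-pipe) is
 * readable.  A negative timeout_ms waits forever.  The self-pipe is checked
 * first, so a pending Ctrl-C wins a tie with an arriving result.
 */
static WaitOutcome
wait_for_result_or_cancel(int sock_fd, int wake_fd, int timeout_ms)
{
	struct pollfd fds[2];

	assert(sock_fd >= 0 && wake_fd >= 0);
	fds[0].fd = sock_fd;
	fds[0].events = POLLIN;
	fds[1].fd = wake_fd;
	fds[1].events = POLLIN;

	for (;;)
	{
		int			rc = poll(fds, 2, timeout_ms);

		if (rc < 0)
		{
			if (errno == EINTR)
				continue;		/* the SIGINT itself may interrupt poll() */
			return WAIT_ERROR;
		}
		if (rc == 0)
			return WAIT_TIMEOUT;

		if (fds[1].revents & POLLIN)
		{
			char		buf[16];
			ssize_t		n = read(wake_fd, buf, sizeof(buf));

			(void) n;			/* consume pending wakeup bytes (up to 16) */
			return WAIT_CANCEL_REQUESTED;
		}
		if (fds[1].revents != 0)
			return WAIT_ERROR;	/* self-pipe hung up or is invalid */
		if (fds[0].revents != 0)
			return WAIT_RESULT_READY;	/* HUP/ERR also land here */
	}
}
```

On `WAIT_CANCEL_REQUESTED` the main thread would send the cancel itself, knowing exactly which query is running, and then keep waiting for the resulting error. That is what narrows the race, and it is also why every blocking `PQexec()`/`PQgetResult()` call site would have to move to the async API (`PQsendQuery()` and friends).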
From 6d436ade8ad7f6fb77502669e14b71dda4618b1f Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sat, 13 Dec 2025 18:18:13 +0100
Subject: [PATCH v4 1/3] Move Windows pthread compatibility functions to src/port

This is in preparation for a follow-up commit which will start to use
these functions in more places than just libpq.
---
 configure.ac                                   | 1 +
 src/interfaces/libpq/Makefile                  | 1 -
 src/interfaces/libpq/meson.build               | 2 +-
 src/port/meson.build                           | 1 +
 src/{interfaces/libpq => port}/pthread-win32.c | 4 ++--
 5 files changed, 5 insertions(+), 4 deletions(-)
 rename src/{interfaces/libpq => port}/pthread-win32.c (94%)

diff --git a/configure.ac b/configure.ac
index f4e3bd307c8..9284193771a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1949,6 +1949,7 @@ if test "$PORTNAME" = "win32"; then
   AC_LIBOBJ(dirmod)
   AC_LIBOBJ(kill)
   AC_LIBOBJ(open)
+  AC_LIBOBJ(pthread-win32)
   AC_LIBOBJ(system)
   AC_LIBOBJ(win32common)
   AC_LIBOBJ(win32dlopen)
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 0963995eed4..d6cfb00d655 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -75,7 +75,6 @@ endif
 
 ifeq ($(PORTNAME), win32)
 OBJS += \
-	pthread-win32.o \
 	win32.o
 endif
 
diff --git a/src/interfaces/libpq/meson.build b/src/interfaces/libpq/meson.build
index b0ae72167a1..b949780b85b 100644
--- a/src/interfaces/libpq/meson.build
+++ b/src/interfaces/libpq/meson.build
@@ -20,7 +20,7 @@ libpq_sources = files(
 libpq_so_sources = [] # for shared lib, in addition to the above
 
 if host_system == 'windows'
-  libpq_sources += files('pthread-win32.c', 'win32.c')
+  libpq_sources += files('win32.c')
   libpq_so_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
     '--NAME', 'libpq',
     '--FILEDESC', 'PostgreSQL Access Library',])
diff --git a/src/port/meson.build b/src/port/meson.build
index 7296f8e3c03..a0fc13a5e62 100644
--- a/src/port/meson.build
+++ b/src/port/meson.build
@@ -32,6 +32,7 @@ if host_system == 'windows'
     'dirmod.c',
     'kill.c',
     'open.c',
+    'pthread-win32.c',
     'system.c',
     'win32common.c',
     'win32dlopen.c',
diff --git a/src/interfaces/libpq/pthread-win32.c b/src/port/pthread-win32.c
similarity index 94%
rename from src/interfaces/libpq/pthread-win32.c
rename to src/port/pthread-win32.c
index cf66284f007..48d68b693a7 100644
--- a/src/interfaces/libpq/pthread-win32.c
+++ b/src/port/pthread-win32.c
@@ -5,12 +5,12 @@
 *
 * Copyright (c) 2004-2026, PostgreSQL Global Development Group
 * IDENTIFICATION
-*	src/interfaces/libpq/pthread-win32.c
+*	src/port/pthread-win32.c
 *
 *-------------------------------------------------------------------------
 */
 
-#include "postgres_fe.h"
+#include "c.h"
 
 #include "pthread-win32.h"
 

base-commit: f95d73ed433207c4323802dc96e52f3e5553a86c
-- 
2.53.0
From b8cf36fd5c9cfd2b3f022810565491e1780421c2 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sat, 13 Dec 2025 13:05:50 +0100
Subject: [PATCH v4 2/3] Don't use deprecated and insecure PQcancel in psql and
 other tools anymore

All of our frontend tools that used our fe_utils to cancel queries,
including psql, still used PQcancel to send cancel requests to the
server. That function is insecure, because it does not use encryption
to send the cancel request. This starts using the new cancellation APIs
(introduced in 61461a300) for all these frontend tools. These APIs use
the same encryption settings as the connection that's being cancelled.
Since these APIs are not signal-safe, this required a refactor: the
cancel request is no longer sent from a signal handler, but from a
dedicated thread instead. Similar logic was already used for Windows
anyway, so this also has the benefit of making the cancel logic more
uniform across our supported platforms.

The calls to PQcancel in pg_dump are still kept and will be removed in a
later commit. The reason is that that code does not use the helpers
from fe_utils to cancel queries, and instead implements its own logic.
---
 meson.build                   |   2 +-
 src/fe_utils/Makefile         |   2 +-
 src/fe_utils/cancel.c         | 368 +++++++++++++++++++++++-----------
 src/include/fe_utils/cancel.h |   4 -
 4 files changed, 252 insertions(+), 124 deletions(-)

diff --git a/meson.build b/meson.build
index ddf5172982f..e72d91500bb 100644
--- a/meson.build
+++ b/meson.build
@@ -3482,7 +3482,7 @@ frontend_code = declare_dependency(
   include_directories: [postgres_inc],
   link_with: [fe_utils, common_static, pgport_static],
   sources: generated_headers_stamp,
-  dependencies: [os_deps, libintl],
+  dependencies: [os_deps, libintl, thread_dep],
 )
 
 backend_both_deps += [
diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile
index cbfbf93ac69..809ab21cc0c 100644
--- a/src/fe_utils/Makefile
+++ b/src/fe_utils/Makefile
@@ -17,7 +17,7 @@ subdir = src/fe_utils
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
+override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) -I$(top_srcdir)/src/port $(CPPFLAGS)
 
 OBJS = \
 	archive.o \
diff --git a/src/fe_utils/cancel.c b/src/fe_utils/cancel.c
index e6b75439f56..5d645c554ab 100644
--- a/src/fe_utils/cancel.c
+++ b/src/fe_utils/cancel.c
@@ -2,9 +2,38 @@
  *
  * Query cancellation support for frontend code
  *
- * Assorted utility functions to control query cancellation with signal
- * handler for SIGINT.
+ * This module provides SIGINT/Ctrl-C handling for frontend tools that need
+ * to cancel queries running on the server. It combines three completely
+ * independent mechanisms, any combination of which can be used by a caller:
  *
+ * 1. Server cancel request -- Often what applications need. When a query is
+ * running, and the main thread is waiting for the result of that query in a
+ * blocking manner, we want SIGINT/Ctrl-C to cancel that query. This can be
+ * done by having the application call SetCancelConn() to register the
+ * connection that is running the query, prior to waiting for the result.
+ * When SIGINT/Ctrl-C is received, a cancel request for this connection will
+ * then be sent to the server from a separate thread. That in turn will then
+ * (assuming a co-operating server) cause the server to cancel the query and
+ * send an error to the waiting client on the main thread. The cancel
+ * connection is a process-wide global, so only one connection can be the
+ * cancel target at a time. ResetCancelConn() can be used to unregister the
+ * connection again, preventing a cancel request from being sent if
+ * SIGINT/Ctrl-C is received after the blocking wait has already completed.
+ *
+ * 2. CancelRequested flag -- A more involved but also much more flexible way
+ * of cancelling. A volatile sig_atomic_t CancelRequested flag is set to
+ * true whenever SIGINT is received. This means that the application code
+ * can fully control what it does with this flag. The primary use case for
+ * this is when the application code is not blocked (indefinitely), but
+ * needs to take an action when Ctrl-C is pressed, such as breaking out of a
+ * long-running loop.
+ *
+ * 3. Cancel callback -- The most complex way of handling SIGINT. An optional
+ * function pointer registered via setup_cancel_handler(). If set, it is
+ * called directly from the signal handler, so it must be async-signal-safe.
+ * Writing async-signal-safe code is not easy, so this is only recommended
+ * as a last resort. psql uses this to longjmp back to the main loop when no
+ * query is active.
* * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -16,9 +45,21 @@ #include "postgres_fe.h" +#include <signal.h> #include <unistd.h> +#ifndef WIN32 +#include <fcntl.h> +#endif + +#ifdef WIN32 +#include "pthread-win32.h" +#else +#include <pthread.h> +#endif + #include "common/connect.h" +#include "common/logging.h" #include "fe_utils/cancel.h" #include "fe_utils/string_utils.h" @@ -36,11 +77,22 @@ (void) rc_; \ } while (0) + /* - * Contains all the information needed to cancel a query issued from - * a database connection to the backend. + * Cancel connection that should be used to send cancel requests. */ -static PGcancel *volatile cancelConn = NULL; +static PGcancelConn *cancelConn = NULL; + +/* + * Generation counter for cancelConn. Incremented each time cancelConn is + * changed. Used to detect if cancelConn was replaced while we were using it. + */ +static uint64 cancelConnGeneration = 0; + +/* + * Mutex protecting cancelConn and cancelConnGeneration. + */ +static pthread_mutex_t cancelConnLock = PTHREAD_MUTEX_INITIALIZER; /* * Predetermined localized error strings --- needed to avoid trying @@ -58,186 +110,266 @@ static const char *cancel_not_sent_msg = NULL; */ volatile sig_atomic_t CancelRequested = false; -#ifdef WIN32 -static CRITICAL_SECTION cancelConnLock; -#endif - /* * Additional callback for cancellations. */ static void (*cancel_callback) (void) = NULL; +#ifndef WIN32 +/* + * On Unix, the SIGINT signal handler cannot call PQcancelBlocking() directly + * because it is not async-signal-safe. Instead, we use a pipe to wake a + * dedicated cancel thread: the signal handler writes a byte to the pipe, and + * the cancel thread's blocking read() returns, triggering the actual cancel + * request. + */ +static int cancel_pipe[2] = {-1, -1}; +static pthread_t cancel_thread; +#endif + /* - * SetCancelConn + * Send a cancel request to the connection, if one is set. 
+ * + * Called from the cancel thread (Unix) or the console handler thread + * (Windows), never from the signal handler itself. * - * Set cancelConn to point to the current database connection. + * To avoid the cancel connection being freed by a concurrent + * SetCancelConn()/ResetCancelConn() call on the main thread while we are + * using it, we temporarily take it out of the global variable while sending + * the request. A generation counter lets us detect whether the main thread + * replaced it in the meantime, in which case we free the old one instead of + * putting it back. + * + * Note: there is an inherent race where, if this thread is slow to process + * the wakeup (e.g. due to a network delay sending a previous cancel), the + * main thread may have already moved on to a different query by the time we + * send the cancel. This is unavoidable with the server's cancel protocol, + * which identifies the session but not individual queries. */ -void -SetCancelConn(PGconn *conn) +static void +SendCancelRequest(void) { - PGcancel *oldCancelConn; + PGcancelConn *cc; + uint64 generation; + bool putConnectionBack = false; + + /* + * Borrow the cancel connection from the global, setting it to NULL so + * that SetCancelConn/ResetCancelConn won't free it while we're using it. 
+ */ + pthread_mutex_lock(&cancelConnLock); + cc = cancelConn; + generation = cancelConnGeneration; + cancelConn = NULL; + pthread_mutex_unlock(&cancelConnLock); -#ifdef WIN32 - EnterCriticalSection(&cancelConnLock); -#endif + if (cc == NULL) + return; - /* Free the old one if we have one */ - oldCancelConn = cancelConn; + write_stderr(cancel_sent_msg); - /* be sure handle_sigint doesn't use pointer while freeing */ - cancelConn = NULL; + if (!PQcancelBlocking(cc)) + { + char *errmsg = PQcancelErrorMessage(cc); - if (oldCancelConn != NULL) - PQfreeCancel(oldCancelConn); + write_stderr(cancel_not_sent_msg); + if (errmsg) + write_stderr(errmsg); + } + /* Reset for possible reuse */ + PQcancelReset(cc); + + /* + * Put the cancel connection back if it wasn't replaced while we were + * using it. + */ + pthread_mutex_lock(&cancelConnLock); + if (cancelConnGeneration == generation) + { + /* Generation unchanged, put it back for reuse */ + cancelConn = cc; + putConnectionBack = true; + } + pthread_mutex_unlock(&cancelConnLock); - cancelConn = PQgetCancel(conn); + /* If it was replaced, we free it, because we were the last user */ + if (!putConnectionBack) + PQcancelFinish(cc); +} -#ifdef WIN32 - LeaveCriticalSection(&cancelConnLock); -#endif + +/* + * Helper to replace cancelConn with a new value. + */ +static void +SetCancelConnInternal(PGcancelConn *newCancelConn) +{ + PGcancelConn *oldCancelConn; + + pthread_mutex_lock(&cancelConnLock); + oldCancelConn = cancelConn; + cancelConn = newCancelConn; + cancelConnGeneration++; + pthread_mutex_unlock(&cancelConnLock); + + if (oldCancelConn != NULL) + PQcancelFinish(oldCancelConn); +} + +/* + * SetCancelConn + * + * Set cancelConn to point to a cancel connection for the given database + * connection. This creates a new PGcancelConn that can be used to send + * cancel requests. 
+ */ +void +SetCancelConn(PGconn *conn) +{ + SetCancelConnInternal(PQcancelCreate(conn)); } /* * ResetCancelConn * - * Free the current cancel connection, if any, and set to NULL. + * Clear cancelConn, preventing any pending cancel from being sent. */ void ResetCancelConn(void) { - PGcancel *oldCancelConn; + SetCancelConnInternal(NULL); +} -#ifdef WIN32 - EnterCriticalSection(&cancelConnLock); -#endif - oldCancelConn = cancelConn; +#ifdef WIN32 +/* + * Console control handler for Windows. + * + * This runs in a separate thread created by the OS, so we can safely call + * the blocking cancel API directly. + */ +static BOOL WINAPI +consoleHandler(DWORD dwCtrlType) +{ + if (dwCtrlType == CTRL_C_EVENT || + dwCtrlType == CTRL_BREAK_EVENT) + { + CancelRequested = true; - /* be sure handle_sigint doesn't use pointer while freeing */ - cancelConn = NULL; + if (cancel_callback != NULL) + cancel_callback(); - if (oldCancelConn != NULL) - PQfreeCancel(oldCancelConn); + SendCancelRequest(); -#ifdef WIN32 - LeaveCriticalSection(&cancelConnLock); -#endif + return TRUE; + } + else + /* Return FALSE for any signals not being handled */ + return FALSE; } +#else /* !WIN32 */ /* - * Code to support query cancellation - * - * Note that sending the cancel directly from the signal handler is safe - * because PQcancel() is written to make it so. We use write() to report - * to stderr because it's better to use simple facilities in a signal - * handler. - * - * On Windows, the signal canceling happens on a separate thread, because - * that's how SetConsoleCtrlHandler works. The PQcancel function is safe - * for this (unlike PQrequestCancel). However, a CRITICAL_SECTION is required - * to protect the PGcancel structure against being changed while the signal - * thread is using it. + * Cancel thread main function. Waits for the signal handler to write to the + * pipe, then sends a cancel request. 
*/ +static void * +cancel_thread_main(void *arg) +{ + for (;;) + { + char buf[16]; + ssize_t rc; -#ifndef WIN32 + /* Wait for signal handler to wake us up */ + rc = read(cancel_pipe[0], buf, sizeof(buf)); + if (rc <= 0) + { + if (errno == EINTR) + continue; + /* Pipe closed or error - exit thread */ + break; + } + + SendCancelRequest(); + } + + return NULL; +} /* - * handle_sigint - * - * Handle interrupt signals by canceling the current command, if cancelConn - * is set. + * Signal handler for SIGINT. Sets CancelRequested and wakes up the cancel + * thread by writing to the pipe. */ static void handle_sigint(SIGNAL_ARGS) { - char errbuf[256]; + int save_errno = errno; + char c = 1; CancelRequested = true; if (cancel_callback != NULL) cancel_callback(); - /* Send QueryCancel if we are processing a database query */ - if (cancelConn != NULL) - { - if (PQcancel(cancelConn, errbuf, sizeof(errbuf))) - { - write_stderr(cancel_sent_msg); - } - else - { - write_stderr(cancel_not_sent_msg); - write_stderr(errbuf); - } - } + /* Wake up the cancel thread - write() is async-signal-safe */ + if (cancel_pipe[1] >= 0) + (void) write(cancel_pipe[1], &c, 1); + + errno = save_errno; } +#endif /* WIN32 */ + + /* * setup_cancel_handler * - * Register query cancellation callback for SIGINT. + * Set up handler for SIGINT (Unix) or console events (Windows) to send a + * cancel request to the server. + * + * The optional callback is invoked directly from the signal handler context + * on every SIGINT (on Unix), so it must be async-signal-safe. 
*/ void setup_cancel_handler(void (*query_cancel_callback) (void)) { cancel_callback = query_cancel_callback; - cancel_sent_msg = _("Cancel request sent\n"); + cancel_sent_msg = _("Sending cancel request\n"); cancel_not_sent_msg = _("Could not send cancel request: "); - pqsignal(SIGINT, handle_sigint); -} +#ifdef WIN32 + SetConsoleCtrlHandler(consoleHandler, TRUE); +#else -#else /* WIN32 */ + /* + * Create the pipe and cancel thread (see comment on cancel_pipe above). + */ + if (pipe(cancel_pipe) < 0) + { + pg_log_error("could not create pipe for cancel: %m"); + exit(1); + } -static BOOL WINAPI -consoleHandler(DWORD dwCtrlType) -{ - char errbuf[256]; + /* + * Make the write end non-blocking, so that the signal handler won't block + * if the pipe buffer is full (which is very unlikely in practice but + * possible in theory). + */ + fcntl(cancel_pipe[1], F_SETFL, O_NONBLOCK); - if (dwCtrlType == CTRL_C_EVENT || - dwCtrlType == CTRL_BREAK_EVENT) { - CancelRequested = true; - - if (cancel_callback != NULL) - cancel_callback(); + int rc = pthread_create(&cancel_thread, NULL, cancel_thread_main, NULL); - /* Send QueryCancel if we are processing a database query */ - EnterCriticalSection(&cancelConnLock); - if (cancelConn != NULL) + if (rc != 0) { - if (PQcancel(cancelConn, errbuf, sizeof(errbuf))) - { - write_stderr(cancel_sent_msg); - } - else - { - write_stderr(cancel_not_sent_msg); - write_stderr(errbuf); - } + pg_log_error("could not create cancel thread: %s", strerror(rc)); + exit(1); } - - LeaveCriticalSection(&cancelConnLock); - - return TRUE; } - else - /* Return FALSE for any signals not being handled */ - return FALSE; -} -void -setup_cancel_handler(void (*callback) (void)) -{ - cancel_callback = callback; - cancel_sent_msg = _("Cancel request sent\n"); - cancel_not_sent_msg = _("Could not send cancel request: "); - - InitializeCriticalSection(&cancelConnLock); - - SetConsoleCtrlHandler(consoleHandler, TRUE); + pqsignal(SIGINT, handle_sigint); +#endif } - 
-#endif /* WIN32 */ diff --git a/src/include/fe_utils/cancel.h b/src/include/fe_utils/cancel.h index e174fb83b92..8afb2d778bf 100644 --- a/src/include/fe_utils/cancel.h +++ b/src/include/fe_utils/cancel.h @@ -23,10 +23,6 @@ extern PGDLLIMPORT volatile sig_atomic_t CancelRequested; extern void SetCancelConn(PGconn *conn); extern void ResetCancelConn(void); -/* - * A callback can be optionally set up to be called at cancellation - * time. - */ extern void setup_cancel_handler(void (*query_cancel_callback) (void)); #endif /* CANCEL_H */ -- 2.53.0
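The borrow/put-back protocol in `SendCancelRequest()` (a mutex plus `cancelConnGeneration`) is probably the subtlest part of patch 2, so here it is in isolation. This is a sketch under stated assumptions: `Resource` stands in for `PGcancelConn`, `set_resource()` for `SetCancelConnInternal()`, and `use_resource()` for `SendCancelRequest()`; all of these names are hypothetical, and the "concurrent" replacement is simulated on a single thread to keep the example deterministic.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGcancelConn. */
typedef struct Resource
{
	int			id;
} Resource;

static Resource *current = NULL;	/* plays the role of cancelConn */
static uint64_t generation = 0;	/* plays the role of cancelConnGeneration */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int	frees = 0;			/* test instrumentation, not synchronized */

static Resource *
resource_new(int id)
{
	Resource   *r = malloc(sizeof(Resource));

	if (r != NULL)
		r->id = id;
	return r;
}

static void
resource_free(Resource *r)
{
	if (r == NULL)
		return;
	frees++;
	free(r);
}

/* Swap in r, bump the generation, and free whatever was there before. */
static void
set_resource(Resource *r)
{
	Resource   *old;

	pthread_mutex_lock(&lock);
	old = current;
	current = r;
	generation++;
	pthread_mutex_unlock(&lock);

	resource_free(old);
}

/*
 * Borrow the resource, use it without holding the lock, then put it back
 * only if nobody replaced it meanwhile.  Returns false if the slot was empty.
 */
static bool
use_resource(void (*work) (Resource *))
{
	Resource   *r;
	uint64_t	gen;
	bool		put_back = false;

	pthread_mutex_lock(&lock);
	r = current;
	gen = generation;
	current = NULL;				/* set_resource() can no longer free it */
	pthread_mutex_unlock(&lock);

	if (r == NULL)
		return false;

	work(r);					/* may block, e.g. on network I/O */

	pthread_mutex_lock(&lock);
	if (generation == gen)
	{
		assert(current == NULL);	/* only set_resource() refills the slot */
		current = r;
		put_back = true;
	}
	pthread_mutex_unlock(&lock);

	if (!put_back)
		resource_free(r);		/* replaced meanwhile: we are the last user */
	return true;
}

/* Demo callbacks; work_replaces() simulates a concurrent SetCancelConn(). */
static void
work_noop(Resource *r)
{
	(void) r;
}

static void
work_replaces(Resource *r)
{
	(void) r;
	set_resource(resource_new(2));
}
```

The invariant that makes this safe is that only `set_resource()` ever stores into the slot, and it always bumps the generation. An unchanged generation therefore means the slot is still empty and the borrowed object can go back; a changed one means the borrower holds the only remaining reference and must free it.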
From b8f3b361a8cab9cbfc5ff8f84a79f807ef8ddfba Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <[email protected]>
Date: Sun, 8 Feb 2026 19:00:12 +0100
Subject: [PATCH v4 3/3] pg_dump: Don't use the deprecated and insecure PQcancel

pg_dump still used PQcancel to send cancel requests to the server when
the dump was cancelled. That libpq function is insecure, because it does
not use encryption to send the cancel request. This commit starts using
the new cancellation APIs (introduced in 61461a300) in pg_dump. These
APIs use the same encryption settings as the connection that's being
cancelled. Since these APIs are not signal-safe, this required a
refactor: the cancel request is no longer sent from a signal handler,
but from a dedicated thread instead. Windows was already doing that too,
so now the paths can share some code. There's still quite a bit of
behavioural difference, though, because pg_dump uses threads for
parallelism on Windows, but processes on Unix.
---
 src/bin/pg_dump/Makefile             |   2 +-
 src/bin/pg_dump/meson.build          |   2 +
 src/bin/pg_dump/parallel.c           | 404 ++++++++++++++-------------
 src/bin/pg_dump/pg_backup_archiver.c |   2 +-
 src/bin/pg_dump/pg_backup_archiver.h |   8 +-
 src/bin/pg_dump/pg_backup_db.c       |   7 +-
 6 files changed, 222 insertions(+), 203 deletions(-)

diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index 79073b0a0ea..f76346c4f6c 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -21,7 +21,7 @@ export LZ4
 export ZSTD
 export with_icu
 
-override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
+override CPPFLAGS := -I$(libpq_srcdir) -I$(top_srcdir)/src/port $(CPPFLAGS)
 LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
 
 OBJS = \
diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build
index 7c9a475963b..c772cd0e2c0 100644
--- a/src/bin/pg_dump/meson.build
+++ b/src/bin/pg_dump/meson.build
@@ -22,6 +22,8 @@ pg_dump_common_sources = files(
 pg_dump_common = static_library('libpgdump_common',
   pg_dump_common_sources,
   c_pch: pch_postgres_fe_h,
+  # port needs to be in include path due to pthread-win32.h
+  include_directories: ['../../port'],
   dependencies: [frontend_code, libpq, lz4, zlib, zstd],
   kwargs: internal_lib_args,
 )
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index a28561fbd84..cc2fd7eecf7 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -58,8 +58,12 @@
 #include <signal.h>
 #include <unistd.h>
 #include <fcntl.h>
+#include <pthread.h>
+#else
+#include "pthread-win32.h"
 #endif
 
+#include "common/logging.h"
 #include "fe_utils/string_utils.h"
 #include "parallel.h"
 #include "pg_backup_utils.h"
@@ -167,6 +171,7 @@ typedef struct DumpSignalInformation
 	ArchiveHandle *myAH;		/* database connection to issue cancel for */
 	ParallelState *pstate;		/* parallel state, if any */
 	bool		handler_set;	/* signal handler set up in this process? */
+	bool		cancel_requested;	/* cancel requested via signal? */
 #ifndef WIN32
 	bool		am_worker;		/* am I a worker process? */
 #endif
@@ -174,8 +179,20 @@ typedef struct DumpSignalInformation
 
 static volatile DumpSignalInformation signal_info;
 
-#ifdef WIN32
-static CRITICAL_SECTION signal_info_lock;
+/*
+ * Mutex protecting signal_info during cancel operations.
+ */
+static pthread_mutex_t signal_info_lock;
+
+#ifndef WIN32
+/*
+ * On Unix, the signal handler cannot call PQcancelBlocking() directly because
+ * it is not async-signal-safe. Instead, we use a pipe to wake a dedicated
+ * cancel thread: the signal handler writes a byte to the pipe, and the cancel
+ * thread's blocking read() returns, triggering the actual cancel requests.
+ */ +static int cancel_pipe[2] = {-1, -1}; +static pthread_t cancel_thread; #endif /* @@ -209,6 +226,7 @@ static void WaitForTerminatingWorkers(ParallelState *pstate); static void set_cancel_handler(void); static void set_cancel_pstate(ParallelState *pstate); static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH); +static void StopWorkers(void); static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot); static int GetIdleWorker(ParallelState *pstate); static bool HasEveryWorkerTerminated(ParallelState *pstate); @@ -424,32 +442,9 @@ ShutdownWorkersHard(ParallelState *pstate) /* * Force early termination of any commands currently in progress. */ -#ifndef WIN32 - /* On non-Windows, send SIGTERM to each worker process. */ - for (i = 0; i < pstate->numWorkers; i++) - { - pid_t pid = pstate->parallelSlot[i].pid; - - if (pid != 0) - kill(pid, SIGTERM); - } -#else - - /* - * On Windows, send query cancels directly to the workers' backends. Use - * a critical section to ensure worker threads don't change state. - */ - EnterCriticalSection(&signal_info_lock); - for (i = 0; i < pstate->numWorkers; i++) - { - ArchiveHandle *AH = pstate->parallelSlot[i].AH; - char errbuf[1]; - - if (AH != NULL && AH->connCancel != NULL) - (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf)); - } - LeaveCriticalSection(&signal_info_lock); -#endif + pthread_mutex_lock(&signal_info_lock); + StopWorkers(); + pthread_mutex_unlock(&signal_info_lock); /* Now wait for them to terminate. */ WaitForTerminatingWorkers(pstate); @@ -533,74 +528,54 @@ WaitForTerminatingWorkers(ParallelState *pstate) * could leave a SQL command (e.g., CREATE INDEX on a large table) running * for a long time. Instead, we try to send a cancel request and then die. * pg_dump probably doesn't really need this, but we might as well use it - * there too. Note that sending the cancel directly from the signal handler - * is safe because PQcancel() is written to make it so. + * there too. 
* - * In parallel operation on Unix, each process is responsible for canceling - * its own connection (this must be so because nobody else has access to it). - * Furthermore, the leader process should attempt to forward its signal to - * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't - * needed because typing control-C at the console would deliver SIGINT to - * every member of the terminal process group --- but in other scenarios it - * might be that only the leader gets signaled. + * On Unix, the signal handler wakes up a dedicated cancel thread via a + * self-pipe, which then sends the cancel and calls _exit(). This thread also + * forwards the signal to each child so they can also cancel their queries. In + * simple manual use of pg_dump/pg_restore, forwarding isn't needed because + * typing control-C at the console would deliver SIGINT to every member of the + * terminal process group --- but in other scenarios it might be that only the + * leader gets signaled. * * On Windows, the cancel handler runs in a separate thread, because that's * how SetConsoleCtrlHandler works. We make it stop worker threads, send * cancels on all active connections, and then return FALSE, which will allow * the process to die. For safety's sake, we use a critical section to - * protect the PGcancel structures against being changed while the signal + * protect the PGcancelConn structures against being changed while the signal * thread runs. */ -#ifndef WIN32 - /* - * Signal handler (Unix only) + * Cancel all active queries and print termination message. */ static void -sigTermHandler(SIGNAL_ARGS) +CancelBackends(void) { - int i; - char errbuf[1]; + pthread_mutex_lock(&signal_info_lock); - /* - * Some platforms allow delivery of new signals to interrupt an active - * signal handler. That could muck up our attempt to send PQcancel, so - * disable the signals that set_cancel_handler enabled. 
- */ - pqsignal(SIGINT, SIG_IGN); - pqsignal(SIGTERM, SIG_IGN); - pqsignal(SIGQUIT, SIG_IGN); + signal_info.cancel_requested = true; /* - * If we're in the leader, forward signal to all workers. (It seems best - * to do this before PQcancel; killing the leader transaction will result - * in invalid-snapshot errors from active workers, which maybe we can - * quiet by killing workers first.) Ignore any errors. + * Stop workers first to avoid invalid-snapshot errors if the leader + * cancels before workers. */ - if (signal_info.pstate != NULL) - { - for (i = 0; i < signal_info.pstate->numWorkers; i++) - { - pid_t pid = signal_info.pstate->parallelSlot[i].pid; + StopWorkers(); - if (pid != 0) - kill(pid, SIGTERM); - } - } + if (signal_info.myAH != NULL && signal_info.myAH->cancelConn != NULL) + (void) PQcancelBlocking(signal_info.myAH->cancelConn); - /* - * Send QueryCancel if we have a connection to send to. Ignore errors, - * there's not much we can do about them anyway. - */ - if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL) - (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf)); + pthread_mutex_unlock(&signal_info_lock); /* - * Report we're quitting, using nothing more complicated than write(2). - * When in parallel operation, only the leader process should do this. + * Print termination message. In parallel operation, only the leader + * should print this. On Windows, workers are threads in the same process + * and the console handler only runs in the leader context, so we can + * always print it. */ +#ifndef WIN32 if (!signal_info.am_worker) +#endif { if (progname) { @@ -609,172 +584,204 @@ sigTermHandler(SIGNAL_ARGS) } write_stderr("terminated by user\n"); } - - /* - * And die, using _exit() not exit() because the latter will invoke atexit - * handlers that can fail if we interrupted related code. - */ - _exit(1); } /* - * Enable cancel interrupt handler, if not already done. + * Stop all worker processes/threads. 
+ *
+ * On Unix, send SIGTERM to each worker process; their signal handlers will
+ * send cancel requests to their backends.
+ *
+ * On Windows, workers are threads in the same process, so we send cancel
+ * requests directly to their backends.
+ *
+ * Caller must hold signal_info_lock.
  */
 static void
-set_cancel_handler(void)
+StopWorkers(void)
 {
-	/*
-	 * When forking, signal_info.handler_set will propagate into the new
-	 * process, but that's fine because the signal handler state does too.
-	 */
-	if (!signal_info.handler_set)
+	int i;
+
+	if (signal_info.pstate == NULL)
+		return;
+
+	for (i = 0; i < signal_info.pstate->numWorkers; i++)
 	{
-		signal_info.handler_set = true;
+#ifndef WIN32
+		pid_t pid = signal_info.pstate->parallelSlot[i].pid;
+
+		if (pid != 0)
+			kill(pid, SIGTERM);
+#else
+		ArchiveHandle *AH = signal_info.pstate->parallelSlot[i].AH;

-		pqsignal(SIGINT, sigTermHandler);
-		pqsignal(SIGTERM, sigTermHandler);
-		pqsignal(SIGQUIT, sigTermHandler);
+		if (AH != NULL && AH->cancelConn != NULL)
+			(void) PQcancelBlocking(AH->cancelConn);
+#endif
 	}
 }

-#else /* WIN32 */
+#ifdef WIN32

 /*
  * Console interrupt handler --- runs in a newly-started thread.
  *
- * After stopping other threads and sending cancel requests on all open
- * connections, we return FALSE which will allow the default ExitProcess()
- * action to be taken.
+ * Send cancel requests on all open connections and return FALSE to allow
+ * the default ExitProcess() action to terminate the process.
  */
 static BOOL WINAPI
 consoleHandler(DWORD dwCtrlType)
 {
-	int i;
-	char errbuf[1];
-
 	if (dwCtrlType == CTRL_C_EVENT ||
 		dwCtrlType == CTRL_BREAK_EVENT)
 	{
-		/* Critical section prevents changing data we look at here */
-		EnterCriticalSection(&signal_info_lock);
+		CancelBackends();
+	}

-		/*
-		 * If in parallel mode, stop worker threads and send QueryCancel to
-		 * their connected backends. The main point of stopping the worker
-		 * threads is to keep them from reporting the query cancels as errors,
-		 * which would clutter the user's screen. We needn't stop the leader
-		 * thread since it won't be doing much anyway. Do this before
-		 * canceling the main transaction, else we might get invalid-snapshot
-		 * errors reported before we can stop the workers. Ignore errors,
-		 * there's not much we can do about them anyway.
-		 */
-		if (signal_info.pstate != NULL)
+	/* Always return FALSE to allow signal handling to continue */
+	return FALSE;
+}
+
+#else /* !WIN32 */
+
+/*
+ * Cancel thread main function. Waits for the signal handler to write to the
+ * pipe, then cancels backends and calls _exit().
+ */
+static void *
+cancel_thread_main(void *arg)
+{
+	for (;;)
+	{
+		char buf[16];
+		ssize_t rc;
+
+		/* Wait for signal handler to wake us up */
+		rc = read(cancel_pipe[0], buf, sizeof(buf));
+		if (rc <= 0)
 		{
-			for (i = 0; i < signal_info.pstate->numWorkers; i++)
-			{
-				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
-				ArchiveHandle *AH = slot->AH;
-				HANDLE hThread = (HANDLE) slot->hThread;
-
-				/*
-				 * Using TerminateThread here may leave some resources leaked,
-				 * but it doesn't matter since we're about to end the whole
-				 * process.
-				 */
-				if (hThread != INVALID_HANDLE_VALUE)
-					TerminateThread(hThread, 0);
-
-				if (AH != NULL && AH->connCancel != NULL)
-					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
-			}
+			if (errno == EINTR)
+				continue;
+			/* Pipe closed or error - exit thread */
+			break;
 		}

-		/*
-		 * Send QueryCancel to leader connection, if enabled. Ignore errors,
-		 * there's not much we can do about them anyway.
-		 */
-		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
-			(void) PQcancel(signal_info.myAH->connCancel,
-							errbuf, sizeof(errbuf));
-
-		LeaveCriticalSection(&signal_info_lock);
+		CancelBackends();

 		/*
-		 * Report we're quitting, using nothing more complicated than
-		 * write(2). (We might be able to get away with using pg_log_*()
-		 * here, but since we terminated other threads uncleanly above, it
-		 * seems better to assume as little as possible.)
+		 * And die, using _exit() not exit() because the latter will invoke
+		 * atexit handlers that can fail if we interrupted related code.
 		 */
-		if (progname)
-		{
-			write_stderr(progname);
-			write_stderr(": ");
-		}
-		write_stderr("terminated by user\n");
+		_exit(1);
 	}

-	/* Always return FALSE to allow signal handling to continue */
-	return FALSE;
+	return NULL;
 }

+/*
+ * Signal handler (Unix only). Wakes up the cancel thread by writing to the
+ * pipe.
+ */
+static void
+sigTermHandler(SIGNAL_ARGS)
+{
+	int save_errno = errno;
+	char c = 1;
+
+	/* Wake up the cancel thread - write() is async-signal-safe */
+	if (cancel_pipe[1] >= 0)
+		(void) write(cancel_pipe[1], &c, 1);
+
+	errno = save_errno;
+}
+
+#endif /* WIN32 */
+
 /*
  * Enable cancel interrupt handler, if not already done.
  */
 static void
 set_cancel_handler(void)
 {
-	if (!signal_info.handler_set)
+	if (signal_info.handler_set)
+		return;
+
+	signal_info.handler_set = true;
+
+	pthread_mutex_init(&signal_info_lock, NULL);
+
+#ifdef WIN32
+	SetConsoleCtrlHandler(consoleHandler, TRUE);
+#else
+
+	/*
+	 * Create the pipe and cancel thread (see comment on cancel_pipe above).
+	 */
+	if (pipe(cancel_pipe) < 0)
 	{
-		signal_info.handler_set = true;
+		pg_log_error("could not create pipe for cancel: %m");
+		exit(1);
+	}

-		InitializeCriticalSection(&signal_info_lock);
+	/*
+	 * Make the write end non-blocking, so that the signal handler won't block
+	 * if the pipe buffer is full (which is very unlikely in practice but
+	 * possible in theory).
+	 */
+	fcntl(cancel_pipe[1], F_SETFL, O_NONBLOCK);

-		SetConsoleCtrlHandler(consoleHandler, TRUE);
+	{
+		int rc = pthread_create(&cancel_thread, NULL, cancel_thread_main, NULL);
+
+		if (rc != 0)
+		{
+			pg_log_error("could not create cancel thread: %s", strerror(rc));
+			exit(1);
+		}
 	}
-}

-#endif /* WIN32 */
+	pqsignal(SIGINT, sigTermHandler);
+	pqsignal(SIGTERM, sigTermHandler);
+	pqsignal(SIGQUIT, sigTermHandler);
+#endif
+}

 /*
  * set_archive_cancel_info
  *
- * Fill AH->connCancel with cancellation info for the specified database
+ * Fill AH->cancelConn with cancellation info for the specified database
  * connection; or clear it if conn is NULL.
  */
 void
 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
 {
-	PGcancel *oldConnCancel;
+	PGcancelConn *oldCancelConn;

 	/*
-	 * Activate the interrupt handler if we didn't yet in this process. On
-	 * Windows, this also initializes signal_info_lock; therefore it's
-	 * important that this happen at least once before we fork off any
-	 * threads.
+	 * Activate the interrupt handler if we didn't yet in this process. This
+	 * also initializes signal_info_lock; therefore it's important that this
+	 * happen at least once before we fork off any threads.
 	 */
 	set_cancel_handler();

 	/*
-	 * On Unix, we assume that storing a pointer value is atomic with respect
-	 * to any possible signal interrupt. On Windows, use a critical section.
+	 * Use mutex to prevent the cancel handler from using the pointer while
+	 * we're changing it.
 	 */
-
-#ifdef WIN32
-	EnterCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_lock(&signal_info_lock);

 	/* Free the old one if we have one */
-	oldConnCancel = AH->connCancel;
+	oldCancelConn = AH->cancelConn;
 	/* be sure interrupt handler doesn't use pointer while freeing */
-	AH->connCancel = NULL;
+	AH->cancelConn = NULL;

-	if (oldConnCancel != NULL)
-		PQfreeCancel(oldConnCancel);
+	if (oldCancelConn != NULL)
+		PQcancelFinish(oldCancelConn);

 	/* Set the new one if specified */
 	if (conn)
-		AH->connCancel = PQgetCancel(conn);
+		AH->cancelConn = PQcancelCreate(conn);

 	/*
 	 * On Unix, there's only ever one active ArchiveHandle per process, so we
@@ -790,49 +797,35 @@ set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
 	signal_info.myAH = AH;
 #endif

-#ifdef WIN32
-	LeaveCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_unlock(&signal_info_lock);
 }

 /*
  * set_cancel_pstate
  *
  * Set signal_info.pstate to point to the specified ParallelState, if any.
- * We need this mainly to have an interlock against Windows signal thread.
+ * We need this mainly to have an interlock against the cancel handler thread.
  */
 static void
 set_cancel_pstate(ParallelState *pstate)
 {
-#ifdef WIN32
-	EnterCriticalSection(&signal_info_lock);
-#endif
-
+	pthread_mutex_lock(&signal_info_lock);
 	signal_info.pstate = pstate;
-
-#ifdef WIN32
-	LeaveCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_unlock(&signal_info_lock);
 }

 /*
  * set_cancel_slot_archive
  *
  * Set ParallelSlot's AH field to point to the specified archive, if any.
- * We need this mainly to have an interlock against Windows signal thread.
+ * We need this mainly to have an interlock against the cancel handler thread.
  */
 static void
 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
 {
-#ifdef WIN32
-	EnterCriticalSection(&signal_info_lock);
-#endif
-
+	pthread_mutex_lock(&signal_info_lock);
 	slot->AH = AH;
-
-#ifdef WIN32
-	LeaveCriticalSection(&signal_info_lock);
-#endif
+	pthread_mutex_unlock(&signal_info_lock);
 }


@@ -947,7 +940,7 @@ ParallelBackupStart(ArchiveHandle *AH)

 	/*
 	 * Temporarily disable query cancellation on the leader connection. This
-	 * ensures that child processes won't inherit valid AH->connCancel
+	 * ensures that child processes won't inherit valid AH->cancelConn
 	 * settings and thus won't try to issue cancels against the leader's
 	 * connection. No harm is done if we fail while it's disabled, because
 	 * the leader connection is idle at this point anyway.
@@ -1005,6 +998,17 @@ ParallelBackupStart(ArchiveHandle *AH)
 			/* instruct signal handler that we're in a worker now */
 			signal_info.am_worker = true;

+			/*
+			 * Reset cancel handler state so that the worker will set up its
+			 * own cancel thread when it calls set_archive_cancel_info().
+			 * Threads don't survive fork(), so we can't use the leader's.
+			 * Also close the inherited pipe fds.
+			 */
+			signal_info.handler_set = false;
+			close(cancel_pipe[0]);
+			close(cancel_pipe[1]);
+			cancel_pipe[0] = cancel_pipe[1] = -1;
+
 			/* close read end of Worker -> Leader */
 			closesocket(pipeWM[PIPE_READ]);
 			/* close write end of Leader -> Worker */
@@ -1421,8 +1425,18 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)

 	if (!msg)
 	{
-		/* If do_wait is true, we must have detected EOF on some socket */
-		if (do_wait)
+		/*
+		 * If do_wait is true, we must have detected EOF on some socket. If
+		 * it's due to a cancel request, that's expected, otherwise it's a
+		 * problem.
+		 */
+		bool cancel_requested;
+
+		pthread_mutex_lock(&signal_info_lock);
+		cancel_requested = signal_info.cancel_requested;
+		pthread_mutex_unlock(&signal_info_lock);
+
+		if (do_wait && !cancel_requested)
 			pg_fatal("a worker process died unexpectedly");
 		return false;
 	}
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index df8a69d3b79..ae037e70d55 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -5208,7 +5208,7 @@ CloneArchive(ArchiveHandle *AH)

 	/* The clone will have its own connection, so disregard connection state */
 	clone->connection = NULL;
-	clone->connCancel = NULL;
+	clone->cancelConn = NULL;
 	clone->currUser = NULL;
 	clone->currSchema = NULL;
 	clone->currTableAm = NULL;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 365073b3eae..54e4099be53 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -288,8 +288,12 @@ struct _archiveHandle
 	char *savedPassword; /* password for ropt->username, if known */
 	char *use_role;
 	PGconn *connection;
-	/* If connCancel isn't NULL, SIGINT handler will send a cancel */
-	PGcancel *volatile connCancel;
+
+	/*
+	 * If cancelConn isn't NULL, SIGINT handler will trigger the cancel thread
+	 * to send a cancel.
+	 */
+	PGcancelConn *cancelConn;

 	int connectToDB; /* Flag to indicate if direct DB connection is
 					  * required */
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index 5c349279beb..0cc29a8aa70 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -84,7 +84,7 @@ ReconnectToServer(ArchiveHandle *AH, const char *dbname)

 	/*
 	 * Note: we want to establish the new connection, and in particular update
-	 * ArchiveHandle's connCancel, before closing old connection. Otherwise
+	 * ArchiveHandle's cancelConn, before closing old connection. Otherwise
 	 * an ill-timed SIGINT could try to access a dead connection.
 	 */
 	AH->connection = NULL; /* dodge error check in ConnectDatabaseAhx */
@@ -164,12 +164,11 @@ void
 DisconnectDatabase(Archive *AHX)
 {
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
-	char errbuf[1];

 	if (!AH->connection)
 		return;

-	if (AH->connCancel)
+	if (AH->cancelConn)
 	{
 		/*
 		 * If we have an active query, send a cancel before closing, ignoring
@@ -177,7 +176,7 @@ DisconnectDatabase(Archive *AHX)
 		 * helpful during pg_fatal().
 		 */
 		if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
-			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
+			(void) PQcancelBlocking(AH->cancelConn);

 		/*
 		 * Prevent signal handler from sending a cancel after this.
-- 
2.53.0
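For anyone reviewing this who hasn't met the self-pipe trick before, here is a minimal standalone sketch of the pattern the patch uses on Unix. It is not the pg_dump code: all names (wake_pipe, waker_main, start_waker, wait_for_wakeups, stop_waker) are hypothetical, SIGUSR1 stands in for SIGINT/SIGTERM/SIGQUIT, and an atomic counter stands in for the real work. The point is the split of responsibilities: the handler only does one write(), which is async-signal-safe, while anything heavier runs in an ordinary thread. That matters here because PQcancel() is documented as usable from a signal handler, whereas PQcancelBlocking() opens a new connection and waits for it, so it has to run outside signal context.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

/* Hypothetical names for illustration only; not taken from pg_dump. */
static int wake_pipe[2] = {-1, -1};
static pthread_t waker_thread;
static atomic_int wakeups_handled;

/* Signal handler: only async-signal-safe work, i.e. a single write(). */
static void
wake_handler(int signo)
{
	int save_errno = errno;
	char c = 1;

	(void) signo;
	if (wake_pipe[1] >= 0)
		(void) write(wake_pipe[1], &c, 1);
	errno = save_errno;
}

/* Helper thread: blocks in read() and does the real work in normal context. */
static void *
waker_main(void *arg)
{
	char buf[16];

	(void) arg;
	for (;;)
	{
		ssize_t rc = read(wake_pipe[0], buf, sizeof(buf));

		if (rc < 0 && errno == EINTR)
			continue;
		if (rc <= 0)
			break;			/* write end closed (EOF) or error */

		/* Stand-in for sending blocking cancel requests. */
		atomic_fetch_add(&wakeups_handled, 1);
	}
	return NULL;
}

/* Create pipe and thread first, then install the handler. 0 on success. */
static int
start_waker(int signo)
{
	struct sigaction sa;
	int flags;

	if (pipe(wake_pipe) < 0)
		return -1;
	/* Non-blocking write end, so the handler can never block. */
	flags = fcntl(wake_pipe[1], F_GETFL);
	if (flags < 0 || fcntl(wake_pipe[1], F_SETFL, flags | O_NONBLOCK) < 0)
		return -1;
	if (pthread_create(&waker_thread, NULL, waker_main, NULL) != 0)
		return -1;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = wake_handler;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;
	return sigaction(signo, &sa, NULL);
}

/* Poll until at least n wakeups were handled or about timeout_ms elapsed. */
static int
wait_for_wakeups(int n, int timeout_ms)
{
	struct timespec ts = {0, 1000000};	/* 1 ms */

	for (int i = 0; i < timeout_ms; i++)
	{
		if (atomic_load(&wakeups_handled) >= n)
			return 1;
		nanosleep(&ts, NULL);
	}
	return atomic_load(&wakeups_handled) >= n;
}

/* Closing the only write end makes read() return 0, so the thread exits. */
static void
stop_waker(void)
{
	int fd = wake_pipe[1];

	wake_pipe[1] = -1;
	close(fd);
	pthread_join(waker_thread, NULL);
	close(wake_pipe[0]);
}
```

Usage: call start_waker(SIGUSR1) once, after which raise(SIGUSR1) (or kill -USR1 from outside) causes a pass through the loop in waker_main(); stop_waker() shuts it down. Several signals that arrive before the thread gets to run can be coalesced into one read(), which is harmless for cancellation. The sketch reads the current flags with F_GETFL before adding O_NONBLOCK, the usual defensive idiom; for a freshly created pipe that has no other status flags, a plain F_SETFL with O_NONBLOCK as in the patch ends up with the same result.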
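The locking rule around the cancel handle can also be shown in isolation. The signal handler never touches the mutex (pthread_mutex_lock() is not async-signal-safe); only the cancel path does, and it holds the lock for the entire time it uses the handle. That is what lets the setter unlink the old handle and free it without a use-after-free. This is a minimal sketch under those assumptions: CancelHandle, handle_lock, set_handle() and fire_cancel() are hypothetical stand-ins for PGcancelConn, signal_info_lock, set_archive_cancel_info() and the cancel thread's work, and a counter replaces the network round trip.

```c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGcancelConn. */
typedef struct CancelHandle
{
	char target[64];
	int requests_sent;
} CancelHandle;

static pthread_mutex_t handle_lock = PTHREAD_MUTEX_INITIALIZER;
static CancelHandle *current_handle;	/* protected by handle_lock */

/*
 * Publish a new handle, or clear it when target is NULL. The old handle is
 * freed only after it has been unlinked under the lock, so the cancel side
 * can never dereference a dangling pointer.
 */
static void
set_handle(const char *target)
{
	CancelHandle *fresh = NULL;
	CancelHandle *old;

	if (target != NULL)
	{
		fresh = calloc(1, sizeof(*fresh));
		if (fresh == NULL)
			abort();
		snprintf(fresh->target, sizeof(fresh->target), "%s", target);
	}

	pthread_mutex_lock(&handle_lock);
	old = current_handle;
	current_handle = fresh;
	pthread_mutex_unlock(&handle_lock);

	free(old);				/* no other thread can reach it any more */
}

/*
 * Cancel side: runs in the helper thread, never in the signal handler.
 * Returns 1 if a request was "sent", 0 if no handle was published.
 */
static int
fire_cancel(void)
{
	int fired = 0;

	pthread_mutex_lock(&handle_lock);
	if (current_handle != NULL)
	{
		/* The real code would do a blocking network round trip here. */
		current_handle->requests_sent++;
		fired = 1;
	}
	pthread_mutex_unlock(&handle_lock);
	return fired;
}
```

Unlike the patch, this sketch allocates and frees outside the critical section. That is safe only because the cancel side dereferences the pointer exclusively while holding the lock. One consequence applies to both variants: because the lock is held across the cancel round trip, a concurrent set_handle() (set_archive_cancel_info() in the patch) waits until that round trip finishes.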
