I've just rebased the patches and merged the last two fix-commits (0003 and
0004) into 0002.
--
regards
Yura Sokolov aka funny-falcon
From f8923c9b8e2f470dd3caaa1e71fb3b931389148b Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Fri, 21 Mar 2025 16:03:30 +0300
Subject: [PATCH v5 1/3] Reapply "libpqwalreceiver: Convert to
libpq-be-fe-helpers.h"
This reverts commit 21ef4d4d897563adb2f7920ad53b734950f1e0a4.
---
.../libpqwalreceiver/libpqwalreceiver.c | 55 +++----------------
1 file changed, 8 insertions(+), 47 deletions(-)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c650935ef5d..5755ab2f072 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -25,6 +25,7 @@
#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -145,7 +146,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
bool must_use_password, const char *appname, char **err)
{
WalReceiverConn *conn;
- PostgresPollingStatusType status;
const char *keys[6];
const char *vals[6];
int i = 0;
@@ -211,56 +211,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
Assert(i < lengthof(keys));
conn = palloc0(sizeof(WalReceiverConn));
- conn->streamConn = PQconnectStartParams(keys, vals,
- /* expand_dbname = */ true);
- if (PQstatus(conn->streamConn) == CONNECTION_BAD)
- goto bad_connection_errmsg;
-
- /*
- * Poll connection until we have OK or FAILED status.
- *
- * Per spec for PQconnectPoll, first wait till socket is write-ready.
- */
- status = PGRES_POLLING_WRITING;
- do
- {
- int io_flag;
- int rc;
-
- if (status == PGRES_POLLING_READING)
- io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
- /* Windows needs a different test while waiting for connection-made */
- else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
- io_flag = WL_SOCKET_CONNECTED;
-#endif
- else
- io_flag = WL_SOCKET_WRITEABLE;
-
- rc = WaitLatchOrSocket(MyLatch,
- WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
- PQsocket(conn->streamConn),
- 0,
- WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
- /* Interrupted? */
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- ProcessWalRcvInterrupts();
- }
-
- /* If socket is ready, advance the libpq state machine */
- if (rc & io_flag)
- status = PQconnectPoll(conn->streamConn);
- } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+ conn->streamConn =
+ libpqsrv_connect_params(keys, vals,
+ /* expand_dbname = */ true,
+ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
if (PQstatus(conn->streamConn) != CONNECTION_OK)
goto bad_connection_errmsg;
if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
{
- PQfinish(conn->streamConn);
+ libpqsrv_disconnect(conn->streamConn);
pfree(conn);
ereport(ERROR,
@@ -300,7 +261,7 @@ bad_connection_errmsg:
/* error path, error already set */
bad_connection:
- PQfinish(conn->streamConn);
+ libpqsrv_disconnect(conn->streamConn);
pfree(conn);
return NULL;
}
@@ -880,7 +841,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
static void
libpqrcv_disconnect(WalReceiverConn *conn)
{
- PQfinish(conn->streamConn);
+ libpqsrv_disconnect(conn->streamConn);
PQfreemem(conn->recvBuf);
pfree(conn);
}
--
2.43.0
From e0e7b14b9c4f3987224b2d0e8d3cb4be6e02003d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Fri, 21 Mar 2025 16:13:38 +0300
Subject: [PATCH v5 2/3] Apply walrcv_shutdown_deblocking_v2-2.patch
From https://www.postgresql.org/message-id/20240123.172410.1596193222420636986.horikyota.ntt%40gmail.com
Plus a couple of fixes from review:
- use of AmWalReceiverProcess
- use of die
---
.../libpqwalreceiver/libpqwalreceiver.c | 4 +-
src/backend/replication/walreceiver.c | 41 +++----------------
src/backend/tcop/postgres.c | 5 +++
3 files changed, 12 insertions(+), 38 deletions(-)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 5755ab2f072..d51624ef3da 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -820,7 +820,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
- ProcessWalRcvInterrupts();
+ CHECK_FOR_INTERRUPTS();
}
/* Consume whatever data is available from the socket */
@@ -1172,7 +1172,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
{
char *cstrs[MaxTupleAttributeNumber];
- ProcessWalRcvInterrupts();
+ CHECK_FOR_INTERRUPTS();
/* Do the allocations in temporary context. */
oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2e5dd6deb2c..b51a6d06b21 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -71,6 +71,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
+#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
@@ -145,38 +146,6 @@ static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest. We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock. Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set. Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
-void
-ProcessWalRcvInterrupts(void)
-{
- /*
- * Although walreceiver interrupt handling doesn't use the same scheme as
- * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
- * any incoming signals on Win32, and also to make sure we process any
- * barrier events.
- */
- CHECK_FOR_INTERRUPTS();
-
- if (ShutdownRequestPending)
- {
- ereport(FATAL,
- (errcode(ERRCODE_ADMIN_SHUTDOWN),
- errmsg("terminating walreceiver process due to administrator command")));
- }
-}
-
/* Main entry point for walreceiver process */
void
@@ -280,7 +249,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
* file */
pqsignal(SIGINT, SIG_IGN);
- pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+ pqsignal(SIGTERM, die); /* request shutdown */
/* SIGQUIT handler was already set up by InitPostmasterChild */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
@@ -459,7 +428,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */
- ProcessWalRcvInterrupts();
+ CHECK_FOR_INTERRUPTS();
if (ConfigReloadPending)
{
@@ -555,7 +524,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
- ProcessWalRcvInterrupts();
+ CHECK_FOR_INTERRUPTS();
if (walrcv->force_reply)
{
@@ -704,7 +673,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
{
ResetLatch(MyLatch);
- ProcessWalRcvInterrupts();
+ CHECK_FOR_INTERRUPTS();
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0554a4ae3c7..c7d3b25d3f2 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -58,6 +58,7 @@
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
+#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
@@ -3311,6 +3312,10 @@ ProcessInterrupts(void)
*/
proc_exit(1);
}
+ else if (AmWalReceiverProcess())
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating walreceiver process due to administrator command")));
else if (AmBackgroundWorkerProcess())
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
--
2.43.0
From 51e7c0cf4d3312df227f57b5bf61be7ce1d5f155 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Fri, 21 Mar 2025 16:15:05 +0300
Subject: [PATCH v5 3/3] Use libpq-be-fe-helpers.h wrappers more
---
.../libpqwalreceiver/libpqwalreceiver.c | 151 ++++--------------
1 file changed, 33 insertions(+), 118 deletions(-)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index d51624ef3da..be6fbe41705 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -111,8 +111,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
};
/* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
/*
@@ -239,8 +237,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
{
PGresult *res;
- res = libpqrcv_PQexec(conn->streamConn,
- ALWAYS_SECURE_SEARCH_PATH_SQL);
+ res = libpqsrv_exec(conn->streamConn,
+ ALWAYS_SECURE_SEARCH_PATH_SQL,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
@@ -412,7 +411,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
- res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+ res = libpqsrv_exec(conn->streamConn,
+ "IDENTIFY_SYSTEM",
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
@@ -589,7 +590,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
options->proto.physical.startpointTLI);
/* Start streaming. */
- res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+ res = libpqsrv_exec(conn->streamConn,
+ cmd.data,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
pfree(cmd.data);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -619,7 +622,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
PGresult *res;
/*
- * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
+ * Send copy-end message. As in libpqsrv_exec, this could theoretically
* block, but the risk seems small.
*/
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -639,7 +642,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
* also possible in case we aborted the copy in mid-stream.
*/
- res = libpqrcv_PQgetResult(conn->streamConn);
+ res = libpqsrv_get_result(conn->streamConn,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
@@ -654,7 +658,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
PQclear(res);
/* the result set should be followed by CommandComplete */
- res = libpqrcv_PQgetResult(conn->streamConn);
+ res = libpqsrv_get_result(conn->streamConn,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
}
else if (PQresultStatus(res) == PGRES_COPY_OUT)
{
@@ -668,7 +673,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
pchomp(PQerrorMessage(conn->streamConn)))));
/* CommandComplete should follow */
- res = libpqrcv_PQgetResult(conn->streamConn);
+ res = libpqsrv_get_result(conn->streamConn,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
}
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -679,7 +685,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
PQclear(res);
/* Verify that there are no more results */
- res = libpqrcv_PQgetResult(conn->streamConn);
+ res = libpqsrv_get_result(conn->streamConn,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (res != NULL)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -704,7 +711,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
* Request the primary to send over the history file for given timeline.
*/
snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
- res = libpqrcv_PQexec(conn->streamConn, cmd);
+ res = libpqsrv_exec(conn->streamConn,
+ cmd,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
@@ -734,107 +743,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
PQclear(res);
}
-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts(). As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
- PGresult *lastResult = NULL;
-
- /*
- * PQexec() silently discards any prior query results on the connection.
- * This is not required for this function as it's expected that the caller
- * (which is this library in all cases) will behave correctly and we don't
- * have to be backwards compatible with old libpq.
- */
-
- /*
- * Submit the query. Since we don't use non-blocking mode, this could
- * theoretically block. In practice, since we don't send very long query
- * strings, the risk seems negligible.
- */
- if (!PQsendQuery(streamConn, query))
- return NULL;
-
- for (;;)
- {
- /* Wait for, and collect, the next PGresult. */
- PGresult *result;
-
- result = libpqrcv_PQgetResult(streamConn);
- if (result == NULL)
- break; /* query is complete, or failure */
-
- /*
- * Emulate PQexec()'s behavior of returning the last result when there
- * are many. We are fine with returning just last error message.
- */
- PQclear(lastResult);
- lastResult = result;
-
- if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
- PQresultStatus(lastResult) == PGRES_COPY_OUT ||
- PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
- PQstatus(streamConn) == CONNECTION_BAD)
- break;
- }
-
- return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
- /*
- * Collect data until PQgetResult is ready to get the result without
- * blocking.
- */
- while (PQisBusy(streamConn))
- {
- int rc;
-
- /*
- * We don't need to break down the sleep into smaller increments,
- * since we'll get interrupted by signals and can handle any
- * interrupts here.
- */
- rc = WaitLatchOrSocket(MyLatch,
- WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
- WL_LATCH_SET,
- PQsocket(streamConn),
- 0,
- WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
- /* Interrupted? */
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
- }
-
- /* Consume whatever data is available from the socket */
- if (PQconsumeInput(streamConn) == 0)
- {
- /* trouble; return NULL */
- return NULL;
- }
- }
-
- /* Now we can collect and return the next PGresult */
- return PQgetResult(streamConn);
-}
-
/*
* Disconnect connection to primary, if any.
*/
@@ -895,13 +803,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
{
PGresult *res;
- res = libpqrcv_PQgetResult(conn->streamConn);
+ res = libpqsrv_get_result(conn->streamConn,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
/* Verify that there are no more results. */
- res = libpqrcv_PQgetResult(conn->streamConn);
+ res = libpqsrv_get_result(conn->streamConn,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (res != NULL)
{
PQclear(res);
@@ -1052,7 +962,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
}
- res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+ res = libpqsrv_exec(conn->streamConn,
+ cmd.data,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1105,7 +1017,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
appendStringInfoString(&cmd, " );");
- res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+ res = libpqsrv_exec(conn->streamConn, cmd.data,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1218,7 +1131,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("the query interface requires a database connection")));
- pgres = libpqrcv_PQexec(conn->streamConn, query);
+ pgres = libpqsrv_exec(conn->streamConn,
+ query,
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
switch (PQresultStatus(pgres))
{
--
2.43.0