I've just rebased patches and merged last two fix-commits (0003 and 0004)
into 0002.

-- 
regards
Yura Sokolov aka funny-falcon
From f8923c9b8e2f470dd3caaa1e71fb3b931389148b Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Fri, 21 Mar 2025 16:03:30 +0300
Subject: [PATCH v5 1/3] Reapply "libpqwalreceiver: Convert to
 libpq-be-fe-helpers.h"

This reverts commit 21ef4d4d897563adb2f7920ad53b734950f1e0a4.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 55 +++----------------
 1 file changed, 8 insertions(+), 47 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c650935ef5d..5755ab2f072 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -25,6 +25,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -145,7 +146,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
 				 bool must_use_password, const char *appname, char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -211,56 +211,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
 	Assert(i < lengthof(keys));
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-		goto bad_connection_errmsg;
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn =
+		libpqsrv_connect_params(keys, vals,
+								 /* expand_dbname = */ true,
+								WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		goto bad_connection_errmsg;
 
 	if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
 	{
-		PQfinish(conn->streamConn);
+		libpqsrv_disconnect(conn->streamConn);
 		pfree(conn);
 
 		ereport(ERROR,
@@ -300,7 +261,7 @@ bad_connection_errmsg:
 
 	/* error path, error already set */
 bad_connection:
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	pfree(conn);
 	return NULL;
 }
@@ -880,7 +841,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
-- 
2.43.0

From e0e7b14b9c4f3987224b2d0e8d3cb4be6e02003d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Fri, 21 Mar 2025 16:13:38 +0300
Subject: [PATCH v5 2/3] Apply walrcv_shutdown_deblocking_v2-2.patch

From https://www.postgresql.org/message-id/20240123.172410.1596193222420636986.horikyota.ntt%40gmail.com
Plus couple of fixes from review:
- use of AmWalReceiverProcess
- use of die
---
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +-
 src/backend/replication/walreceiver.c         | 41 +++----------------
 src/backend/tcop/postgres.c                   |  5 +++
 3 files changed, 12 insertions(+), 38 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 5755ab2f072..d51624ef3da 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -820,7 +820,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}
 
 		/* Consume whatever data is available from the socket */
@@ -1172,7 +1172,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2e5dd6deb2c..b51a6d06b21 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -71,6 +71,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
+#include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -145,38 +146,6 @@ static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
-void
-ProcessWalRcvInterrupts(void)
-{
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
-
-	if (ShutdownRequestPending)
-	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
-	}
-}
-
 
 /* Main entry point for walreceiver process */
 void
@@ -280,7 +249,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, die); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -459,7 +428,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
 
 				/* Process any requests or signals received recently */
-				ProcessWalRcvInterrupts();
+				CHECK_FOR_INTERRUPTS();
 
 				if (ConfigReloadPending)
 				{
@@ -555,7 +524,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();
 
 					if (walrcv->force_reply)
 					{
@@ -704,7 +673,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0554a4ae3c7..c7d3b25d3f2 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -58,6 +58,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3311,6 +3312,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (AmWalReceiverProcess())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (AmBackgroundWorkerProcess())
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
-- 
2.43.0

From 51e7c0cf4d3312df227f57b5bf61be7ce1d5f155 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Fri, 21 Mar 2025 16:15:05 +0300
Subject: [PATCH v5 3/3] Use libpq-be-fe-helpers.h wrappers more

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 151 ++++--------------
 1 file changed, 33 insertions(+), 118 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index d51624ef3da..be6fbe41705 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -111,8 +111,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -239,8 +237,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQexec(conn->streamConn,
-							  ALWAYS_SECURE_SEARCH_PATH_SQL);
+		res = libpqsrv_exec(conn->streamConn,
+							ALWAYS_SECURE_SEARCH_PATH_SQL,
+							WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			PQclear(res);
@@ -412,7 +411,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqsrv_exec(conn->streamConn,
+						"IDENTIFY_SYSTEM",
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -589,7 +590,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -619,7 +622,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PGresult   *res;
 
 	/*
-	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+	 * Send copy-end message.  As in libpqsrv_exec, this could theoretically
 	 * block, but the risk seems small.
 	 */
 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -639,7 +642,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -654,7 +658,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -668,7 +673,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -679,7 +685,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -704,7 +711,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -734,107 +743,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	PQclear(res);
 }
 
-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
-	PGresult   *lastResult = NULL;
-
-	/*
-	 * PQexec() silently discards any prior query results on the connection.
-	 * This is not required for this function as it's expected that the caller
-	 * (which is this library in all cases) will behave correctly and we don't
-	 * have to be backwards compatible with old libpq.
-	 */
-
-	/*
-	 * Submit the query.  Since we don't use non-blocking mode, this could
-	 * theoretically block.  In practice, since we don't send very long query
-	 * strings, the risk seems negligible.
-	 */
-	if (!PQsendQuery(streamConn, query))
-		return NULL;
-
-	for (;;)
-	{
-		/* Wait for, and collect, the next PGresult. */
-		PGresult   *result;
-
-		result = libpqrcv_PQgetResult(streamConn);
-		if (result == NULL)
-			break;				/* query is complete, or failure */
-
-		/*
-		 * Emulate PQexec()'s behavior of returning the last result when there
-		 * are many.  We are fine with returning just last error message.
-		 */
-		PQclear(lastResult);
-		lastResult = result;
-
-		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-			PQstatus(streamConn) == CONNECTION_BAD)
-			break;
-	}
-
-	return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
-	/*
-	 * Collect data until PQgetResult is ready to get the result without
-	 * blocking.
-	 */
-	while (PQisBusy(streamConn))
-	{
-		int			rc;
-
-		/*
-		 * We don't need to break down the sleep into smaller increments,
-		 * since we'll get interrupted by signals and can handle any
-		 * interrupts here.
-		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Consume whatever data is available from the socket */
-		if (PQconsumeInput(streamConn) == 0)
-		{
-			/* trouble; return NULL */
-			return NULL;
-		}
-	}
-
-	/* Now we can collect and return the next PGresult */
-	return PQgetResult(streamConn);
-}
-
 /*
  * Disconnect connection to primary, if any.
  */
@@ -895,13 +803,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqsrv_get_result(conn->streamConn,
+									  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -1052,7 +962,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1105,7 +1017,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
 
 	appendStringInfoString(&cmd, " );");
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn, cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1218,7 +1131,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqsrv_exec(conn->streamConn,
+						  query,
+						  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 	switch (PQresultStatus(pgres))
 	{
-- 
2.43.0

Reply via email to