Hi,

On 2022-01-15 10:59:00 +1300, Thomas Munro wrote:
> On Sat, Jan 15, 2022 at 9:28 AM Andres Freund <and...@anarazel.de> wrote:
> > I think it doesn't even need to touch socket.c to cause breakage. Using two
> > different WaitEventSets is enough.
>
> Right.  I was interested in your observation because so far we'd
> *only* been considering the two-consecutive-WaitEventSets case, which
> we could grok experimentally.

There are likely further problems elsewhere, but I think socket.c is unlikely
to be involved in the walreceiver case: there shouldn't be any socket.c-style
socket in the walreceiver itself, nor do I think we do any send/recv/select
that is backed by socket.c.


> The patch Alexander tested most recently uses a tri-state eof flag [...]

What about instead giving WalReceiverConn an internal WaitEventSet, and using
that consistently? I've attached a draft for that.

Alexander, could you test with that patch applied?

Greetings,

Andres Freund
>From 8139d324687364164c9abbd3e2b2a54b5d5bbcad Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 14 Jan 2022 14:39:57 -0800
Subject: [PATCH] WIP: use long-lived WaitEventSet for libpqwalreceiver
 connections.

---
 src/include/replication/walreceiver.h         |  29 +++--
 .../libpqwalreceiver/libpqwalreceiver.c       | 112 ++++++++++++------
 src/backend/replication/logical/tablesync.c   |  11 +-
 src/backend/replication/logical/worker.c      |  13 +-
 src/backend/replication/walreceiver.c         |  26 +---
 5 files changed, 112 insertions(+), 79 deletions(-)

diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 92f73a55b8d..07a16470d48 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -253,6 +253,17 @@ typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
  */
 typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
 
+/*
+ * walrcv_wait_fn
+ *
+ * Waits for the connection's socket to become readable, or for the latch to be set.
+ *
+ * timeout is passed on to WaitEventSetWait(); the return value is a bitmask
+ * of WL_* flags, like WaitLatchOrSocket's.
+ */
+typedef int (*walrcv_wait_fn) (WalReceiverConn *conn, long timeout,
+							   uint32 wait_event_info);
+
 /*
  * walrcv_get_senderinfo_fn
  *
@@ -317,14 +328,13 @@ typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
 /*
  * walrcv_receive_fn
  *
- * Receive a message available from the WAL stream.  'buffer' is a pointer
- * to a buffer holding the message received.  Returns the length of the data,
- * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
- * be waited on before a retry), and -1 if the cluster ended the COPY.
+ * Receive a message available from the WAL stream.  'buffer' is a pointer to
+ * a buffer holding the message received.  Returns the length of the data, 0
+ * if no data is available yet (see walrcv_wait()), and -1 if the cluster
+ * ended the COPY.
  */
 typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
-								  char **buffer,
-								  pgsocket *wait_fd);
+								  char **buffer);
 
 /*
  * walrcv_send_fn
@@ -385,6 +395,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_connect_fn walrcv_connect;
 	walrcv_check_conninfo_fn walrcv_check_conninfo;
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
+	walrcv_wait_fn walrcv_wait;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
 	walrcv_server_version_fn walrcv_server_version;
@@ -407,6 +418,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_check_conninfo(conninfo)
 #define walrcv_get_conninfo(conn) \
 	WalReceiverFunctions->walrcv_get_conninfo(conn)
+#define walrcv_wait(conn, timeout, wait_event_info) \
+	WalReceiverFunctions->walrcv_wait(conn, timeout, wait_event_info)
 #define walrcv_get_senderinfo(conn, sender_host, sender_port) \
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
@@ -419,8 +432,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_startstreaming(conn, options)
 #define walrcv_endstreaming(conn, next_tli) \
 	WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
-#define walrcv_receive(conn, buffer, wait_fd) \
-	WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
+#define walrcv_receive(conn, buffer) \
+	WalReceiverFunctions->walrcv_receive(conn, buffer)
 #define walrcv_send(conn, buffer, nbytes) \
 	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
 #define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0d89db4e6a6..24f062f779c 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -46,6 +46,8 @@ struct WalReceiverConn
 	bool		logical;
 	/* Buffer for currently read records */
 	char	   *recvBuf;
+
+	WaitEventSet *wes;
 };
 
 /* Prototypes for interface functions */
@@ -54,6 +56,8 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo,
 										 char **err);
 static void libpqrcv_check_conninfo(const char *conninfo);
 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
+static int libpqrcv_wait(WalReceiverConn *conn, long timeout,
+						 uint32 wait_event_info);
 static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 									char **sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
@@ -66,8 +70,7 @@ static bool libpqrcv_startstreaming(WalReceiverConn *conn,
 									const WalRcvStreamOptions *options);
 static void libpqrcv_endstreaming(WalReceiverConn *conn,
 								  TimeLineID *next_tli);
-static int	libpqrcv_receive(WalReceiverConn *conn, char **buffer,
-							 pgsocket *wait_fd);
+static int	libpqrcv_receive(WalReceiverConn *conn, char **buffer);
 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
 						  int nbytes);
 static char *libpqrcv_create_slot(WalReceiverConn *conn,
@@ -87,6 +90,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_connect,
 	libpqrcv_check_conninfo,
 	libpqrcv_get_conninfo,
+	libpqrcv_wait,
 	libpqrcv_get_senderinfo,
 	libpqrcv_identify_system,
 	libpqrcv_server_version,
@@ -102,8 +106,8 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
+static PGresult *libpqrcv_PQexec(WalReceiverConn *conn, const char *query);
+static PGresult *libpqrcv_PQgetResult(WalReceiverConn *conn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -203,6 +207,11 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		else
 			io_flag = WL_SOCKET_WRITEABLE;
 
+		/*
+		 * NB: cannot use/create conn->wes yet, because the socket can change
+		 * during connection establishment.
+		 */
+
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
 							   PQsocket(conn->streamConn),
@@ -227,11 +236,20 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		return NULL;
 	}
 
+	conn->wes = CreateWaitEventSet(CurrentMemoryContext, 3);
+	AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET,
+					  MyLatch, NULL);
+	/* XXX: This isn't necessarily nice to do unconditionally? */
+	AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+	AddWaitEventToSet(conn->wes, WL_SOCKET_READABLE, PQsocket(conn->streamConn),
+					  NULL, NULL);
+
 	if (logical)
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQexec(conn->streamConn,
+		res = libpqrcv_PQexec(conn,
 							  ALWAYS_SECURE_SEARCH_PATH_SQL);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
@@ -359,7 +377,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqrcv_PQexec(conn, "IDENTIFY_SYSTEM");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -482,7 +500,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqrcv_PQexec(conn, cmd.data);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -532,7 +550,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqrcv_PQgetResult(conn);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -547,7 +565,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -561,7 +579,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -572,7 +590,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqrcv_PQgetResult(conn);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -597,7 +615,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqrcv_PQexec(conn, cmd);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -641,7 +659,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * May return NULL, rather than an error result, on failure.
  */
 static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
+libpqrcv_PQexec(WalReceiverConn *conn, const char *query)
 {
 	PGresult   *lastResult = NULL;
 
@@ -657,7 +675,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 	 * theoretically block.  In practice, since we don't send very long query
 	 * strings, the risk seems negligible.
 	 */
-	if (!PQsendQuery(streamConn, query))
+	if (!PQsendQuery(conn->streamConn, query))
 		return NULL;
 
 	for (;;)
@@ -665,7 +683,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 		/* Wait for, and collect, the next PGresult. */
 		PGresult   *result;
 
-		result = libpqrcv_PQgetResult(streamConn);
+		result = libpqrcv_PQgetResult(conn);
 		if (result == NULL)
 			break;				/* query is complete, or failure */
 
@@ -679,24 +697,49 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
 			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
 			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-			PQstatus(streamConn) == CONNECTION_BAD)
+			PQstatus(conn->streamConn) == CONNECTION_BAD)
 			break;
 	}
 
 	return lastResult;
 }
 
+/*
+ * XXX: Currently the WES is configured to only check whether the socket is
+ * readable, not writeable. That might need to be configurable in the future.
+ */
+static int
+libpqrcv_wait(WalReceiverConn *conn, long timeout, uint32 wait_event_info)
+{
+	int			nevents;
+	WaitEvent	event;
+	int			ret = 0;
+
+	nevents = WaitEventSetWait(conn->wes, timeout, &event, 1, wait_event_info);
+
+	if (nevents == 0)
+		ret |= WL_TIMEOUT;
+	else
+	{
+		ret |= event.events & (WL_LATCH_SET |
+							   WL_POSTMASTER_DEATH |
+							   WL_SOCKET_MASK);
+	}
+
+	return ret;
+}
+
 /*
  * Perform the equivalent of PQgetResult(), but watch for interrupts.
  */
 static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
+libpqrcv_PQgetResult(WalReceiverConn *conn)
 {
 	/*
 	 * Collect data until PQgetResult is ready to get the result without
 	 * blocking.
 	 */
-	while (PQisBusy(streamConn))
+	while (PQisBusy(conn->streamConn))
 	{
 		int			rc;
 
@@ -705,12 +748,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		 * since we'll get interrupted by signals and can handle any
 		 * interrupts here.
 		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+		rc = libpqrcv_wait(conn, -1, WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 		/* Interrupted? */
 		if (rc & WL_LATCH_SET)
@@ -720,7 +758,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		}
 
 		/* Consume whatever data is available from the socket */
-		if (PQconsumeInput(streamConn) == 0)
+		if (PQconsumeInput(conn->streamConn) == 0)
 		{
 			/* trouble; return NULL */
 			return NULL;
@@ -728,7 +766,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 	}
 
 	/* Now we can collect and return the next PGresult */
-	return PQgetResult(streamConn);
+	return PQgetResult(conn->streamConn);
 }
 
 /*
@@ -737,6 +775,12 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
+	if (conn->wes != NULL)
+	{
+		FreeWaitEventSet(conn->wes);
+		conn->wes = NULL;
+	}
+
 	PQfinish(conn->streamConn);
 	if (conn->recvBuf != NULL)
 		PQfreemem(conn->recvBuf);
@@ -752,16 +796,15 @@ libpqrcv_disconnect(WalReceiverConn *conn)
  *	 point to a buffer holding the received message. The buffer is only valid
  *	 until the next libpqrcv_* call.
  *
- *	 If no data was available immediately, returns 0, and *wait_fd is set to a
- *	 socket descriptor which can be waited on before trying again.
+ *	 If no data was available immediately, returns 0. In that case walrcv_wait()
+ *	 can be used to wait for socket readiness or interrupts before trying again.
  *
  *	 -1 if the server ended the COPY.
  *
  * ereports on error.
  */
 static int
-libpqrcv_receive(WalReceiverConn *conn, char **buffer,
-				 pgsocket *wait_fd)
+libpqrcv_receive(WalReceiverConn *conn, char **buffer)
 {
 	int			rawlen;
 
@@ -785,7 +828,6 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 		if (rawlen == 0)
 		{
 			/* Tell caller to try again when our socket is ready. */
-			*wait_fd = PQsocket(conn->streamConn);
 			return 0;
 		}
 	}
@@ -793,13 +835,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqrcv_PQgetResult(conn);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -941,7 +983,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqrcv_PQexec(conn, cmd.data);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1068,7 +1110,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqrcv_PQexec(conn, query);
 
 	switch (PQresultStatus(pgres))
 	{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e596b69d466..9eb78d1faab 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -637,14 +637,13 @@ copy_read_data(void *outbuf, int minread, int maxread)
 
 	while (maxread > 0 && bytesread < minread)
 	{
-		pgsocket	fd = PGINVALID_SOCKET;
 		int			len;
 		char	   *buf = NULL;
 
 		for (;;)
 		{
 			/* Try read the data. */
-			len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+			len = walrcv_receive(LogRepWorkerWalRcvConn, &buf);
 
 			CHECK_FOR_INTERRUPTS();
 
@@ -676,11 +675,8 @@ copy_read_data(void *outbuf, int minread, int maxread)
 		/*
 		 * Wait for more data or latch.
 		 */
-		(void) WaitLatchOrSocket(MyLatch,
-								 WL_SOCKET_READABLE | WL_LATCH_SET |
-								 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-								 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
-
+		(void) walrcv_wait(LogRepWorkerWalRcvConn, 1000L,
+						   WAIT_EVENT_LOGICAL_SYNC_DATA);
 		ResetLatch(MyLatch);
 	}
 
@@ -967,6 +963,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * application_name, so that it is different from the main apply worker,
 	 * so that synchronous replication can distinguish them.
 	 */
+	Assert(LogRepWorkerWalRcvConn == NULL);
 	LogRepWorkerWalRcvConn =
 		walrcv_connect(MySubscription->conninfo, true, slotname, &err);
 	if (LogRepWorkerWalRcvConn == NULL)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c9af775bc18..95d4d0c7d1f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2597,7 +2597,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
-		pgsocket	fd = PGINVALID_SOCKET;
 		int			rc;
 		int			len;
 		char	   *buf = NULL;
@@ -2608,7 +2607,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		MemoryContextSwitchTo(ApplyMessageContext);
 
-		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf);
 
 		if (len != 0)
 		{
@@ -2688,7 +2687,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					MemoryContextReset(ApplyMessageContext);
 				}
 
-				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf);
 			}
 		}
 
@@ -2729,11 +2728,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		else
 			wait_time = NAPTIME_PER_CYCLE;
 
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_SOCKET_READABLE | WL_LATCH_SET |
-							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-							   fd, wait_time,
-							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
+		rc = walrcv_wait(LogRepWorkerWalRcvConn, wait_time,
+						 WAIT_EVENT_LOGICAL_APPLY_MAIN);
 
 		if (rc & WL_LATCH_SET)
 		{
@@ -3541,6 +3537,7 @@ ApplyWorkerMain(Datum main_arg)
 		origin_startpos = replorigin_session_get_progress(false);
 		CommitTransactionCommand();
 
+		Assert(LogRepWorkerWalRcvConn == NULL);
 		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
 												MySubscription->name, &err);
 		if (LogRepWorkerWalRcvConn == NULL)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b39fce8c23c..d70fc462bea 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -423,7 +423,6 @@ WalReceiverMain(void)
 				char	   *buf;
 				int			len;
 				bool		endofwal = false;
-				pgsocket	wait_fd = PGINVALID_SOCKET;
 				int			rc;
 
 				/*
@@ -446,7 +445,7 @@ WalReceiverMain(void)
 				}
 
 				/* See if we can read data immediately */
-				len = walrcv_receive(wrconn, &buf, &wait_fd);
+				len = walrcv_receive(wrconn, &buf);
 				if (len != 0)
 				{
 					/*
@@ -478,7 +477,7 @@ WalReceiverMain(void)
 							endofwal = true;
 							break;
 						}
-						len = walrcv_receive(wrconn, &buf, &wait_fd);
+						len = walrcv_receive(wrconn, &buf);
 					}
 
 					/* Let the primary know that we received some data. */
@@ -496,24 +495,9 @@ WalReceiverMain(void)
 				if (endofwal)
 					break;
 
-				/*
-				 * Ideally we would reuse a WaitEventSet object repeatedly
-				 * here to avoid the overheads of WaitLatchOrSocket on epoll
-				 * systems, but we can't be sure that libpq (or any other
-				 * walreceiver implementation) has the same socket (even if
-				 * the fd is the same number, it may have been closed and
-				 * reopened since the last time).  In future, if there is a
-				 * function for removing sockets from WaitEventSet, then we
-				 * could add and remove just the socket each time, potentially
-				 * avoiding some system calls.
-				 */
-				Assert(wait_fd != PGINVALID_SOCKET);
-				rc = WaitLatchOrSocket(MyLatch,
-									   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-									   WL_TIMEOUT | WL_LATCH_SET,
-									   wait_fd,
-									   NAPTIME_PER_CYCLE,
-									   WAIT_EVENT_WAL_RECEIVER_MAIN);
+				rc = walrcv_wait(wrconn, NAPTIME_PER_CYCLE,
+								 WAIT_EVENT_WAL_RECEIVER_MAIN);
+
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-- 
2.34.0

Reply via email to