Hi,

On 2022-01-15 10:59:00 +1300, Thomas Munro wrote:
> On Sat, Jan 15, 2022 at 9:28 AM Andres Freund <and...@anarazel.de> wrote:
> > I think it doesn't even need to touch socket.c to cause breakage. Using two
> > different WaitEventSets is enough.
>
> Right. I was interested in your observation because so far we'd
> *only* been considering the two-consecutive-WaitEventSets case, which
> we could grok experimentally.
There likely are further problems in other parts, but I think socket.c is
unlikely to be involved in the walreceiver case - there shouldn't be any
socket.c-style socket in walreceiver itself, nor do I think we are doing a
send/recv/select backed by socket.c.

> The patch Alexander tested most recently uses a tri-state eof flag [...]

What about instead giving WalReceiverConn an internal WaitEventSet, and
using that consistently? I've attached a draft for that.

Alexander, could you test with that patch applied?

Greetings,

Andres Freund
>From 8139d324687364164c9abbd3e2b2a54b5d5bbcad Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Fri, 14 Jan 2022 14:39:57 -0800 Subject: [PATCH] WIP: use long-lived WaitEventSet for libpqwalreceiver connections. --- src/include/replication/walreceiver.h | 29 +++-- .../libpqwalreceiver/libpqwalreceiver.c | 112 ++++++++++++------ src/backend/replication/logical/tablesync.c | 11 +- src/backend/replication/logical/worker.c | 13 +- src/backend/replication/walreceiver.c | 26 +--- 5 files changed, 112 insertions(+), 79 deletions(-) diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 92f73a55b8d..07a16470d48 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -253,6 +253,17 @@ typedef void (*walrcv_check_conninfo_fn) (const char *conninfo); */ typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn); +/* + * walrcv_wait_fn + * + * Waits for socket to become readable, or a latch interrupt. + * + * timeout is passed on to WaitEventSetWait(), return value is + * like WaitLatchOrSocket's. + */ +typedef int (*walrcv_wait_fn) (WalReceiverConn *conn, long timeout, + uint32 wait_event_info); + /* * walrcv_get_senderinfo_fn * @@ -317,14 +328,13 @@ typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn, /* * walrcv_receive_fn * - * Receive a message available from the WAL stream. 'buffer' is a pointer - * to a buffer holding the message received. Returns the length of the data, - * 0 if no data is available yet ('wait_fd' is a socket descriptor which can - * be waited on before a retry), and -1 if the cluster ended the COPY. + * Receive a message available from the WAL stream. 'buffer' is a pointer to + * a buffer holding the message received. Returns the length of the data, 0 + * if no data is available yet (see walrcv_wait()), and -1 if the cluster + * ended the COPY. 
*/ typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, - char **buffer, - pgsocket *wait_fd); + char **buffer); /* * walrcv_send_fn @@ -385,6 +395,7 @@ typedef struct WalReceiverFunctionsType walrcv_connect_fn walrcv_connect; walrcv_check_conninfo_fn walrcv_check_conninfo; walrcv_get_conninfo_fn walrcv_get_conninfo; + walrcv_wait_fn walrcv_wait; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; walrcv_server_version_fn walrcv_server_version; @@ -407,6 +418,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_check_conninfo(conninfo) #define walrcv_get_conninfo(conn) \ WalReceiverFunctions->walrcv_get_conninfo(conn) +#define walrcv_wait(conn, timeout, wait_event_info) \ + WalReceiverFunctions->walrcv_wait(conn, timeout, wait_event_info) #define walrcv_get_senderinfo(conn, sender_host, sender_port) \ WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ @@ -419,8 +432,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_startstreaming(conn, options) #define walrcv_endstreaming(conn, next_tli) \ WalReceiverFunctions->walrcv_endstreaming(conn, next_tli) -#define walrcv_receive(conn, buffer, wait_fd) \ - WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) +#define walrcv_receive(conn, buffer) \ + WalReceiverFunctions->walrcv_receive(conn, buffer) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) #define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0d89db4e6a6..24f062f779c 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -46,6 
+46,8 @@ struct WalReceiverConn bool logical; /* Buffer for currently read records */ char *recvBuf; + + WaitEventSet *wes; }; /* Prototypes for interface functions */ @@ -54,6 +56,8 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo, char **err); static void libpqrcv_check_conninfo(const char *conninfo); static char *libpqrcv_get_conninfo(WalReceiverConn *conn); +static int libpqrcv_wait(WalReceiverConn *conn, long timeout, + uint32 wait_event_info); static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, @@ -66,8 +70,7 @@ static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options); static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli); -static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, - pgsocket *wait_fd); +static int libpqrcv_receive(WalReceiverConn *conn, char **buffer); static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes); static char *libpqrcv_create_slot(WalReceiverConn *conn, @@ -87,6 +90,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_connect, libpqrcv_check_conninfo, libpqrcv_get_conninfo, + libpqrcv_wait, libpqrcv_get_senderinfo, libpqrcv_identify_system, libpqrcv_server_version, @@ -102,8 +106,8 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { }; /* Prototypes for private functions */ -static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); -static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); +static PGresult *libpqrcv_PQexec(WalReceiverConn *conn, const char *query); +static PGresult *libpqrcv_PQgetResult(WalReceiverConn *conn); static char *stringlist_to_identifierstr(PGconn *conn, List *strings); /* @@ -203,6 +207,11 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, else io_flag = WL_SOCKET_WRITEABLE; + /* + * NB: cannot yet use/create 
conn->wes, during connection + * establishment the socket can change. + */ + rc = WaitLatchOrSocket(MyLatch, WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, PQsocket(conn->streamConn), @@ -227,11 +236,20 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, return NULL; } + conn->wes = CreateWaitEventSet(CurrentMemoryContext, 3); + AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET, + MyLatch, NULL); + /* XXX: This isn't necessarily nice to do unconditionally? */ + AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + AddWaitEventToSet(conn->wes, WL_SOCKET_READABLE, PQsocket(conn->streamConn), + NULL, NULL); + if (logical) { PGresult *res; - res = libpqrcv_PQexec(conn->streamConn, + res = libpqrcv_PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -359,7 +377,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ - res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM"); + res = libpqrcv_PQexec(conn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -482,7 +500,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn, options->proto.physical.startpointTLI); /* Start streaming. */ - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqrcv_PQexec(conn, cmd.data); pfree(cmd.data); if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -532,7 +550,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * also possible in case we aborted the copy in mid-stream. 
*/ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -547,7 +565,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); } else if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -561,7 +579,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); } if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -572,7 +590,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* Verify that there are no more results */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (res != NULL) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -597,7 +615,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Request the primary to send over the history file for given timeline. */ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); - res = libpqrcv_PQexec(conn->streamConn, cmd); + res = libpqrcv_PQexec(conn, cmd); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -641,7 +659,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * May return NULL, rather than an error result, on failure. */ static PGresult * -libpqrcv_PQexec(PGconn *streamConn, const char *query) +libpqrcv_PQexec(WalReceiverConn *conn, const char *query) { PGresult *lastResult = NULL; @@ -657,7 +675,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) * theoretically block. In practice, since we don't send very long query * strings, the risk seems negligible. 
*/ - if (!PQsendQuery(streamConn, query)) + if (!PQsendQuery(conn->streamConn, query)) return NULL; for (;;) @@ -665,7 +683,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) /* Wait for, and collect, the next PGresult. */ PGresult *result; - result = libpqrcv_PQgetResult(streamConn); + result = libpqrcv_PQgetResult(conn); if (result == NULL) break; /* query is complete, or failure */ @@ -679,24 +697,49 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) if (PQresultStatus(lastResult) == PGRES_COPY_IN || PQresultStatus(lastResult) == PGRES_COPY_OUT || PQresultStatus(lastResult) == PGRES_COPY_BOTH || - PQstatus(streamConn) == CONNECTION_BAD) + PQstatus(conn->streamConn) == CONNECTION_BAD) break; } return lastResult; } +/* + * XXX: Currently the WES is configured to only check if socket is readable, + * not writeable. That might need to be configurable in the future. + */ +static int +libpqrcv_wait(WalReceiverConn *conn, long timeout, uint32 wait_event_info) +{ + int nevents; + WaitEvent event; + int ret = 0; + + nevents = WaitEventSetWait(conn->wes, timeout, &event, 1, wait_event_info); + + if (nevents == 0) + ret |= WL_TIMEOUT; + else + { + ret |= event.events & (WL_LATCH_SET | + WL_POSTMASTER_DEATH | + WL_SOCKET_MASK); + } + + return ret; +} + /* * Perform the equivalent of PQgetResult(), but watch for interrupts. */ static PGresult * -libpqrcv_PQgetResult(PGconn *streamConn) +libpqrcv_PQgetResult(WalReceiverConn *conn) { /* * Collect data until PQgetResult is ready to get the result without * blocking. */ - while (PQisBusy(streamConn)) + while (PQisBusy(conn->streamConn)) { int rc; @@ -705,12 +748,7 @@ libpqrcv_PQgetResult(PGconn *streamConn) * since we'll get interrupted by signals and can handle any * interrupts here. 
*/ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); + rc = libpqrcv_wait(conn, -1, WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); /* Interrupted? */ if (rc & WL_LATCH_SET) @@ -720,7 +758,7 @@ libpqrcv_PQgetResult(PGconn *streamConn) } /* Consume whatever data is available from the socket */ - if (PQconsumeInput(streamConn) == 0) + if (PQconsumeInput(conn->streamConn) == 0) { /* trouble; return NULL */ return NULL; @@ -728,7 +766,7 @@ libpqrcv_PQgetResult(PGconn *streamConn) } /* Now we can collect and return the next PGresult */ - return PQgetResult(streamConn); + return PQgetResult(conn->streamConn); } /* @@ -737,6 +775,12 @@ libpqrcv_PQgetResult(PGconn *streamConn) static void libpqrcv_disconnect(WalReceiverConn *conn) { + if (conn->wes != NULL) + { + FreeWaitEventSet(conn->wes); + conn->wes = NULL; + } + PQfinish(conn->streamConn); if (conn->recvBuf != NULL) PQfreemem(conn->recvBuf); @@ -752,16 +796,15 @@ libpqrcv_disconnect(WalReceiverConn *conn) * point to a buffer holding the received message. The buffer is only valid * until the next libpqrcv_* call. * - * If no data was available immediately, returns 0, and *wait_fd is set to a - * socket descriptor which can be waited on before trying again. + * If no data was available immediately, returns 0. In that case walrcv_wait() + * can be used to wait for socket readiness or interrupts before trying again. * * -1 if the server ended the COPY. * * ereports on error. */ static int -libpqrcv_receive(WalReceiverConn *conn, char **buffer, - pgsocket *wait_fd) +libpqrcv_receive(WalReceiverConn *conn, char **buffer) { int rawlen; @@ -785,7 +828,6 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, if (rawlen == 0) { /* Tell caller to try again when our socket is ready. 
*/ - *wait_fd = PQsocket(conn->streamConn); return 0; } } @@ -793,13 +835,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PGresult *res; - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); /* Verify that there are no more results. */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqrcv_PQgetResult(conn); if (res != NULL) { PQclear(res); @@ -941,7 +983,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); } - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqrcv_PQexec(conn, cmd.data); pfree(cmd.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -1068,7 +1110,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("the query interface requires a database connection"))); - pgres = libpqrcv_PQexec(conn->streamConn, query); + pgres = libpqrcv_PQexec(conn, query); switch (PQresultStatus(pgres)) { diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e596b69d466..9eb78d1faab 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -637,14 +637,13 @@ copy_read_data(void *outbuf, int minread, int maxread) while (maxread > 0 && bytesread < minread) { - pgsocket fd = PGINVALID_SOCKET; int len; char *buf = NULL; for (;;) { /* Try read the data. */ - len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf); CHECK_FOR_INTERRUPTS(); @@ -676,11 +675,8 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Wait for more data or latch. 
*/ - (void) WaitLatchOrSocket(MyLatch, - WL_SOCKET_READABLE | WL_LATCH_SET | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); - + (void) walrcv_wait(LogRepWorkerWalRcvConn, 1000L, + WAIT_EVENT_LOGICAL_SYNC_DATA); ResetLatch(MyLatch); } @@ -967,6 +963,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * application_name, so that it is different from the main apply worker, * so that synchronous replication can distinguish them. */ + Assert(LogRepWorkerWalRcvConn == NULL); LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c9af775bc18..95d4d0c7d1f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2597,7 +2597,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* This outer loop iterates once per wait. */ for (;;) { - pgsocket fd = PGINVALID_SOCKET; int rc; int len; char *buf = NULL; @@ -2608,7 +2607,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextSwitchTo(ApplyMessageContext); - len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf); if (len != 0) { @@ -2688,7 +2687,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } - len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf); } } @@ -2729,11 +2728,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) else wait_time = NAPTIME_PER_CYCLE; - rc = WaitLatchOrSocket(MyLatch, - WL_SOCKET_READABLE | WL_LATCH_SET | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - fd, wait_time, - WAIT_EVENT_LOGICAL_APPLY_MAIN); + rc = walrcv_wait(LogRepWorkerWalRcvConn, wait_time, + WAIT_EVENT_LOGICAL_APPLY_MAIN); if (rc & WL_LATCH_SET) { @@ -3541,6 +3537,7 @@ ApplyWorkerMain(Datum main_arg) origin_startpos = 
replorigin_session_get_progress(false); CommitTransactionCommand(); + Assert(LogRepWorkerWalRcvConn == NULL); LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b39fce8c23c..d70fc462bea 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -423,7 +423,6 @@ WalReceiverMain(void) char *buf; int len; bool endofwal = false; - pgsocket wait_fd = PGINVALID_SOCKET; int rc; /* @@ -446,7 +445,7 @@ WalReceiverMain(void) } /* See if we can read data immediately */ - len = walrcv_receive(wrconn, &buf, &wait_fd); + len = walrcv_receive(wrconn, &buf); if (len != 0) { /* @@ -478,7 +477,7 @@ WalReceiverMain(void) endofwal = true; break; } - len = walrcv_receive(wrconn, &buf, &wait_fd); + len = walrcv_receive(wrconn, &buf); } /* Let the primary know that we received some data. */ @@ -496,24 +495,9 @@ WalReceiverMain(void) if (endofwal) break; - /* - * Ideally we would reuse a WaitEventSet object repeatedly - * here to avoid the overheads of WaitLatchOrSocket on epoll - * systems, but we can't be sure that libpq (or any other - * walreceiver implementation) has the same socket (even if - * the fd is the same number, it may have been closed and - * reopened since the last time). In future, if there is a - * function for removing sockets from WaitEventSet, then we - * could add and remove just the socket each time, potentially - * avoiding some system calls. - */ - Assert(wait_fd != PGINVALID_SOCKET); - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_LATCH_SET, - wait_fd, - NAPTIME_PER_CYCLE, - WAIT_EVENT_WAL_RECEIVER_MAIN); + rc = walrcv_wait(wrconn, NAPTIME_PER_CYCLE, + WAIT_EVENT_WAL_RECEIVER_MAIN); + if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); -- 2.34.0