Hi, On 2022-01-16 15:28:00 -0800, Andres Freund wrote: > I hacked that up last night. And a fix or two later, it seems to be > working. What I'd missed at first is that the event needs to be reset in > reached_end_position(), otherwise we'll busy-loop. > > I wonder if using a short-lived event handle would risk missing > FD_CLOSE here as well? It'd probably be worth avoiding the risk by creating > the event just once. > > I just wasn't immediately sure where to stash it. Probably just by adding a > field in StreamCtl, that ReceiveXlogStream() then sets? So far the StreamCtl is constant > once passed to ReceiveXlogStream(), but I don't really see a reason why it'd > need to stay that way?
Oops, attached the patch this time. Greetings, Andres Freund
>From 5f7f9b49d2e443b9299bc273d06696b5bf6cbaa4 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Sun, 16 Jan 2022 01:58:24 -0800 Subject: [PATCH v2] Avoid slow shutdown of pg_basebackup, windows edition. See also 7834d20b57a. --- src/bin/pg_basebackup/pg_basebackup.c | 24 ++++++- src/bin/pg_basebackup/pg_receivewal.c | 4 ++ src/bin/pg_basebackup/receivelog.c | 92 ++++++++++++++++++++++++--- src/bin/pg_basebackup/receivelog.h | 4 ++ 4 files changed, 112 insertions(+), 12 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index aa43fc09241..51f48ce587f 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -146,6 +146,8 @@ static const char *progress_filename; /* Pipe to communicate with background wal receiver process */ #ifndef WIN32 static int bgpipe[2] = {-1, -1}; +#else +HANDLE *bgevent = NULL; #endif /* Handle to child process */ @@ -473,7 +475,14 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline, /* * At this point we have an end pointer, so compare it to the current * position to figure out if it's time to stop. + * + * On windows we need to reset the event used to wake up the streaming + * thread, otherwise CopyStreamPoll() will start to immediately return. 
*/ +#ifdef WIN32 + ResetEvent(bgevent); +#endif + if (segendpos >= xlogendptr) return true; @@ -508,7 +517,7 @@ LogStreamerMain(logstreamer_param *param) #ifndef WIN32 stream.stop_socket = bgpipe[0]; #else - stream.stop_socket = PGINVALID_SOCKET; + stream.stop_event = bgevent; #endif stream.standby_message_timeout = standby_message_timeout; stream.synchronous = false; @@ -590,6 +599,14 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) pg_log_error("could not create pipe for background process: %m"); exit(1); } +#else + bgevent = CreateEvent(NULL, TRUE, FALSE, NULL); + if (bgevent == NULL) + { + pg_log_error("could not create event for background thread: %lu", + GetLastError()); + exit(1); + } #endif /* Get a second connection */ @@ -1635,7 +1652,9 @@ BaseBackup(void) /* * On Windows, since we are in the same process, we can just store the * value directly in the variable, and then set the flag that says - * it's there. + * it's there. To interrupt the thread while it's waiting for network + * IO, we set an event (which the thread waits on in addition to the + * socket). 
*/ if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2) { @@ -1645,6 +1664,7 @@ BaseBackup(void) } xlogendptr = ((uint64) hi) << 32 | lo; InterlockedIncrement(&has_xlogendptr); + SetEvent(bgevent); /* First wait for the thread to exit */ if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) != diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index ccb215c398c..d27bd85b7ce 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -618,7 +618,11 @@ StreamLog(void) stream.timeline); stream.stream_stop = stop_streaming; +#ifndef WIN32 stream.stop_socket = PGINVALID_SOCKET; +#else + stream.stop_event = NULL; +#endif stream.standby_message_timeout = standby_message_timeout; stream.synchronous = synchronous; stream.do_sync = do_sync; diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index d39e4b11a1a..a64a1a8e8fe 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -37,8 +37,8 @@ static bool still_sending = true; /* feedback still needs to be sent? 
*/ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos); -static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket); -static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, +static int CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream); +static int CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status); @@ -813,7 +813,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout, last_status); - r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf); + r = CopyStreamReceive(conn, sleeptime, stream, &copybuf); while (r != 0) { if (r == -1) @@ -858,7 +858,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, * Process the received data, and any subsequent data we can read * without blocking. */ - r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf); + r = CopyStreamReceive(conn, 0, stream, &copybuf); } } @@ -877,8 +877,9 @@ error: * or interrupted by signal or stop_socket input, and -1 on an error. 
*/ static int -CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) +CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream) { +#ifndef WIN32 int ret; fd_set input_mask; int connsocket; @@ -896,10 +897,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) FD_ZERO(&input_mask); FD_SET(connsocket, &input_mask); maxfd = connsocket; - if (stop_socket != PGINVALID_SOCKET) + if (stream->stop_socket != PGINVALID_SOCKET) { - FD_SET(stop_socket, &input_mask); - maxfd = Max(maxfd, stop_socket); + FD_SET(stream->stop_socket, &input_mask); + maxfd = Max(maxfd, stream->stop_socket); } if (timeout_ms < 0) @@ -924,6 +925,77 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) return 1; /* Got input on connection socket */ return 0; /* Got timeout or input on stop_socket */ +#else + int ret; + int rc; + HANDLE *network_event; + int nevents = 0; + HANDLE events[2]; + + network_event = WSACreateEvent(); + if (network_event == WSA_INVALID_EVENT) + { + pg_log_error("failed to create event for socket: error code %d", + WSAGetLastError()); + exit(1); + } + + if (WSAEventSelect(PQsocket(conn), network_event, FD_READ | FD_CLOSE) != 0) + { + pg_log_error("failed to set up event for socket: error code %d", + WSAGetLastError()); + exit(1); + } + + events[0] = network_event; + nevents++; + + if (stream->stop_event != NULL) + { + events[1] = stream->stop_event; + nevents++; + } + + /* map timeout_ms to WaitForMultipleObjects expectations */ + if (timeout_ms < 0) + timeout_ms = INFINITE; + + rc = WaitForMultipleObjects(nevents, events, FALSE, timeout_ms); + + if (rc == WAIT_FAILED) + { + pg_log_error("WaitForMultipleObjects() failed: error code %lu", + GetLastError()); + exit(1); + } + else if (rc == WAIT_TIMEOUT) + { + /* timeout exceeded */ + ret = 0; + } + else if (rc == WAIT_OBJECT_0) + { + /* Got input on connection socket */; + ret = 1; + } + else if (rc == (WAIT_OBJECT_0 + 1)) + { + Assert(stream->stop_event != NULL); + /* Got 
event on stop socket */; + ret = 0; + } + else + { + pg_log_error("unexpected return from WaitForMultipleObjects(): %d", rc); + exit(1); + } + + /* reset event association for libpq socket, clean up event */ + WSAEventSelect(PQsocket(conn), NULL, 0); + WSACloseEvent(network_event); + + return ret; +#endif /* WIN32 */ } /* @@ -939,7 +1011,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket) * -1 on error. -2 if the server ended the COPY. */ static int -CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, +CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream, char **buffer) { char *copybuf = NULL; @@ -960,7 +1032,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, * the specified timeout, so that we can ping the server. Also stop * waiting if input appears on stop_socket. */ - ret = CopyStreamPoll(conn, timeout, stop_socket); + ret = CopyStreamPoll(conn, timeout, stream); if (ret <= 0) return ret; diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index 050d4bc69fd..2a643e71a74 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -40,8 +40,12 @@ typedef struct StreamCtl stream_stop_callback stream_stop; /* Stop streaming when returns true */ +#ifndef WIN32 pgsocket stop_socket; /* if valid, watch for input on this socket * and check stream_stop() when there is any */ +#else + HANDLE *stop_event; +#endif WalWriteMethod *walmethod; /* How to write the WAL */ char *partial_suffix; /* Suffix appended to partially received files */ -- 2.34.0