Hi,

On 2022-01-16 15:28:00 -0800, Andres Freund wrote:
> I hacked that up last night. And a fix or two later, it seems to be
> working. What I'd missed at first is that the event needs to be reset in
> reached_end_position(), otherwise we'll busy loop.
> 
> I wonder if using a short-lived event handle would have dangers of missing
> FD_CLOSE here as well? It'd probably be worth avoiding the risk by creating
> the event just once.
> 
> I just wasn't immediately sure where to stash it. Probably just by adding a
> field in StreamCtl, that ReceiveXlogStream() then sets? So far it's constant
> once passed to ReceiveXlogStream(), but I don't really see a reason why it'd
> need to stay that way?

Oops, attached the patch this time.

Greetings,

Andres Freund
>From 5f7f9b49d2e443b9299bc273d06696b5bf6cbaa4 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 16 Jan 2022 01:58:24 -0800
Subject: [PATCH v2] Avoid slow shutdown of pg_basebackup, Windows edition.

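On Windows the WAL streaming thread runs in the same process as the main
thread and had no stop socket (stop_socket was PGINVALID_SOCKET). After the
main thread stored the end position, the thread would only notice it once
the replication connection delivered data or the standby message timeout
expired. Create an event that BaseBackup() signals after setting
xlogendptr, and have CopyStreamPoll() wait on it together with the
connection socket. reached_end_position() resets the event so the poll does
not return immediately afterwards.

See also 7834d20b57a.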
---
 src/bin/pg_basebackup/pg_basebackup.c | 24 ++++++-
 src/bin/pg_basebackup/pg_receivewal.c |  4 ++
 src/bin/pg_basebackup/receivelog.c    | 92 ++++++++++++++++++++++++---
 src/bin/pg_basebackup/receivelog.h    |  4 ++
 4 files changed, 112 insertions(+), 12 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index aa43fc09241..51f48ce587f 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -146,6 +146,8 @@ static const char *progress_filename;
 /* Pipe to communicate with background wal receiver process */
 #ifndef WIN32
 static int	bgpipe[2] = {-1, -1};
+#else
+static HANDLE bgevent = NULL;
 #endif
 
 /* Handle to child process */
@@ -473,7 +475,14 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 	/*
 	 * At this point we have an end pointer, so compare it to the current
 	 * position to figure out if it's time to stop.
+	 *
+	 * On windows we need to reset the event used to wake up the streaming
+	 * thread, otherwise CopyStreamPoll() will start to immediately return.
 	 */
+#ifdef WIN32
+	ResetEvent(bgevent);
+#endif
+
 	if (segendpos >= xlogendptr)
 		return true;
 
@@ -508,7 +517,7 @@ LogStreamerMain(logstreamer_param *param)
 #ifndef WIN32
 	stream.stop_socket = bgpipe[0];
 #else
-	stream.stop_socket = PGINVALID_SOCKET;
+	stream.stop_event = bgevent;
 #endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
@@ -590,6 +599,14 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		pg_log_error("could not create pipe for background process: %m");
 		exit(1);
 	}
+#else
+	bgevent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (bgevent == NULL)
+	{
+		pg_log_error("could not create event for background thread: %lu",
+					 GetLastError());
+		exit(1);
+	}
 #endif
 
 	/* Get a second connection */
@@ -1635,7 +1652,9 @@ BaseBackup(void)
 		/*
 		 * On Windows, since we are in the same process, we can just store the
 		 * value directly in the variable, and then set the flag that says
-		 * it's there.
+		 * it's there. To interrupt the thread while it's waiting for network
+		 * IO, we set an event (which the thread waits on in addition to the
+		 * socket).
 		 */
 		if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
 		{
@@ -1645,6 +1664,7 @@ BaseBackup(void)
 		}
 		xlogendptr = ((uint64) hi) << 32 | lo;
 		InterlockedIncrement(&has_xlogendptr);
+		SetEvent(bgevent);
 
 		/* First wait for the thread to exit */
 		if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index ccb215c398c..d27bd85b7ce 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -618,7 +618,11 @@ StreamLog(void)
 					stream.timeline);
 
 	stream.stream_stop = stop_streaming;
+#ifndef WIN32
 	stream.stop_socket = PGINVALID_SOCKET;
+#else
+	stream.stop_event = NULL;
+#endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d39e4b11a1a..a64a1a8e8fe 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -37,8 +37,8 @@ static bool still_sending = true;	/* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 								  XLogRecPtr *stoppos);
-static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
-static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+static int	CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream);
+static int	CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 							  char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
 								int len, XLogRecPtr blockpos, TimestampTz *last_status);
@@ -813,7 +813,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
 												 last_status);
 
-		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
+		r = CopyStreamReceive(conn, sleeptime, stream, &copybuf);
 		while (r != 0)
 		{
 			if (r == -1)
@@ -858,7 +858,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			 * Process the received data, and any subsequent data we can read
 			 * without blocking.
 			 */
-			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
+			r = CopyStreamReceive(conn, 0, stream, &copybuf);
 		}
 	}
 
@@ -877,8 +877,9 @@ error:
  * or interrupted by signal or stop_socket input, and -1 on an error.
  */
 static int
-CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
+CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream)
 {
+#ifndef WIN32
 	int			ret;
 	fd_set		input_mask;
 	int			connsocket;
@@ -896,10 +897,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 	FD_ZERO(&input_mask);
 	FD_SET(connsocket, &input_mask);
 	maxfd = connsocket;
-	if (stop_socket != PGINVALID_SOCKET)
+	if (stream->stop_socket != PGINVALID_SOCKET)
 	{
-		FD_SET(stop_socket, &input_mask);
-		maxfd = Max(maxfd, stop_socket);
+		FD_SET(stream->stop_socket, &input_mask);
+		maxfd = Max(maxfd, stream->stop_socket);
 	}
 
 	if (timeout_ms < 0)
@@ -924,6 +925,77 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 		return 1;				/* Got input on connection socket */
 
 	return 0;					/* Got timeout or input on stop_socket */
+#else
+	int			ret;
+	DWORD		rc;
+	HANDLE		network_event;
+	int			nevents = 0;
+	HANDLE		events[2];
+
+	network_event = WSACreateEvent();
+	if (network_event == WSA_INVALID_EVENT)
+	{
+		pg_log_error("failed to create event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+
+	if (WSAEventSelect(PQsocket(conn), network_event, FD_READ | FD_CLOSE) != 0)
+	{
+		pg_log_error("failed to set up event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+
+	events[0] = network_event;
+	nevents++;
+
+	if (stream->stop_event != NULL)
+	{
+		events[1] = stream->stop_event;
+		nevents++;
+	}
+
+	/* negative timeout_ms means wait without limit (INFINITE) */
+	if (timeout_ms < 0)
+		timeout_ms = INFINITE;
+
+	rc = WaitForMultipleObjects(nevents, events, FALSE, timeout_ms);
+
+	if (rc == WAIT_FAILED)
+	{
+		pg_log_error("WaitForMultipleObjects() failed: error code %lu",
+					 GetLastError());
+		exit(1);
+	}
+	else if (rc == WAIT_TIMEOUT)
+	{
+		/* timeout exceeded */
+		ret = 0;
+	}
+	else if (rc == WAIT_OBJECT_0)
+	{
+		/* FD_READ or FD_CLOSE on the connection socket */
+		ret = 1;
+	}
+	else if (rc == (WAIT_OBJECT_0 + 1))
+	{
+		Assert(stream->stop_event != NULL);
+		/* stop_event was signaled; treat like stop_socket input */
+		ret = 0;
+	}
+	else
+	{
+		pg_log_error("unexpected return from WaitForMultipleObjects(): %lu", rc);
+		exit(1);
+	}
+
+	/* reset event association for libpq socket, clean up event */
+	WSAEventSelect(PQsocket(conn), NULL, 0);
+	WSACloseEvent(network_event);
+
+	return ret;
+#endif /* WIN32 */
 }
 
 /*
@@ -939,7 +1011,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
  * -1 on error. -2 if the server ended the COPY.
  */
 static int
-CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 				  char **buffer)
 {
 	char	   *copybuf = NULL;
@@ -960,7 +1032,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
 		 * the specified timeout, so that we can ping the server.  Also stop
 		 * waiting if input appears on stop_socket.
 		 */
-		ret = CopyStreamPoll(conn, timeout, stop_socket);
+		ret = CopyStreamPoll(conn, timeout, stream);
 		if (ret <= 0)
 			return ret;
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 050d4bc69fd..2a643e71a74 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -40,8 +40,12 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
+#ifndef WIN32
 	pgsocket	stop_socket;	/* if valid, watch for input on this socket
 								 * and check stream_stop() when there is any */
+#else
+	HANDLE		stop_event;		/* if not NULL, also wait on this event */
+#endif
 
 	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
-- 
2.34.0

Reply via email to