On Mon, Jul 10, 2023 at 01:44:45PM +0900, Michael Paquier wrote:
> As StreamLogicalLog() states, once it leaves its main loop because
> time_to_abort has been switched to true, we want a clean exit.  I
> think that this patch is just a more complicated way to avoid doing
> the operations done by prepareToTerminate() twice.  So how about
> moving the prepareToTerminate() call outside the main streaming loop
> and calling it when time_to_abort is true?  Then, I would suggest
> changing the keepalive argument of prepareToTerminate() to an enum
> able to handle three values to log the reason why the tool is
> stopping: the end of WAL, an interruption, or a keepalive when
> logging.  There are two of them now, but we want a third mode for the
> signals.

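It took me some time to come back to this one, but attached is what I
had in mind.  The streaming loop now has three reasons to stop:
keepalive, end LSN, or signal.  It records which one applies, and
prepareToTerminate() is called once after the loop, which makes the
code easier to follow.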
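A stripped-down, standalone sketch of the idea, for illustration only.
The names `Lsn`, `Message`, `stream()`, `terminate_stream()` and the
extra `STREAM_STOP_NONE` value are hypothetical stand-ins for the libpq
plumbing, not part of the patch.  The loop only records why it stops,
and the cleanup runs once afterwards.  It also simplifies the real
checks, folding the "> endpos" and "== endpos" record cases into one
">=" test.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for XLogRecPtr; 0 plays InvalidXLogRecPtr. */
typedef uint64_t Lsn;
#define INVALID_LSN ((Lsn) 0)

typedef enum
{
	STREAM_STOP_NONE,			/* hypothetical: loop ended without abort */
	STREAM_STOP_END_OF_WAL,
	STREAM_STOP_KEEPALIVE,
	STREAM_STOP_SIGNAL
} StreamStopReason;

/* Hypothetical message: either a keepalive or a WAL record at an LSN. */
typedef struct
{
	bool		keepalive;
	Lsn			lsn;
} Message;

/* Hypothetical single exit point, modeled on prepareToTerminate(). */
static void
terminate_stream(StreamStopReason reason, Lsn endpos, Lsn lsn)
{
	switch (reason)
	{
		case STREAM_STOP_SIGNAL:
			printf("stopped on signal, last record at %llu\n",
				   (unsigned long long) lsn);
			break;
		case STREAM_STOP_KEEPALIVE:
			printf("end position %llu reached by keepalive\n",
				   (unsigned long long) endpos);
			break;
		case STREAM_STOP_END_OF_WAL:
			assert(lsn != INVALID_LSN);
			printf("end position %llu reached by WAL record at %llu\n",
				   (unsigned long long) endpos, (unsigned long long) lsn);
			break;
		case STREAM_STOP_NONE:
			break;
	}
}

/*
 * Hypothetical streaming loop.  signal_at simulates a signal arriving
 * before message number signal_at (-1 means no signal).
 */
static StreamStopReason
stream(const Message *msgs, int n, Lsn endpos, int signal_at, Lsn *stop_lsn)
{
	bool		time_to_abort = false;
	StreamStopReason stop_reason = STREAM_STOP_SIGNAL;	/* default, as in the patch */

	*stop_lsn = INVALID_LSN;

	for (int i = 0; i < n && !time_to_abort; i++)
	{
		if (i == signal_at)
		{
			time_to_abort = true;	/* what the signal handler does */
			break;
		}
		if (msgs[i].keepalive)
		{
			if (endpos != INVALID_LSN && msgs[i].lsn >= endpos)
			{
				stop_reason = STREAM_STOP_KEEPALIVE;
				time_to_abort = true;
			}
			continue;
		}
		*stop_lsn = msgs[i].lsn;	/* remember the last record seen */
		if (endpos != INVALID_LSN && msgs[i].lsn >= endpos)
		{
			stop_reason = STREAM_STOP_END_OF_WAL;
			time_to_abort = true;
		}
	}

	if (!time_to_abort)
		return STREAM_STOP_NONE;

	/* One cleanup call, whatever the reason. */
	terminate_stream(stop_reason, endpos, *stop_lsn);
	return stop_reason;
}
```

With records at 100 and 200, a keepalive at 250 and a record at 300,
an endpos of 200 stops on the WAL record, an endpos of 250 stops on the
keepalive, and a simulated signal before the third message falls back
to the default STREAM_STOP_SIGNAL with 200 as the last LSN seen.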

Thoughts or comments?
--
Michael
From efb4f15d7b1f3f97da370cf4b7db7562eec08cb3 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 19 Jul 2023 11:29:02 +0900
Subject: [PATCH v5] Fix pg_recvlogical error message upon SIGINT/SIGTERM

When pg_recvlogical needs to abort on a signal like SIGINT/SIGTERM, it
is expected to exit cleanly.  However, the code forgot to clean up the
state of the connection (ending the COPY stream) before leaving.  This
would cause the tool to emit errors like "unexpected termination of
replication stream", which is meant for a really unexpected termination
or a crash.

The code is refactored so that the same termination operations are
applied for the signal, end LSN and keepalive cases.

Reported-by: Andres Freund
Author: Bharath Rupireddy
Reviewed-by: Kyotaro Horiguchi, Andres Freund, Cary Huang
Discussion: https://www.postgresql.org/message-id/20221019213953.htdtzikf4f45ywil%40awork3.anarazel.de
---
 src/bin/pg_basebackup/pg_recvlogical.c | 52 ++++++++++++++++++++------
 1 file changed, 41 insertions(+), 11 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index f3c7937a1d..3bd83deee7 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -32,6 +32,13 @@
 /* Time to sleep between reconnection attempts */
 #define RECONNECT_SLEEP_TIME 5
 
+typedef enum
+{
+	STREAM_STOP_END_OF_WAL,
+	STREAM_STOP_KEEPALIVE,
+	STREAM_STOP_SIGNAL
+}			StreamStopReason;
+
 /* Global Options */
 static char *outfile = NULL;
 static int	verbose = 0;
@@ -66,7 +73,7 @@ static void usage(void);
 static void StreamLogicalLog(void);
 static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
 static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
-							   bool keepalive, XLogRecPtr lsn);
+							   StreamStopReason reason, XLogRecPtr lsn);
 
 static void
 usage(void)
@@ -207,6 +214,8 @@ StreamLogicalLog(void)
 	TimestampTz last_status = -1;
 	int			i;
 	PQExpBuffer query;
+	XLogRecPtr	stop_lsn = InvalidXLogRecPtr;
+	StreamStopReason stop_reason = STREAM_STOP_SIGNAL;
 
 	output_written_lsn = InvalidXLogRecPtr;
 	output_fsync_lsn = InvalidXLogRecPtr;
@@ -487,7 +496,7 @@ StreamLogicalLog(void)
 
 			if (endposReached)
 			{
-				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+				stop_reason = STREAM_STOP_KEEPALIVE;
 				time_to_abort = true;
 				break;
 			}
@@ -519,6 +528,12 @@ StreamLogicalLog(void)
 		/* Extract WAL location for this block */
 		cur_record_lsn = fe_recvint64(&copybuf[1]);
 
+		/*
+		 * Remember this record's LSN so that, if the loop is aborted (on a
+		 * signal, for example), prepareToTerminate() logs the right position.
+		 */
+		stop_lsn = cur_record_lsn;
+
 		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
 		{
 			/*
@@ -527,7 +542,7 @@ StreamLogicalLog(void)
 			 */
 			if (!flushAndSendFeedback(conn, &now))
 				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			stop_reason = STREAM_STOP_END_OF_WAL;
 			time_to_abort = true;
 			break;
 		}
@@ -572,12 +587,16 @@ StreamLogicalLog(void)
 			/* endpos was exactly the record we just processed, we're done */
 			if (!flushAndSendFeedback(conn, &now))
 				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			stop_reason = STREAM_STOP_END_OF_WAL;
 			time_to_abort = true;
 			break;
 		}
 	}
 
+	/* Clean up connection state if stream has been aborted */
+	if (time_to_abort)
+		prepareToTerminate(conn, endpos, stop_reason, stop_lsn);
+
 	res = PQgetResult(conn);
 	if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -1021,18 +1040,29 @@ flushAndSendFeedback(PGconn *conn, TimestampTz *now)
  * retry on failure.
  */
 static void
-prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
+				   XLogRecPtr lsn)
 {
 	(void) PQputCopyEnd(conn, NULL);
 	(void) PQflush(conn);
 
 	if (verbose)
 	{
-		if (keepalive)
-			pg_log_info("end position %X/%X reached by keepalive",
-						LSN_FORMAT_ARGS(endpos));
-		else
-			pg_log_info("end position %X/%X reached by WAL record at %X/%X",
-						LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+		switch (reason)
+		{
+			case STREAM_STOP_SIGNAL:
+				pg_log_info("end position %X/%X reached on signal",
+							LSN_FORMAT_ARGS(lsn));
+				break;
+			case STREAM_STOP_KEEPALIVE:
+				pg_log_info("end position %X/%X reached by keepalive",
+							LSN_FORMAT_ARGS(endpos));
+				break;
+			case STREAM_STOP_END_OF_WAL:
+				Assert(!XLogRecPtrIsInvalid(lsn));
+				pg_log_info("end position %X/%X reached by WAL record at %X/%X",
+							LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+				break;
+		}
 	}
 }
-- 
2.40.1

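The signal case described in the commit message follows the usual flag
pattern: the handler only sets a flag, the loop notices it, and cleanup
happens once after the loop.  A minimal standalone sketch, assuming a
handler that, like the one setting time_to_abort in pg_recvlogical.c,
does nothing but set a volatile sig_atomic_t.  receive_loop() and
clean_up_connection() are hypothetical stand-ins, and plain signal() is
used for brevity.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* Standalone stand-in for pg_recvlogical's time_to_abort flag. */
static volatile sig_atomic_t time_to_abort = false;

static void
sigexit_handler(int signo)
{
	(void) signo;
	time_to_abort = true;		/* only async-signal-safe work here */
}

/* Hypothetical counter standing in for prepareToTerminate()'s cleanup. */
static int	cleanup_calls = 0;

static void
clean_up_connection(void)
{
	assert(time_to_abort);		/* only reached when aborting */
	cleanup_calls++;
}

/*
 * Hypothetical receive loop: raise SIGINT at iteration raise_after
 * (-1 for never) to simulate Ctrl-C.  Returns iterations executed.
 */
static int
receive_loop(int max_iterations, int raise_after)
{
	int			i = 0;

	time_to_abort = false;
	signal(SIGINT, sigexit_handler);

	while (!time_to_abort && i < max_iterations)
	{
		if (i == raise_after)
			raise(SIGINT);		/* handler runs before raise() returns */
		i++;
	}

	/* Single cleanup point, as with the check added after the loop. */
	if (time_to_abort)
		clean_up_connection();

	return i;
}
```

Calling receive_loop(10, 3) stops after four iterations with exactly one
cleanup call, while receive_loop(5, -1) runs to completion without any
cleanup, mirroring the `if (time_to_abort)` check placed after the loop.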