Hi, I see scope for the following improvements to pg_receivewal.c:
1) Fetch the server system identifier in the first RunIdentifySystem call and use it to detect (via pg_receivewal's ReceiveXlogStream) any unexpected changes that may happen on the server while pg_receivewal is connected to it. This can be helpful when pg_receivewal tries to reconnect to the server (see the code around pg_usleep with RECONNECT_SLEEP_TIME) but something unexpected has happened on the server that changed its system identifier. Once pg_receivewal re-establishes the connection to the server, ReceiveXlogStream already has code that compares the server's system identifier with the one we received on the initial connection. 2) Call RunIdentifySystem to obtain the timeline ID and start LSN from the server only if pg_receivewal fails to get them from FindStreamingStart. This way, an extra IDENTIFY_SYSTEM command is avoided. 3) Place the "replication connection shouldn't have any database name associated" error check right after RunIdentifySystem, so that we avoid fetching the WAL segment size with RetrieveWalSegSize when we are going to fail with that error anyway. This change is similar to what pg_recvlogical.c does. 4) Move RetrieveWalSegSize to just before pg_receivewal.c enters the main loop that fetches WAL from the server. This avoids an unnecessary query when pg_receivewal is run with "--create-slot" or "--drop-slot". 5) Adding an assertion after pg_receivewal has done a good amount of work to find the start timeline and LSN might be helpful: Assert(stream.timeline != 0 && stream.startpos != InvalidXLogRecPtr); Attaching a patch that takes care of the above improvements. Thoughts? Regards, Bharath Rupireddy.
From 02edb7366a48ebc562d3d0f2ef4865ee704a84d5 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Mon, 30 Aug 2021 07:19:03 +0000 Subject: [PATCH v1] improve pg_receivewal code This patch makes the following improvements to pg_receivewal.c: 1) Fetch the server system identifier in the first RunIdentifySystem call and use it to detect (via pg_receivewal's ReceiveXlogStream) any unexpected changes that may happen on the server while pg_receivewal is connected to it. This can be helpful when pg_receivewal tries to reconnect to the server (see the code around pg_usleep with RECONNECT_SLEEP_TIME) but something unexpected has happened on the server that changed its system identifier. Once pg_receivewal re-establishes the connection to the server, ReceiveXlogStream already has code that compares the server's system identifier with the one we received on the initial connection. 2) Call RunIdentifySystem to obtain the timeline ID and start LSN from the server only if pg_receivewal fails to get them from FindStreamingStart. This way, an extra IDENTIFY_SYSTEM command is avoided. 3) Place the "replication connection shouldn't have any database name associated" error check right after RunIdentifySystem, so that we avoid fetching the WAL segment size with RetrieveWalSegSize when we are going to fail with that error anyway. This change is similar to what pg_recvlogical.c does. 4) Move RetrieveWalSegSize to just before pg_receivewal.c enters the main loop that fetches WAL from the server. This avoids an unnecessary query when pg_receivewal is run with "--create-slot" or "--drop-slot". 
5) Have an assertion after the pg_receivewal done a good amount of work to find start timeline and LSN might be helpful: Assert(stream.timeline != 0 && stream.startpos != InvalidXLogRecPtr); --- src/bin/pg_basebackup/pg_receivewal.c | 53 +++++++++++++-------------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 4474273daf..cffdf29d9e 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -46,7 +46,7 @@ static bool do_sync = true; static bool synchronous = false; static char *replication_slot = NULL; static XLogRecPtr endpos = InvalidXLogRecPtr; - +static char *server_sysid = NULL; static void usage(void); static DIR *get_destination_dir(char *dest_folder); @@ -369,8 +369,6 @@ FindStreamingStart(uint32 *tli) static void StreamLog(void) { - XLogRecPtr serverpos; - TimeLineID servertli; StreamCtl stream; MemSet(&stream, 0, sizeof(stream)); @@ -394,24 +392,22 @@ StreamLog(void) exit(1); } - /* - * Identify server, obtaining start LSN position and current timeline ID - * at the same time, necessary if not valid data can be found in the - * existing output directory. - */ - if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL)) - exit(1); - /* * Figure out where to start streaming. */ stream.startpos = FindStreamingStart(&stream.timeline); if (stream.startpos == InvalidXLogRecPtr) { - stream.startpos = serverpos; - stream.timeline = servertli; + /* + * No valid data can be found in the existing output directory. + * Get start LSN position and current timeline ID from the server. 
+ */ + if (!RunIdentifySystem(conn, NULL, &stream.timeline, &stream.startpos, NULL)) + exit(1); } + Assert(stream.timeline != 0 && stream.startpos != InvalidXLogRecPtr); + /* * Always start streaming at the beginning of a segment */ @@ -435,6 +431,7 @@ StreamLog(void) stream.do_sync); stream.partial_suffix = ".partial"; stream.replication_slot = replication_slot; + stream.sysidentifier = server_sysid; ReceiveXlogStream(conn, &stream); @@ -686,21 +683,7 @@ main(int argc, char **argv) * replication connection and haven't connected using a database specific * connection. */ - if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name)) - exit(1); - - /* - * Set umask so that directories/files are created with the same - * permissions as directories/files in the source data directory. - * - * pg_mode_mask is set to owner-only by default and then updated in - * GetConnection() where we get the mode from the server-side with - * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm(). - */ - umask(pg_mode_mask); - - /* determine remote server's xlog segment size */ - if (!RetrieveWalSegSize(conn)) + if (!RunIdentifySystem(conn, &server_sysid, NULL, NULL, &db_name)) exit(1); /* @@ -714,6 +697,16 @@ main(int argc, char **argv) exit(1); } + /* + * Set umask so that directories/files are created with the same + * permissions as directories/files in the source data directory. + * + * pg_mode_mask is set to owner-only by default and then updated in + * GetConnection() where we get the mode from the server-side with + * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm(). + */ + umask(pg_mode_mask); + /* * Drop a replication slot. */ @@ -739,6 +732,10 @@ main(int argc, char **argv) exit(0); } + /* determine remote server's xlog segment size */ + if (!RetrieveWalSegSize(conn)) + exit(1); + /* * Don't close the connection here so that subsequent StreamLog() can * reuse it. -- 2.25.1