At Thu, 20 Oct 2022 13:28:45 +0530, Bharath Rupireddy
<[email protected]> wrote in
> On Thu, Oct 20, 2022 at 3:10 AM Andres Freund <[email protected]> wrote:
> >
> > Hi,
> >
> > While reviewing
> > https://postgr.es/m/CAD21AoBe2o2D%3Dxyycsxw2bQOD%3DzPj7ETuJ5VYGN%3DdpoTiCMRJQ%40mail.gmail.com
> > I noticed that pg_recvlogical prints
> > "pg_recvlogical: error: unexpected termination of replication stream: "
> >
> > when signalled with SIGINT/TERM.
> >
> > Oddly enough, that looks to have "always" been the case, even though clearly
> > the code tried to make provisions for a different outcome.
> >
> >
> > It looks to me like all that's needed is to gate the block printing the
> > message with an !time_to_abort.
+1
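Below is a minimal, self-contained sketch of the gating suggested above. It is a simplified model, not the pg_recvlogical code. The streaming loop is reduced to a counter. Plain signal() and raise() stand in for pqsignal() and a real Ctrl-C. stream_records() is a hypothetical helper. When the loop ends because the handler set time_to_abort, the "unexpected termination" error is suppressed. When the loop ends on its own, which models the server closing the stream, the error is still reported.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>

/* Set from the signal handler; sig_atomic_t is the type C guarantees
 * can be written safely from an asynchronous handler. */
static volatile sig_atomic_t time_to_abort = false;

static void sigexit_handler(int signo)
{
    (void) signo;
    time_to_abort = true;
}

/*
 * Hypothetical stand-in for the streaming loop: "receives" n_records and
 * simulates a SIGINT just before record number interrupt_after (use -1 for
 * no interrupt).  Returns true if the "unexpected termination" error was
 * printed.
 */
static bool stream_records(int n_records, int interrupt_after)
{
    time_to_abort = false;
    signal(SIGINT, sigexit_handler);

    for (int i = 0; i < n_records; i++)
    {
        if (i == interrupt_after)
            raise(SIGINT);      /* handler runs before raise() returns */
        if (time_to_abort)
            break;              /* user asked us to stop */
    }

    /* Gate the error: only complain if we did not ask for the stream to end. */
    if (!time_to_abort)
    {
        fprintf(stderr, "error: unexpected termination of replication stream\n");
        return true;
    }
    return false;
}
```

With this sketch, stream_records(10, 3) returns false (interrupted, no error). stream_records(10, -1) returns true.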
> +1. How about emitting a message like its friend pg_receivewal, like
> the attached patch?
I'm not a fan of treating SIGINT as an error in this case. The code
already calls prepareToTerminate() on the paths that set time_to_abort,
and everything proceeds normally after that. So I think we should do
the same thing after receiving an interrupt. That also syncs the output
file naturally as part of the normal shutdown. I'm also not a fan of
doing an fsync on error.
> > I also then noticed that we don't fsync the output file in cases of errors -
> > that seems wrong to me? Looks to me like that block should be moved till
> > after the error:?
>
> How about something like the attached patch?
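The fsync question comes down to where the sync sits relative to the error: label. Assume the normal path falls through into that label. A sync placed before the label then runs only on success. A sync placed after it also runs when the code arrives via goto error. The hypothetical sketch below contrasts the two placements. finish_stream() and its fsync_after_label switch exist only for illustration.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <unistd.h>

/*
 * Hypothetical: returns how many successful fsync() calls were made on fd.
 * stream_failed models reaching "goto error"; fsync_after_label selects
 * which of the two placements is active.
 */
static int finish_stream(int fd, bool stream_failed, bool fsync_after_label)
{
    int fsyncs = 0;

    if (stream_failed)
        goto error;             /* e.g. unexpected termination of the stream */

    /* Placement before the label: reached only on the normal path. */
    if (!fsync_after_label && fsync(fd) == 0)
        fsyncs++;

error:
    /* Placement after the label: reached by fall-through and by goto error. */
    if (fsync_after_label && fsync(fd) == 0)
        fsyncs++;

    return fsyncs;
}
```

With the sync before the label, a failed stream performs no fsync. With the sync after the label, both paths sync once. Which of these is preferable is the point under discussion.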
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 5f2e6af445..e33c204df0 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -55,6 +55,7 @@ static const char *plugin = "test_decoding";
/* Global State */
static int outfd = -1;
static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t interrupted = false;
static volatile sig_atomic_t output_reopen = false;
static bool output_isfile;
static TimestampTz output_last_fsync = -1;
@@ -206,6 +207,7 @@ StreamLogicalLog(void)
char *copybuf = NULL;
TimestampTz last_status = -1;
int i;
+ XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
PQExpBuffer query;
output_written_lsn = InvalidXLogRecPtr;
@@ -275,7 +277,6 @@ StreamLogicalLog(void)
int bytes_written;
TimestampTz now;
int hdr_len;
- XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
if (copybuf != NULL)
{
@@ -487,7 +488,7 @@ StreamLogicalLog(void)
if (endposReached)
{
- prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+ cur_record_lsn = InvalidXLogRecPtr;
time_to_abort = true;
break;
}
@@ -527,7 +528,6 @@ StreamLogicalLog(void)
*/
if (!flushAndSendFeedback(conn, &now))
goto error;
- prepareToTerminate(conn, endpos, false, cur_record_lsn);
time_to_abort = true;
break;
}
@@ -572,12 +572,14 @@ StreamLogicalLog(void)
/* endpos was exactly the record we just processed, we're done */
if (!flushAndSendFeedback(conn, &now))
goto error;
- prepareToTerminate(conn, endpos, false, cur_record_lsn);
time_to_abort = true;
break;
}
}
+ if (time_to_abort)
+ prepareToTerminate(conn, endpos, false, cur_record_lsn);
+
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_COPY_OUT)
{
@@ -657,6 +659,7 @@ static void
sigexit_handler(SIGNAL_ARGS)
{
time_to_abort = true;
+ interrupted = true;
}
/*
@@ -1031,6 +1034,8 @@ prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr l
if (keepalive)
pg_log_info("end position %X/%X reached by keepalive",
LSN_FORMAT_ARGS(endpos));
+ else if (interrupted)
+ pg_log_info("interrupted after %X/%X", LSN_FORMAT_ARGS(lsn));
else
pg_log_info("end position %X/%X reached by WAL record at %X/%X",
LSN_FORMAT_ARGS(endpos),
LSN_FORMAT_ARGS(lsn));
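For reference, here is a stand-alone sketch of the message selection in the patched prepareToTerminate(). It assumes that LSN_FORMAT_ARGS splits a 64-bit LSN into its high and low 32-bit halves for the "%X/%X" format, as PostgreSQL's macro does. termination_message() is hypothetical. It writes into a buffer instead of calling pg_log_info() so that the result can be checked.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;   /* local stand-in for PostgreSQL's type */

/* Mirrors PostgreSQL's macro: high and low 32 bits, printed as "%X/%X". */
#define LSN_FORMAT_ARGS(lsn) (unsigned int) ((lsn) >> 32), (unsigned int) (lsn)

/* Hypothetical: picks the same message as the patched prepareToTerminate(). */
static void termination_message(char *buf, size_t buflen, XLogRecPtr endpos,
                                bool keepalive, bool interrupted, XLogRecPtr lsn)
{
    if (keepalive)
        snprintf(buf, buflen, "end position %X/%X reached by keepalive",
                 LSN_FORMAT_ARGS(endpos));
    else if (interrupted)
        snprintf(buf, buflen, "interrupted after %X/%X",
                 LSN_FORMAT_ARGS(lsn));
    else
        snprintf(buf, buflen, "end position %X/%X reached by WAL record at %X/%X",
                 LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
}
```

In the hunks shown, the remaining call passes false for keepalive. The first branch is therefore not reached from these call sites.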