At Thu, 17 May 2018 13:54:07 +0300, Arseny Sher <[email protected]> wrote
in <87in7md034.fsf@ars-thinkpad>
>
> Konstantin Knizhnik <[email protected]> writes:
>
> > I think that using restart_lsn instead of confirmed_flush is not right
> > approach.
> > If restart_lsn is not available and confirmed_flush is pointing to page
> > boundary, then in any case we should somehow handle this case and adjust
> > startlsn to point on the valid record position (by just adding page header
> > size?).
>
> Well, restart_lsn is always available on live slot: it is initially set
> in ReplicationSlotReserveWal during slot creation.

restart_lsn stays at the beginning of a transaction until the
transaction ends, so just using restart_lsn allows repeated
decoding of a transaction; in short, rewinding occurs. The
function works only on an inactive slot, so the current code works
fine on this point. In addition, restart_lsn can also be on a
page boundary.

We can see the problem easily.

1. Create a logical replication slot at the current LSN.
select pg_create_logical_replication_slot('s1', 'pgoutput');
2. Advance the LSN by two or three pages by doing anything.
3. Advance the slot to a page boundary.
e.g. select pg_replication_slot_advance('s1', '0/9624000');
4. Advance the slot further, and it crashes.
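
To illustrate step 3, here is a minimal standalone sketch (not part
of the attached patch) that checks whether an LSN written in the
usual "hi/lo" hex form falls on a WAL page boundary. It assumes the
default WAL block size of 8192 bytes; SKETCH_XLOG_BLCKSZ,
sketch_parse_lsn and sketch_on_page_boundary are hypothetical names
made up for this example, not PostgreSQL functions.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Assumption: the default WAL block size (XLOG_BLCKSZ) of 8192 bytes. */
#define SKETCH_XLOG_BLCKSZ 8192u

/*
 * Hypothetical helper: parse an LSN given as "hi/lo" in hex,
 * e.g. "0/9624000", into a 64-bit byte position.
 * Returns false if the text does not have that form.
 */
static bool
sketch_parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;

    if (sscanf(text, "%X/%X", &hi, &lo) != 2)
        return false;

    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/*
 * Hypothetical helper: an LSN is on a page boundary when it is an
 * exact multiple of the WAL block size.  No record starts there,
 * because every WAL page begins with a page header.
 */
static bool
sketch_on_page_boundary(uint64_t lsn)
{
    return lsn % SKETCH_XLOG_BLCKSZ == 0;
}
```

With these helpers, '0/9624000' from step 3 parses to 0x9624000,
which is a multiple of 8192, so sketch_on_page_boundary() returns
true for it.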

So directly setting ctx->reader->EndRecPtr to startlsn fixes the
problem, but I found another problem here.

The function accepts any LSN even if it is not at the beginning
of a record. Such values can cause errors, crashes, infinite
waits, or other kinds of trouble. The moved LSN must
always be at the "end of a record" (that is, at the start of the
next record). The attached patch also fixes this.

The documentation doesn't seem to need a fix.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..d3cb777f9f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,6 +318,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
/*
* Helper function for advancing physical replication slot forward.
+ *
+ * This function accepts arbitrary LSN even if the LSN is not at the beginning
+ * of a record. This can lead to any kind of misbehavior but currently the
+ * value is used only to determine up to what wal segment to keep and
+ * successive implicit advancing fixes the state.
*/
static XLogRecPtr
pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
@@ -344,6 +349,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
XLogRecPtr retlsn = InvalidXLogRecPtr;
+ XLogRecPtr upto;
PG_TRY();
{
@@ -354,6 +360,13 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
logical_read_local_xlog_page,
NULL, NULL, NULL);
+ /*
+ * startlsn can be on page boundary but it is not accepted as explicit
+ * parameter to XLogReadRecord. Set it in reader context.
+ */
+ Assert(startlsn != InvalidXLogRecPtr);
+ upto = ctx->reader->EndRecPtr = startlsn;
+
CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
"logical decoding");
@@ -361,22 +374,18 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
InvalidateSystemCaches();
/* Decode until we run out of records */
- while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
- (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+ while (ctx->reader->EndRecPtr <= moveto)
{
XLogRecord *record;
char *errm = NULL;
+
+ /* ctx->reader->EndRecPtr cannot go backward here */
+ upto = ctx->reader->EndRecPtr;
- record = XLogReadRecord(ctx->reader, startlsn, &errm);
+ record = XLogReadRecord(ctx->reader, InvalidXLogRecPtr, &errm);
if (errm)
elog(ERROR, "%s", errm);
- /*
- * Now that we've set up the xlog reader state, subsequent calls
- * pass InvalidXLogRecPtr to say "continue from last record"
- */
- startlsn = InvalidXLogRecPtr;
-
/*
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
* store the description into our tuplestore.
@@ -384,18 +393,14 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
if (record != NULL)
LogicalDecodingProcessRecord(ctx, ctx->reader);
- /* check limits */
- if (moveto <= ctx->reader->EndRecPtr)
- break;
-
CHECK_FOR_INTERRUPTS();
}
CurrentResourceOwner = old_resowner;
- if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+ if (startlsn != upto)
{
- LogicalConfirmReceivedLocation(moveto);
+ LogicalConfirmReceivedLocation(upto);
/*
* If only the confirmed_flush_lsn has changed the slot won't get