Hi,

On 2016-03-14 20:10:58 -0300, Alvaro Herrera wrote:
> diff --git a/src/backend/access/transam/xlogreader.c 
> b/src/backend/access/transam/xlogreader.c
> index fcb0872..7b60b8f 100644
> --- a/src/backend/access/transam/xlogreader.c
> +++ b/src/backend/access/transam/xlogreader.c
> @@ -10,9 +10,11 @@
>   *
>   * NOTES
>   *           See xlogreader.h for more notes on this facility.
> + *
> + *           This file is compiled as both front-end and backend code, so it
> + *           may not use ereport, server-defined static variables, etc.
>   *-------------------------------------------------------------------------
>   */
> -

Huh?

>  #include "postgres.h"
>  
>  #include "access/transam.h"
> @@ -116,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void 
> *private_data)
>               return NULL;
>       }
>  
> +#ifndef FRONTEND
> +     /* Will be loaded on first read */
> +     state->timelineHistory = NIL;
> +#endif
> +
>       return state;
>  }
>  
> @@ -135,6 +142,10 @@ XLogReaderFree(XLogReaderState *state)
>       pfree(state->errormsg_buf);
>       if (state->readRecordBuf)
>               pfree(state->readRecordBuf);
> +#ifndef FRONTEND
> +     if (state->timelineHistory)
> +             list_free_deep(state->timelineHistory);
> +#endif

Hm. So we don't support timelines following for frontend code, although
it'd be rather helpful for pg_xlogdump. And possibly pg_rewind.


>       pfree(state->readBuf);
>       pfree(state);
>  }
> @@ -208,10 +219,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr 
> RecPtr, char **errormsg)
>  
>       if (RecPtr == InvalidXLogRecPtr)
>       {
> +             /* No explicit start point; read the record after the one we 
> just read */
>               RecPtr = state->EndRecPtr;
>  
>               if (state->ReadRecPtr == InvalidXLogRecPtr)
> -                     randAccess = true;
> +                     randAccess = true;      /* allow readPageTLI to go 
> backwards */

randAccess is doing more than that, so I'm doubtful that comment is an
improvment.


>               /*
>                * RecPtr is pointing to end+1 of the previous WAL record.  If 
> we're
> @@ -223,6 +235,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, 
> char **errormsg)
>       else
>       {
>               /*
> +              * Caller supplied a position to start at.
> +              *
>                * In this case, the passed-in record pointer should already be
>                * pointing to a valid record starting position.
>                */
> @@ -309,8 +323,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr 
> RecPtr, char **errormsg)
>               /* XXX: more validation should be done here */
>               if (total_len < SizeOfXLogRecord)
>               {
> -                     report_invalid_record(state, "invalid record length at 
> %X/%X",
> -                                                               (uint32) 
> (RecPtr >> 32), (uint32) RecPtr);
> +                     report_invalid_record(state,
> +                                             "invalid record length at 
> %X/%X: wanted %lu, got %u",
> +                                                               (uint32) 
> (RecPtr >> 32), (uint32) RecPtr,
> +                                                               
> SizeOfXLogRecord, total_len);
>                       goto err;
>               }
>               gotheader = false;
> @@ -466,9 +482,7 @@ err:
>        * Invalidate the xlog page we've cached. We might read from a different
>        * source after failure.
>        */
> -     state->readSegNo = 0;
> -     state->readOff = 0;
> -     state->readLen = 0;
> +     XLogReaderInvalCache(state);

I don't think that "cache" is the right way to describe this.


>  #include <unistd.h>
>  
> -#include "miscadmin.h"
> -

spurious change imo.



>  /*
> - * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
> - * we currently don't have the infrastructure (elog!) to share it.
> + * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
> + * in timeline 'tli'.
> + *
> + * Will open, and keep open, one WAL segment stored in the static file
> + * descriptor 'sendFile'. This means if XLogRead is used once, there will
> + * always be one descriptor left open until the process ends, but never
> + * more than one.
> + *
> + * XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead
> + * in walsender.c but for small differences (such as lack of elog() in
> + * frontend).  Probably these should be merged at some point.
>   */
>  static void
>  XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
> @@ -648,8 +657,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, 
> Size count)
>       XLogRecPtr      recptr;
>       Size            nbytes;
>  
> +     /*
> +      * Cached state across calls.
> +      */

One line?


>       static int      sendFile = -1;
>       static XLogSegNo sendSegNo = 0;
> +     static TimeLineID sendTLI = 0;
>       static uint32 sendOff = 0;
>  
>       p = buf;
> @@ -664,11 +677,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr 
> startptr, Size count)
>  
>               startoff = recptr % XLogSegSize;
>  
> -             if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
> +             /* Do we need to open a new xlog segment? */
> +             if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
> +                     sendTLI != tli)
>               {

s/open a new/open a different/?  New imo has connotations that we don't
really want here.


>                       char            path[MAXPGPATH];
>  
> -                     /* Switch to another logfile segment */
>                       if (sendFile >= 0)
>                               close(sendFile);

E.g. you could just have moved the above comment.


>               /* Need to seek in the file? */
>               if (sendOff != startoff)
>               {
>                       if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
> -                     {
> -                             char            path[MAXPGPATH];
> -
> -                             XLogFilePath(path, tli, sendSegNo);
> -
>                               ereport(ERROR,
>                                               (errcode_for_file_access(),
>                                 errmsg("could not seek in log segment %s to 
> offset %u: %m",
> -                                              path, startoff)));
> -                     }
> +                                              XLogFileNameP(tli, sendSegNo), 
> startoff)));
>                       sendOff = startoff;
>               }

Not a serious issue, more a general remark: I'm doubtful that going for
palloc in error situations is good practice. This will be allocated in
the current memory context; without access to the emergency error
reserves.


I'm also getting the feeling that the patch is bordering on doing some
relatively random cleanups mixed in with architectural changes. Makes
things a bit harder to review.


> +static void
> +XLogReadDetermineTimeline(XLogReaderState *state)
> +{
> +     /* Read the history on first time through */
> +     if (state->timelineHistory == NIL)
> +             state->timelineHistory = readTimeLineHistory(ThisTimeLineID);
> +
> +     /*
> +      * Are we reading the record immediately following the one we read last
> +      * time?  If not, then don't use the cached timeline info.
> +      */
> +     if (state->currRecPtr != state->EndRecPtr)
> +     {
> +             state->currTLI = 0;
> +             state->currTLIValidUntil = InvalidXLogRecPtr;
> +     }


Hm. So we grow essentially a second version of the last end position and
the randAccess stuff in XLogReadRecord().


> +     if (state->currTLI == 0)
> +     {
> +             /*
> +              * Something changed; work out what timeline this record is on. 
> We
> +              * might read it from the segment on this TLI or, if the 
> segment is
> +              * also contained by newer timelines, the copy from a newer TLI.
> +              */
> +             state->currTLI = tliOfPointInHistory(state->currRecPtr,
> +                                                                             
>          state->timelineHistory);
> +
> +             /*
> +              * Look for the most recent timeline that's on the same xlog 
> segment
> +              * as this record, since that's the only one we can assume is 
> still
> +              * readable.
> +              */
> +             while (state->currTLI != ThisTimeLineID &&
> +                        state->currTLIValidUntil == InvalidXLogRecPtr)
> +             {
> +                     XLogRecPtr      tliSwitch;
> +                     TimeLineID      nextTLI;
> +
> +                     tliSwitch = tliSwitchPoint(state->currTLI, 
> state->timelineHistory,
> +                                                                        
> &nextTLI);
> +
> +                     /* round ValidUntil down to start of seg containing the 
> switch */
> +                     state->currTLIValidUntil =
> +                             ((tliSwitch / XLogSegSize) * XLogSegSize);
> +
> +                     if (state->currRecPtr >= state->currTLIValidUntil)
> +                     {
> +                             /*
> +                              * The new currTLI ends on this WAL segment so 
> check the next
> +                              * TLI to see if it's the last one on the 
> segment.
> +                              *
> +                              * If that's the current TLI we'll stop 
> searching.

I don't really understand how we're stopping searching here?

> +                              */
> +                             state->currTLI = nextTLI;
> +                             state->currTLIValidUntil = InvalidXLogRecPtr;
> +                     }
> +             }
> +}


XLogReadDetermineTimeline() doesn't sit quite right with me, I do wonder
whether there's not a simpler way to write this.


> +/*
> + * XLogPageReadCB callback for reading local xlog files
>   *
>   * Public because it would likely be very helpful for someone writing another
>   * output method outside walsender, e.g. in a bgworker.
>   *
> - * TODO: The walsender has it's own version of this, but it relies on the
> + * TODO: The walsender has its own version of this, but it relies on the
>   * walsender's latch being set whenever WAL is flushed. No such 
> infrastructure
>   * exists for normal backends, so we have to do a check/sleep/repeat style of
>   * loop for now.
> @@ -754,46 +897,88 @@ int
>  read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
>       int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID 
> *pageTLI)
>  {
> -     XLogRecPtr      flushptr,
> +     XLogRecPtr      read_upto,
>                               loc;
>       int                     count;
>  
>       loc = targetPagePtr + reqLen;
> +
> +     /* Make sure enough xlog is available... */
>       while (1)
>       {
>               /*
> -              * TODO: we're going to have to do something more intelligent 
> about
> -              * timelines on standbys. Use readTimeLineHistory() and
> -              * tliOfPointInHistory() to get the proper LSN? For now we'll 
> catch
> -              * that case earlier, but the code and TODO is left in here for 
> when
> -              * that changes.
> +              * Check which timeline to get the record from.
> +              *
> +              * We have to do it each time through the loop because if we're 
> in
> +              * recovery as a cascading standby, the current timeline 
> might've
> +              * become historical.
>                */
> -             if (!RecoveryInProgress())
> +             XLogReadDetermineTimeline(state);
> +
> +             if (state->currTLI == ThisTimeLineID)
>               {
> -                     *pageTLI = ThisTimeLineID;
> -                     flushptr = GetFlushRecPtr();
> +                     /*
> +                      * We're reading from the current timeline so we might 
> have to
> +                      * wait for the desired record to be generated (or, for 
> a standby,
> +                      * received & replayed)
> +                      */
> +                     if (!RecoveryInProgress())
> +                     {
> +                             *pageTLI = ThisTimeLineID;
> +                             read_upto = GetFlushRecPtr();
> +                     }
> +                     else
> +                             read_upto = GetXLogReplayRecPtr(pageTLI);
> +
> +                     if (loc <= read_upto)
> +                             break;
> +
> +                     CHECK_FOR_INTERRUPTS();
> +                     pg_usleep(1000L);
>               }
>               else
> -                     flushptr = GetXLogReplayRecPtr(pageTLI);
> +             {
> +                     /*
> +                      * We're on a historical timeline, so limit reading to 
> the switch
> +                      * point where we moved to the next timeline.
> +                      */
> +                     read_upto = state->currTLIValidUntil;

Hm. Is it ok to not check GetFlushRecPtr/GetXLogReplayRecPtr() here? If
so, how come?

> -             if (loc <= flushptr)
> +                     /*
> +                      * Setting pageTLI to our wanted record's TLI is 
> slightly wrong;
> +                      * the page might begin on an older timeline if it 
> contains a
> +                      * timeline switch, since its xlog segment will have 
> been copied
> +                      * from the prior timeline. This is pretty harmless 
> though, as
> +                      * nothing cares so long as the timeline doesn't go 
> backwards.  We
> +                      * should read the page header instead; FIXME someday.
> +                      */
> +                     *pageTLI = state->currTLI;
> +
> +                     /* No need to wait on a historical timeline */
>                       break;
> -
> -             CHECK_FOR_INTERRUPTS();
> -             pg_usleep(1000L);
> +             }
>       }
>  
> -     /* more than one block available */
> -     if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
> +     if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
> +     {
> +             /*
> +              * more than one block available; read only that block, have 
> caller
> +              * come back if they need more.
> +              */
>               count = XLOG_BLCKSZ;
> -     /* not enough data there */
> -     else if (targetPagePtr + reqLen > flushptr)
> +     }
> +     else if (targetPagePtr + reqLen > read_upto)
> +     {
> +             /* not enough data there */
>               return -1;
> -     /* part of the page available */
> +     }
>       else
> -             count = flushptr - targetPagePtr;
> +     {
> +             /* enough bytes available to satisfy the request */
> +             count = read_upto - targetPagePtr;
> +     }
>  
> -     XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
> +     XLogRead(cur_page, *pageTLI, targetPagePtr, count);

When are we reading less than a page? That should afaik never be required.


> +             /*
> +              * We start reading xlog from the restart lsn, even though in
> +              * CreateDecodingContext we set the snapshot builder up using 
> the
> +              * slot's candidate_restart_lsn. This means we might read xlog 
> we
> +              * don't actually decode rows from, but the snapshot builder 
> might
> +              * need it to get to a consistent point. The point we start 
> returning
> +              * data to *users* at is the candidate restart lsn from the 
> decoding
> +              * context.
> +              */

Uh? Where are we using candidate_restart_lsn that way? I seriously doubt
it is - candidate_restart_lsn is about a potential future restart_lsn,
which we can set once we get reception confirmation from the client.


> @@ -299,6 +312,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo 
> fcinfo, bool confirm, bool bin
>                       CHECK_FOR_INTERRUPTS();
>               }
>  
> +             /* Make sure timeline lookups use the start of the next record 
> */
> +             startptr = ctx->reader->EndRecPtr;

Huh? startptr isn't used after this, so I'm not sure what this even
means?

> +             /*
> +              * The XLogReader will read a page past the valid end of WAL 
> because
> +              * it doesn't know about timelines. When we switch timelines 
> and ask
> +              * it for the first page on the new timeline it will think it 
> has it
> +              * cached, but it'll have the old partial page and say it can't 
> find
> +              * the next record. So flush the cache.
> +              */
> +             XLogReaderInvalCache(ctx->reader);
> +

dito.


> diff --git a/src/test/modules/decoding_failover/decoding_failover.c 
> b/src/test/modules/decoding_failover/decoding_failover.c
> new file mode 100644
> index 0000000..669e6c4
> --- /dev/null
> +++ b/src/test/modules/decoding_failover/decoding_failover.c

> +
> +/*
> + * Create a new logical slot, with invalid LSN and xid, directly. This does 
> not
> + * use the snapshot builder or logical decoding machinery. It's only intended
> + * for creating a slot on a replica that mirrors the state of a slot on an
> + * upstream master.
> + *
> + * You should immediately decoding_failover_advance_logical_slot(...) it
> + * after creation.
> + */

Uh. I doubt we want this, even if it's formally located in
src/test/modules.  These comments make it appear not to be only intended
for that, and I have serious doubts about the validity of the concept as
is.


This seems to need some more polishing.


Greetings,

Andres Freund


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to