Hi,
On 2016-03-14 20:10:58 -0300, Alvaro Herrera wrote: > diff --git a/src/backend/access/transam/xlogreader.c > b/src/backend/access/transam/xlogreader.c > index fcb0872..7b60b8f 100644 > --- a/src/backend/access/transam/xlogreader.c > +++ b/src/backend/access/transam/xlogreader.c > @@ -10,9 +10,11 @@ > * > * NOTES > * See xlogreader.h for more notes on this facility. > + * > + * This file is compiled as both front-end and backend code, so it > + * may not use ereport, server-defined static variables, etc. > *------------------------------------------------------------------------- > */ > - Huh? > #include "postgres.h" > > #include "access/transam.h" > @@ -116,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void > *private_data) > return NULL; > } > > +#ifndef FRONTEND > + /* Will be loaded on first read */ > + state->timelineHistory = NIL; > +#endif > + > return state; > } > > @@ -135,6 +142,10 @@ XLogReaderFree(XLogReaderState *state) > pfree(state->errormsg_buf); > if (state->readRecordBuf) > pfree(state->readRecordBuf); > +#ifndef FRONTEND > + if (state->timelineHistory) > + list_free_deep(state->timelineHistory); > +#endif Hm. So we don't support timeline following for frontend code, although it'd be rather helpful for pg_xlogdump. And possibly pg_rewind. > pfree(state->readBuf); > pfree(state); > } > @@ -208,10 +219,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr > RecPtr, char **errormsg) > > if (RecPtr == InvalidXLogRecPtr) > { > + /* No explicit start point; read the record after the one we > just read */ > RecPtr = state->EndRecPtr; > > if (state->ReadRecPtr == InvalidXLogRecPtr) > - randAccess = true; > + randAccess = true; /* allow readPageTLI to go > backwards */ randAccess is doing more than that, so I'm doubtful that comment is an improvement. > /* > * RecPtr is pointing to end+1 of the previous WAL record. 
If > we're > @@ -223,6 +235,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, > char **errormsg) > else > { > /* > + * Caller supplied a position to start at. > + * > * In this case, the passed-in record pointer should already be > * pointing to a valid record starting position. > */ > @@ -309,8 +323,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr > RecPtr, char **errormsg) > /* XXX: more validation should be done here */ > if (total_len < SizeOfXLogRecord) > { > - report_invalid_record(state, "invalid record length at > %X/%X", > - (uint32) > (RecPtr >> 32), (uint32) RecPtr); > + report_invalid_record(state, > + "invalid record length at > %X/%X: wanted %lu, got %u", > + (uint32) > (RecPtr >> 32), (uint32) RecPtr, > + > SizeOfXLogRecord, total_len); > goto err; > } > gotheader = false; > @@ -466,9 +482,7 @@ err: > * Invalidate the xlog page we've cached. We might read from a different > * source after failure. > */ > - state->readSegNo = 0; > - state->readOff = 0; > - state->readLen = 0; > + XLogReaderInvalCache(state); I don't think that "cache" is the right way to describe this. > #include <unistd.h> > > -#include "miscadmin.h" > - spurious change imo. > /* > - * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but > - * we currently don't have the infrastructure (elog!) to share it. > + * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' > + * in timeline 'tli'. > + * > + * Will open, and keep open, one WAL segment stored in the static file > + * descriptor 'sendFile'. This means if XLogRead is used once, there will > + * always be one descriptor left open until the process ends, but never > + * more than one. > + * > + * XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead > + * in walsender.c but for small differences (such as lack of elog() in > + * frontend). Probably these should be merged at some point. 
> */ > static void > XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count) > @@ -648,8 +657,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, > Size count) > XLogRecPtr recptr; > Size nbytes; > > + /* > + * Cached state across calls. > + */ One line? > static int sendFile = -1; > static XLogSegNo sendSegNo = 0; > + static TimeLineID sendTLI = 0; > static uint32 sendOff = 0; > > p = buf; > @@ -664,11 +677,12 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr > startptr, Size count) > > startoff = recptr % XLogSegSize; > > - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo)) > + /* Do we need to open a new xlog segment? */ > + if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || > + sendTLI != tli) > { s/open a new/open a different/? New imo has connotations that we don't really want here. > char path[MAXPGPATH]; > > - /* Switch to another logfile segment */ > if (sendFile >= 0) > close(sendFile); E.g. you could just have moved the above comment. > /* Need to seek in the file? */ > if (sendOff != startoff) > { > if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) > - { > - char path[MAXPGPATH]; > - > - XLogFilePath(path, tli, sendSegNo); > - > ereport(ERROR, > (errcode_for_file_access(), > errmsg("could not seek in log segment %s to > offset %u: %m", > - path, startoff))); > - } > + XLogFileNameP(tli, sendSegNo), > startoff))); > sendOff = startoff; > } Not a serious issue, more a general remark: I'm doubtful that going for palloc in error situations is good practice. This will be allocated in the current memory context; without access to the emergency error reserves. I'm also getting the feeling that the patch is bordering on doing some relatively random cleanups mixed in with architectural changes. Makes things a bit harder to review. 
> +static void > +XLogReadDetermineTimeline(XLogReaderState *state) > +{ > + /* Read the history on first time through */ > + if (state->timelineHistory == NIL) > + state->timelineHistory = readTimeLineHistory(ThisTimeLineID); > + > + /* > + * Are we reading the record immediately following the one we read last > + * time? If not, then don't use the cached timeline info. > + */ > + if (state->currRecPtr != state->EndRecPtr) > + { > + state->currTLI = 0; > + state->currTLIValidUntil = InvalidXLogRecPtr; > + } Hm. So we grow essentially a second version of the last end position and the randAccess stuff in XLogReadRecord(). > + if (state->currTLI == 0) > + { > + /* > + * Something changed; work out what timeline this record is on. > We > + * might read it from the segment on this TLI or, if the > segment is > + * also contained by newer timelines, the copy from a newer TLI. > + */ > + state->currTLI = tliOfPointInHistory(state->currRecPtr, > + > state->timelineHistory); > + > + /* > + * Look for the most recent timeline that's on the same xlog > segment > + * as this record, since that's the only one we can assume is > still > + * readable. > + */ > + while (state->currTLI != ThisTimeLineID && > + state->currTLIValidUntil == InvalidXLogRecPtr) > + { > + XLogRecPtr tliSwitch; > + TimeLineID nextTLI; > + > + tliSwitch = tliSwitchPoint(state->currTLI, > state->timelineHistory, > + > &nextTLI); > + > + /* round ValidUntil down to start of seg containing the > switch */ > + state->currTLIValidUntil = > + ((tliSwitch / XLogSegSize) * XLogSegSize); > + > + if (state->currRecPtr >= state->currTLIValidUntil) > + { > + /* > + * The new currTLI ends on this WAL segment so > check the next > + * TLI to see if it's the last one on the > segment. > + * > + * If that's the current TLI we'll stop > searching. I don't really understand how we're stopping searching here? 
> + */ > + state->currTLI = nextTLI; > + state->currTLIValidUntil = InvalidXLogRecPtr; > + } > + } > +} XLogReadDetermineTimeline() doesn't sit quite right with me, I do wonder whether there's not a simpler way to write this. > +/* > + * XLogPageReadCB callback for reading local xlog files > * > * Public because it would likely be very helpful for someone writing another > * output method outside walsender, e.g. in a bgworker. > * > - * TODO: The walsender has it's own version of this, but it relies on the > + * TODO: The walsender has its own version of this, but it relies on the > * walsender's latch being set whenever WAL is flushed. No such > infrastructure > * exists for normal backends, so we have to do a check/sleep/repeat style of > * loop for now. > @@ -754,46 +897,88 @@ int > read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, > int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID > *pageTLI) > { > - XLogRecPtr flushptr, > + XLogRecPtr read_upto, > loc; > int count; > > loc = targetPagePtr + reqLen; > + > + /* Make sure enough xlog is available... */ > while (1) > { > /* > - * TODO: we're going to have to do something more intelligent > about > - * timelines on standbys. Use readTimeLineHistory() and > - * tliOfPointInHistory() to get the proper LSN? For now we'll > catch > - * that case earlier, but the code and TODO is left in here for > when > - * that changes. > + * Check which timeline to get the record from. > + * > + * We have to do it each time through the loop because if we're > in > + * recovery as a cascading standby, the current timeline > might've > + * become historical. 
> */ > - if (!RecoveryInProgress()) > + XLogReadDetermineTimeline(state); > + > + if (state->currTLI == ThisTimeLineID) > { > - *pageTLI = ThisTimeLineID; > - flushptr = GetFlushRecPtr(); > + /* > + * We're reading from the current timeline so we might > have to > + * wait for the desired record to be generated (or, for > a standby, > + * received & replayed) > + */ > + if (!RecoveryInProgress()) > + { > + *pageTLI = ThisTimeLineID; > + read_upto = GetFlushRecPtr(); > + } > + else > + read_upto = GetXLogReplayRecPtr(pageTLI); > + > + if (loc <= read_upto) > + break; > + > + CHECK_FOR_INTERRUPTS(); > + pg_usleep(1000L); > } > else > - flushptr = GetXLogReplayRecPtr(pageTLI); > + { > + /* > + * We're on a historical timeline, so limit reading to > the switch > + * point where we moved to the next timeline. > + */ > + read_upto = state->currTLIValidUntil; Hm. Is it ok to not check GetFlushRecPtr/GetXLogReplayRecPtr() here? If so, how come? > - if (loc <= flushptr) > + /* > + * Setting pageTLI to our wanted record's TLI is > slightly wrong; > + * the page might begin on an older timeline if it > contains a > + * timeline switch, since its xlog segment will have > been copied > + * from the prior timeline. This is pretty harmless > though, as > + * nothing cares so long as the timeline doesn't go > backwards. We > + * should read the page header instead; FIXME someday. > + */ > + *pageTLI = state->currTLI; > + > + /* No need to wait on a historical timeline */ > break; > - > - CHECK_FOR_INTERRUPTS(); > - pg_usleep(1000L); > + } > } > > - /* more than one block available */ > - if (targetPagePtr + XLOG_BLCKSZ <= flushptr) > + if (targetPagePtr + XLOG_BLCKSZ <= read_upto) > + { > + /* > + * more than one block available; read only that block, have > caller > + * come back if they need more. 
> + */ > count = XLOG_BLCKSZ; > - /* not enough data there */ > - else if (targetPagePtr + reqLen > flushptr) > + } > + else if (targetPagePtr + reqLen > read_upto) > + { > + /* not enough data there */ > return -1; > - /* part of the page available */ > + } > else > - count = flushptr - targetPagePtr; > + { > + /* enough bytes available to satisfy the request */ > + count = read_upto - targetPagePtr; > + } > > - XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ); > + XLogRead(cur_page, *pageTLI, targetPagePtr, count); When are we reading less than a page? That should afaik never be required. > + /* > + * We start reading xlog from the restart lsn, even though in > + * CreateDecodingContext we set the snapshot builder up using > the > + * slot's candidate_restart_lsn. This means we might read xlog > we > + * don't actually decode rows from, but the snapshot builder > might > + * need it to get to a consistent point. The point we start > returning > + * data to *users* at is the candidate restart lsn from the > decoding > + * context. > + */ Uh? Where are we using candidate_restart_lsn that way? I seriously doubt it is - candidate_restart_lsn is about a potential future restart_lsn, which we can set once we get reception confirmation from the client. > @@ -299,6 +312,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo > fcinfo, bool confirm, bool bin > CHECK_FOR_INTERRUPTS(); > } > > + /* Make sure timeline lookups use the start of the next record > */ > + startptr = ctx->reader->EndRecPtr; Huh? startptr isn't used after this, so I'm not sure what this even means? > + /* > + * The XLogReader will read a page past the valid end of WAL > because > + * it doesn't know about timelines. When we switch timelines > and ask > + * it for the first page on the new timeline it will think it > has it > + * cached, but it'll have the old partial page and say it can't > find > + * the next record. So flush the cache. 
> + */ > + XLogReaderInvalCache(ctx->reader); > + ditto. > diff --git a/src/test/modules/decoding_failover/decoding_failover.c > b/src/test/modules/decoding_failover/decoding_failover.c > new file mode 100644 > index 0000000..669e6c4 > --- /dev/null > +++ b/src/test/modules/decoding_failover/decoding_failover.c > + > +/* > + * Create a new logical slot, with invalid LSN and xid, directly. This does > not > + * use the snapshot builder or logical decoding machinery. It's only intended > + * for creating a slot on a replica that mirrors the state of a slot on an > + * upstream master. > + * > + * You should immediately decoding_failover_advance_logical_slot(...) it > + * after creation. > + */ Uh. I doubt we want this, even if it's formally located in src/test/modules. These comments make it appear not to be only intended for that, and I have serious doubts about the validity of the concept as is. This seems to need some more polishing. Greetings, Andres Freund -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers