Alvaro Herrera <alvhe...@2ndquadrant.com> wrote:

> I was confused by the struct name XLogSegment -- the struct is used to
> represent a WAL segment while it's kept open, rather than just a WAL
> segment in abstract. Also, now that we've renamed everything to use the
> term WAL, it seems wrong to use the name XLog for new structs. I
> propose the name WALOpenSegment for the struct, which solves both
> problems. (Its initializer function would get the name
> WALOpenSegmentInit.)
>
> Now, the patch introduces a callback for XLogRead, the type of which is
> called XLogOpenSegment. If we rename it from XLog to WAL, both names
> end up the same. I propose to rename the function type to
> WALSegmentOpen, which in a "noun-verb" view of the world, represents the
> action of opening a WAL segment.
>
> I attach a patch for all this renaming, on top of your series.

ok, thanks. In addition I renamed WalSndOpenSegment() to WalSndSegmentOpen()
and read_local_xlog_page_open_segment() to read_local_xlog_page_segment_open().

> I wonder if each of those WALSegmentOpen callbacks should reset [at
> least some members of] the struct; they're already in charge of setting
> ->file, and apparently we're leaving the responsibility of setting the
> rest of the members to XLogRead. That seems weird. Maybe we should say
> that the CB should only open the segment and not touch the struct at all
> and XLogRead is in charge of everything. Perhaps the other way around
> -- the CB should set everything correctly ... I'm not sure which is
> best. But having half here and half there seems a recipe for confusion
> and bugs.

ok, I've changed the CB signature. Now it receives pointers to the two
variables that it can change, while the "seg" argument is documented as
read-only. To indicate that the CB should determine the timeline itself, I
introduced a new constant InvalidTimeLineID, see the 0004 part.

> Another thing I didn't like much is that everything seems to assume that
> the only error possible from XLogRead is a read error. Maybe that's
> okay, because it seems to be the current reality, but it seemed odd.

In this case I only moved the ereport() code out of XLogRead() (so that
both backend and frontend can call the function). Given that the code to
open a WAL segment is now in the callbacks, the only thing that XLogRead()
can ereport is that read() failed. BTW, I introduced one more structure,
XLogReadError, in this patch version. I think it's better than adding
error-specific fields to the WALOpenSegment structure.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

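To make the callback contract described above more concrete, here is a
minimal standalone sketch. It is not code from the patch series: the type
name DemoSegmentOpenCB, both demo_* functions and the values 3 and 42 are
made up for illustration, the struct is a simplified copy of WALOpenSegment
from the 0003 part, and the real WALSegmentOpen signature in the series may
differ. The point is only the division of labour: the callback treats "seg"
as read-only and reports its results through two output pointers, and the
caller alone updates the struct.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TimeLineID;
typedef uint64_t XLogSegNo;

/* Same value as in the 0004 part. */
#define InvalidTimeLineID ((TimeLineID) 0xFFFFFFFF)

/* Simplified copy of WALOpenSegment from the 0003 part (no "dir" field). */
typedef struct WALOpenSegment
{
	int			file;			/* segment file descriptor */
	XLogSegNo	num;			/* segment number */
	uint32_t	off;			/* offset in the segment */
	TimeLineID	tli;			/* timeline ID of the currently open file */
	int			size;			/* segment size */
} WALOpenSegment;

/*
 * Hypothetical callback type (an assumption, not the patch's definition):
 * "seg" is read-only, results are returned only via *tli_p and *file_p.
 */
typedef void (*DemoSegmentOpenCB) (XLogSegNo nextSegNo,
								   const WALOpenSegment *seg,
								   TimeLineID *tli_p, int *file_p);

/* Hypothetical callback: pretends every segment is on timeline 3, fd 42. */
static void
demo_segment_open(XLogSegNo nextSegNo, const WALOpenSegment *seg,
				  TimeLineID *tli_p, int *file_p)
{
	(void) nextSegNo;
	(void) seg;

	/* The caller did not know the timeline, so determine it here. */
	if (*tli_p == InvalidTimeLineID)
		*tli_p = 3;

	*file_p = 42;				/* stand-in for a real open() result */
}

/* Caller side: only this function writes to the struct. */
static void
demo_switch_segment(WALOpenSegment *seg, XLogSegNo nextSegNo,
					TimeLineID tli, DemoSegmentOpenCB openSegment)
{
	int			file = -1;

	openSegment(nextSegNo, seg, &tli, &file);

	/* The callback must have resolved both outputs. */
	assert(tli != InvalidTimeLineID);
	assert(file >= 0);

	seg->file = file;
	seg->num = nextSegNo;
	seg->off = 0;
	seg->tli = tli;
}
```

A caller that does not know the timeline would pass InvalidTimeLineID, e.g.
demo_switch_segment(&seg, 7, InvalidTimeLineID, demo_segment_open), whereas
a caller that already knows it passes the TLI and the callback leaves it
alone.
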
>From 674fa97ef9df8b1fe875139aa7b43f605255d8cd Mon Sep 17 00:00:00 2001 From: Antonin Houska <a...@cybertec.at> Date: Mon, 23 Sep 2019 07:40:49 +0200 Subject: [PATCH 2/6] Remove TLI from some argument lists. The timeline information is available to caller via XLogReaderState. Now that XLogRead() is gonna be (sometimes) responsible for determining the TLI, it would have to be added the (TimeLineID *) argument too, just to be consistent with the current coding style. Since XLogRead() updates also other position-specific fields of XLogReaderState, it seems simpler if we remove the output argument from XLogPageReadCB and always report the TLI via XLogReaderState. --- src/backend/access/transam/xlog.c | 7 +++---- src/backend/access/transam/xlogreader.c | 6 +++--- src/backend/access/transam/xlogutils.c | 11 ++++++----- src/backend/replication/logical/logicalfuncs.c | 4 ++-- src/backend/replication/walsender.c | 2 +- src/bin/pg_rewind/parsexlog.c | 8 +++----- src/bin/pg_waldump/pg_waldump.c | 2 +- src/include/access/xlogreader.h | 8 +++----- src/include/access/xlogutils.h | 5 ++--- src/include/replication/logicalfuncs.h | 2 +- 10 files changed, 25 insertions(+), 30 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b7ff004234..7a89dfed7f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -885,8 +885,7 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *readTLI); + int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); @@ -11523,7 +11522,7 @@ 
CancelBackup(void) */ static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI) + XLogRecPtr targetRecPtr, char *readBuf) { XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; @@ -11640,7 +11639,7 @@ retry: Assert(targetPageOff == readOff); Assert(reqLen <= readLen); - *readTLI = curFileTLI; + xlogreader->readPageTLI = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index a66e3324b1..2184f4291d 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -559,7 +559,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; @@ -577,7 +577,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) */ readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; @@ -596,7 +596,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, - state->readBuf, &state->readPageTLI); + state->readBuf); if (readLen < 0) goto err; } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 1fc39333f1..680bed8278 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -909,12 +909,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa */ int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr 
targetRecPtr, char *cur_page, - TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { XLogRecPtr read_upto, loc; int count; + TimeLineID pageTLI; loc = targetPagePtr + reqLen; @@ -934,7 +934,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - *pageTLI = ThisTimeLineID; + pageTLI = ThisTimeLineID; /* * Check which timeline to get the record from. @@ -991,7 +991,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * nothing cares so long as the timeline doesn't go backwards. We * should read the page header instead; FIXME someday. */ - *pageTLI = state->currTLI; + pageTLI = state->currTLI; /* No need to wait on a historical timeline */ break; @@ -1022,8 +1022,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr, + XLogRead(cur_page, state->wal_segment_size, pageTLI, targetPagePtr, XLOG_BLCKSZ); + state->readPageTLI = pageTLI; /* number of valid bytes in the buffer */ return count; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index d974400d6e..d1cf80d441 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -116,10 +116,10 @@ check_permissions(void) int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { return read_local_xlog_page(state, targetPagePtr, reqLen, - targetRecPtr, cur_page, pageTLI); + targetRecPtr, cur_page); } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 23870a25a5..28d8c31af8 100644 
--- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -763,7 +763,7 @@ StartReplication(StartReplicationCmd *cmd) */ static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) + XLogRecPtr targetRecPtr, char *cur_page) { XLogRecPtr flushptr; int count; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 63c3879ead..33e2ba2a03 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -49,8 +49,7 @@ typedef struct XLogPageReadPrivate static int SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *pageTLI); + int reqLen, XLogRecPtr targetRecPtr, char *readBuf); /* * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline @@ -237,8 +236,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, /* XLogReader callback function, to read a WAL page */ static int SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf, - TimeLineID *pageTLI) + int reqLen, XLogRecPtr targetRecPtr, char *readBuf) { XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; uint32 targetPageOff; @@ -321,7 +319,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, Assert(targetSegNo == xlogreadsegno); - *pageTLI = targetHistory[private->tliIndex].tli; + xlogreader->readPageTLI = targetHistory[private->tliIndex].tli; return XLOG_BLCKSZ; } diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index b95d467805..40c64a0bbf 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -423,7 +423,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id, */ static int XLogDumpReadPage(XLogReaderState *state, 
XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI) + XLogRecPtr targetPtr, char *readBuff) { XLogDumpPrivate *private = state->private_data; int count = XLOG_BLCKSZ; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 735b1bd2fd..d64a9ad82f 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -38,8 +38,7 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, - char *readBuf, - TimeLineID *pageTLI); + char *readBuf); typedef struct { @@ -99,9 +98,8 @@ struct XLogReaderState * actual WAL record it's interested in. In that case, targetRecPtr can * be used to determine which timeline to read the page from. * - * The callback shall set *pageTLI to the TLI of the file the page was - * read from. It is currently used only for error reporting purposes, to - * reconstruct the name of the WAL file where an error occurred. + * The callback shall set ->readPageTLI to the TLI of the file the page + * was read from. 
*/ XLogPageReadCB read_page; diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 4105b59904..4fb305bafd 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,12 +47,11 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); + extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page, - TimeLineID *pageTLI); + XLogRecPtr targetRecPtr, char *cur_page); extern void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength); - #endif diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index a9c178a9e6..012096f183 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -14,6 +14,6 @@ extern int logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, - char *cur_page, TimeLineID *pageTLI); + char *cur_page); #endif -- 2.20.1
>From 0022a47048ee65b04b2a3f5f53715e04459baa83 Mon Sep 17 00:00:00 2001 From: Antonin Houska <a...@cybertec.at> Date: Mon, 23 Sep 2019 07:40:49 +0200 Subject: [PATCH 3/6] Introduce WALOpenSegment structure. --- src/backend/access/transam/xlog.c | 6 +- src/backend/access/transam/xlogreader.c | 59 +++++++++++------- src/backend/access/transam/xlogutils.c | 20 +++---- src/backend/replication/walsender.c | 79 ++++++++++++------------- src/bin/pg_rewind/parsexlog.c | 2 +- src/bin/pg_waldump/pg_waldump.c | 3 + src/include/access/xlogreader.h | 34 +++++++---- 7 files changed, 116 insertions(+), 87 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7a89dfed7f..031733aba8 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4295,7 +4295,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size); offset = XLogSegmentOffset(xlogreader->latestPagePtr, wal_segment_size); - XLogFileName(fname, xlogreader->readPageTLI, segno, + XLogFileName(fname, xlogreader->seg.tli, segno, wal_segment_size); ereport(emode_for_corrupt_record(emode, RecPtr ? RecPtr : EndRecPtr), @@ -7354,7 +7354,7 @@ StartupXLOG(void) * and we were reading the old WAL from a segment belonging to a higher * timeline. 
*/ - EndOfLogTLI = xlogreader->readPageTLI; + EndOfLogTLI = xlogreader->seg.tli; /* * Complain if we did not roll forward far enough to render the backup @@ -11639,7 +11639,7 @@ retry: Assert(targetPageOff == readOff); Assert(reqLen <= readLen); - xlogreader->readPageTLI = curFileTLI; + xlogreader->seg.tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 2184f4291d..4de5530b3e 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -96,7 +96,9 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc, return NULL; } - state->wal_segment_size = wal_segment_size; + /* Initialize segment pointer. */ + WALOpenSegmentInit(&state->seg, wal_segment_size); + state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; @@ -490,8 +492,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ - state->EndRecPtr += state->wal_segment_size - 1; - state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size); + state->EndRecPtr += state->seg.size - 1; + state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->seg.size); } if (DecodeXLogRecord(state, record, errormsg)) @@ -533,12 +535,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) Assert((pageptr % XLOG_BLCKSZ) == 0); - XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size); - targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size); + XLByteToSeg(pageptr, targetSegNo, state->seg.size); + targetPageOff = XLogSegmentOffset(pageptr, state->seg.size); /* check whether we have all the requested data already */ - if (targetSegNo == state->readSegNo && targetPageOff == state->readOff && - 
reqLen <= state->readLen) + if (targetSegNo == state->seg.num && + targetPageOff == state->seg.off && reqLen <= state->readLen) return state->readLen; /* @@ -553,7 +555,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * record is. This is so that we can check the additional identification * info that is present in the first page's "long" header. */ - if (targetSegNo != state->readSegNo && targetPageOff != 0) + if (targetSegNo != state->seg.num && targetPageOff != 0) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; @@ -608,8 +610,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) goto err; /* update read state information */ - state->readSegNo = targetSegNo; - state->readOff = targetPageOff; + state->seg.num = targetSegNo; + state->seg.off = targetPageOff; state->readLen = readLen; return readLen; @@ -625,8 +627,8 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->readSegNo = 0; - state->readOff = 0; + state->seg.num = 0; + state->seg.off = 0; state->readLen = 0; } @@ -745,16 +747,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, Assert((recptr % XLOG_BLCKSZ) == 0); - XLByteToSeg(recptr, segno, state->wal_segment_size); - offset = XLogSegmentOffset(recptr, state->wal_segment_size); + XLByteToSeg(recptr, segno, state->seg.size); + offset = XLogSegmentOffset(recptr, state->seg.size); - XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr); + XLogSegNoOffsetToRecPtr(segno, offset, state->seg.size, recaddr); if (hdr->xlp_magic != XLOG_PAGE_MAGIC) { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); report_invalid_record(state, "invalid magic number %04X in log segment %s, offset %u", @@ -768,7 +770,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, 
state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); report_invalid_record(state, "invalid info bits %04X in log segment %s, offset %u", @@ -791,7 +793,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, (unsigned long long) state->system_identifier); return false; } - else if (longhdr->xlp_seg_size != state->wal_segment_size) + else if (longhdr->xlp_seg_size != state->seg.size) { report_invalid_record(state, "WAL file is from different database system: incorrect segment size in page header"); @@ -808,7 +810,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); /* hmm, first page of file doesn't have a long header? */ report_invalid_record(state, @@ -828,7 +830,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); report_invalid_record(state, "unexpected pageaddr %X/%X in log segment %s, offset %u", @@ -853,7 +855,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, { char fname[MAXFNAMELEN]; - XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size); + XLogFileName(fname, state->seg.tli, segno, state->seg.size); report_invalid_record(state, "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u", @@ -997,6 +999,21 @@ out: #endif /* FRONTEND */ +/* + * Initialize the passed segment pointer. 
+ */ +void +WALOpenSegmentInit(WALOpenSegment *seg, int size) +{ + seg->file = -1; + seg->num = 0; + seg->off = 0; + seg->tli = 0; + seg->size = size; +#ifdef FRONTEND + seg->dir = NULL; +#endif +} /* ---------------------------------------- * Functions for decoding the data and block references in a record. diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 680bed8278..424bb06919 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = state->readSegNo * - state->wal_segment_size + state->readOff; + const XLogRecPtr lastReadPage = state->seg.num * + state->seg.size + state->seg.off; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa if (state->currTLIValidUntil != InvalidXLogRecPtr && state->currTLI != ThisTimeLineID && state->currTLI != 0 && - ((wantPage + wantLength) / state->wal_segment_size) < - (state->currTLIValidUntil / state->wal_segment_size)) + ((wantPage + wantLength) / state->seg.size) < + (state->currTLIValidUntil / state->seg.size)) return; /* @@ -870,11 +870,11 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa */ List *timelineHistory = readTimeLineHistory(ThisTimeLineID); - XLogRecPtr endOfSegment = (((wantPage / state->wal_segment_size) + 1) - * state->wal_segment_size) - 1; + XLogRecPtr endOfSegment = (((wantPage / state->seg.size) + 1) + * state->seg.size) - 1; - Assert(wantPage / state->wal_segment_size == - endOfSegment / state->wal_segment_size); + Assert(wantPage / state->seg.size == + endOfSegment / state->seg.size); /* * Find the timeline of the last 
LSN on the segment containing @@ -1022,9 +1022,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - XLogRead(cur_page, state->wal_segment_size, pageTLI, targetPagePtr, + XLogRead(cur_page, state->seg.size, state->seg.tli, targetPagePtr, XLOG_BLCKSZ); - state->readPageTLI = pageTLI; + state->seg.tli = pageTLI; /* number of valid bytes in the buffer */ return count; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 28d8c31af8..a617e20ab6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -128,16 +128,7 @@ bool log_replication_commands = false; */ bool wake_wal_senders = false; -/* - * These variables are used similarly to openLogFile/SegNo/Off, - * but for walsender to read the XLOG. - */ -static int sendFile = -1; -static XLogSegNo sendSegNo = 0; -static uint32 sendOff = 0; - -/* Timeline ID of the currently open file */ -static TimeLineID curFileTimeLine = 0; +static WALOpenSegment *sendSeg = NULL; /* * These variables keep track of the state of the timeline we're currently @@ -285,6 +276,11 @@ InitWalSender(void) /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); + + /* Make sure we can remember the current read position in XLOG. 
*/ + sendSeg = (WALOpenSegment *) + MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment)); + WALOpenSegmentInit(sendSeg, wal_segment_size); } /* @@ -301,10 +297,10 @@ WalSndErrorCleanup(void) ConditionVariableCancelSleep(); pgstat_report_wait_end(); - if (sendFile >= 0) + if (sendSeg->file >= 0) { - close(sendFile); - sendFile = -1; + close(sendSeg->file); + sendSeg->file = -1; } if (MyReplicationSlot != NULL) @@ -2384,15 +2380,16 @@ retry: startoff = XLogSegmentOffset(recptr, wal_segment_size); - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size)) + if (sendSeg->file < 0 || + !XLByteInSeg(recptr, sendSeg->num, sendSeg->size)) { char path[MAXPGPATH]; /* Switch to another logfile segment */ - if (sendFile >= 0) - close(sendFile); + if (sendSeg->file >= 0) + close(sendSeg->file); - XLByteToSeg(recptr, sendSegNo, wal_segment_size); + XLByteToSeg(recptr, sendSeg->num, sendSeg->size); /*------- * When reading from a historic timeline, and there is a timeline @@ -2420,20 +2417,20 @@ retry: * used portion of the old segment is copied to the new file. 
*------- */ - curFileTimeLine = sendTimeLine; + sendSeg->tli = sendTimeLine; if (sendTimeLineIsHistoric) { XLogSegNo endSegNo; XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size); - if (sendSegNo == endSegNo) - curFileTimeLine = sendTimeLineNextTLI; + if (sendSeg->num == endSegNo) + sendSeg->tli = sendTimeLineNextTLI; } - XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size); + XLogFilePath(path, sendSeg->tli, sendSeg->num, wal_segment_size); - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (sendFile < 0) + sendSeg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (sendSeg->file < 0) { /* * If the file is not found, assume it's because the standby @@ -2444,26 +2441,26 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(curFileTimeLine, sendSegNo)))); + XLogFileNameP(sendSeg->tli, sendSeg->num)))); else ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); } - sendOff = 0; + sendSeg->off = 0; } /* Need to seek in the file? */ - if (sendOff != startoff) + if (sendSeg->off != startoff) { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) + if (lseek(sendSeg->file, (off_t) startoff, SEEK_SET) < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), + XLogFileNameP(sendSeg->tli, sendSeg->num), startoff))); - sendOff = startoff; + sendSeg->off = startoff; } /* How many bytes are within this segment? 
*/ @@ -2473,29 +2470,29 @@ retry: segbytes = nbytes; pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendFile, p, segbytes); + readbytes = read(sendSeg->file, p, segbytes); pgstat_report_wait_end(); if (readbytes < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u, length %zu: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, (Size) segbytes))); + XLogFileNameP(sendSeg->tli, sendSeg->num), + sendSeg->off, (Size) segbytes))); } else if (readbytes == 0) { ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, readbytes, (Size) segbytes))); + XLogFileNameP(sendSeg->tli, sendSeg->num), + sendSeg->off, readbytes, (Size) segbytes))); } /* Update state for read */ recptr += readbytes; - sendOff += readbytes; + sendSeg->off += readbytes; nbytes -= readbytes; p += readbytes; } @@ -2526,10 +2523,10 @@ retry: walsnd->needreload = false; SpinLockRelease(&walsnd->mutex); - if (reload && sendFile >= 0) + if (reload && sendSeg->file >= 0) { - close(sendFile); - sendFile = -1; + close(sendSeg->file); + sendSeg->file = -1; goto retry; } @@ -2695,9 +2692,9 @@ XLogSendPhysical(void) if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { /* close the current file. 
*/ - if (sendFile >= 0) - close(sendFile); - sendFile = -1; + if (sendSeg->file >= 0) + close(sendSeg->file); + sendSeg->file = -1; /* Send CopyDone */ pq_putmessage_noblock('c', NULL, 0); diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 33e2ba2a03..6cea9342d6 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -319,7 +319,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, Assert(targetSegNo == xlogreadsegno); - xlogreader->readPageTLI = targetHistory[private->tliIndex].tli; + xlogreader->seg.tli = targetHistory[private->tliIndex].tli; return XLOG_BLCKSZ; } diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 40c64a0bbf..a16793bb8b 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -1105,6 +1105,9 @@ main(int argc, char **argv) if (!xlogreader_state) fatal_error("out of memory"); + /* Finalize the segment pointer. */ + xlogreader_state->seg.dir = private.inpath; + /* first find a valid recptr to start from */ first_record = XLogFindNextRecord(xlogreader_state, private.startptr); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d64a9ad82f..b9d99d524e 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -31,6 +31,22 @@ #include "access/xlogrecord.h" +/* + * WALOpenSegment represents a WAL segment being read. 
+ */ +typedef struct WALOpenSegment +{ + int file; /* segment file descriptor */ + XLogSegNo num; /* segment number */ + uint32 off; /* offset in the segment */ + TimeLineID tli; /* timeline ID of the currently open file */ + int size; /* segment size */ + +#ifdef FRONTEND + char *dir; /* WAL directory */ +#endif +} WALOpenSegment; + typedef struct XLogReaderState XLogReaderState; /* Function type definition for the read_page callback */ @@ -76,11 +92,6 @@ struct XLogReaderState * ---------------------------------------- */ - /* - * Segment size of the to-be-parsed data (mandatory). - */ - int wal_segment_size; - /* * Data input callback (mandatory). * @@ -98,8 +109,8 @@ struct XLogReaderState * actual WAL record it's interested in. In that case, targetRecPtr can * be used to determine which timeline to read the page from. * - * The callback shall set ->readPageTLI to the TLI of the file the page - * was read from. + * The callback shall set ->seg.tli to the TLI of the file the page was + * read from. */ XLogPageReadCB read_page; @@ -154,10 +165,8 @@ struct XLogReaderState char *readBuf; uint32 readLen; - /* last read segment, segment offset, TLI for data currently in readBuf */ - XLogSegNo readSegNo; - uint32 readOff; - TimeLineID readPageTLI; + /* last read XLOG position for data currently in readBuf */ + WALOpenSegment seg; /* * beginning of prior page read, and its TLI. Doesn't necessarily @@ -217,6 +226,9 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ + +extern void WALOpenSegmentInit(WALOpenSegment *seg, int size); + /* Functions for decoding an XLogRecord */ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, -- 2.20.1
From 29d7e8c46fddf2c95b6dc0ff702912be5516e881 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Mon, 23 Sep 2019 07:40:49 +0200
Subject: [PATCH 4/6] Introduce InvalidTimeLineID.

This is useful to tell the WALSegmentOpen() callback that it should determine
TLI itself. We can't pass NULL pointer because the function should return the
TLI via the argument in such a case.
---
 src/backend/access/transam/timeline.c | 10 ++++++++--
 src/backend/access/transam/xlog.c     |  1 +
 src/include/access/xlogdefs.h         |  4 ++++
 3 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/timeline.c b/src/backend/access/transam/timeline.c
index c2ba480c70..7cb6046835 100644
--- a/src/backend/access/transam/timeline.c
+++ b/src/backend/access/transam/timeline.c
@@ -254,6 +254,8 @@ findNewestTimeLine(TimeLineID startTLI)
 	/*
 	 * The algorithm is just to probe for the existence of timeline history
 	 * files. XXX is it useful to allow gaps in the sequence?
+	 *
+	 * XXX Should we check whether all possible timelines are in use?
 	 */
 	newestTLI = startTLI;
 
@@ -263,9 +265,13 @@ findNewestTimeLine(TimeLineID startTLI)
 		{
 			newestTLI = probeTLI;	/* probeTLI exists */
 		}
-		else
+		else if (probeTLI != MaxTimeLineID)
 		{
-			/* doesn't exist, assume we're done */
+			/*
+			 * Doesn't exist, assume we're done, but do not return
+			 * MaxTimeLineID - if caller incremented that, it'd become
+			 * InvalidTimeLineID.
+			 */
 			break;
 		}
 	}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 031733aba8..c9e01fe82d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7421,6 +7421,7 @@ StartupXLOG(void)
 		Assert(InArchiveRecovery);
 
 		ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
+		Assert(ThisTimeLineID != InvalidTimeLineID);
 		ereport(LOG,
 				(errmsg("selected new timeline ID: %u", ThisTimeLineID)));
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index daded3dca0..459dc99d3e 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -51,6 +51,10 @@ typedef uint64 XLogSegNo;
  */
 typedef uint32 TimeLineID;
 
+#define InvalidTimeLineID	((TimeLineID) 0xFFFFFFFF)
+
+#define MaxTimeLineID	((TimeLineID) 0xFFFFFFFE)
+
 /*
  * Replication origin id - this is located in this file to avoid having to
  * include origin.h in a bunch of xlog related places.
-- 
2.20.1
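To illustrate why 0004 needs both constants, here is a small standalone sketch of the sentinel convention. Only the two constant values are taken from the patch; the Demo* names and both helper functions are hypothetical and exist just to show the in/out use of the TLI argument and the wraparound that the new Assert() in StartupXLOG() guards against.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t DemoTimeLineID;

/* Same values as InvalidTimeLineID and MaxTimeLineID in the patch. */
#define DEMO_INVALID_TLI	((DemoTimeLineID) 0xFFFFFFFF)
#define DEMO_MAX_TLI		((DemoTimeLineID) 0xFFFFFFFE)

/*
 * In/out convention of the segment-open callback: if the caller passes the
 * invalid sentinel, the callee picks the timeline and stores it in *tli_p.
 * A NULL pointer could not carry the result back, hence the sentinel.
 */
static void
demo_resolve_tli(DemoTimeLineID *tli_p, DemoTimeLineID current_tli)
{
	assert(tli_p != NULL);

	if (*tli_p == DEMO_INVALID_TLI)
		*tli_p = current_tli;
}

/*
 * Compute the timeline that follows 'newest'. Returns false when the
 * increment would produce the invalid sentinel.
 */
static bool
demo_next_tli(DemoTimeLineID newest, DemoTimeLineID *next)
{
	if (newest >= DEMO_MAX_TLI)
		return false;

	*next = newest + 1;
	return true;
}
```

The point is that MaxTimeLineID + 1 equals InvalidTimeLineID, so a caller that increments the newest timeline has to stop one short of the maximum.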
From 0c6ae68ccab9507a7294ccd63c1b40561e889e9f Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Mon, 23 Sep 2019 07:40:49 +0200
Subject: [PATCH 5/6] Use only xlogreader.c:XLogRead()

The implementations in xlogutils.c and walsender.c are just renamed now, to be
removed by the following diff.
---
 src/backend/access/transam/xlogreader.c | 157 ++++++++++++++++++++++++
 src/backend/access/transam/xlogutils.c  |  46 ++++++-
 src/backend/replication/walsender.c     | 139 ++++++++++++++++++++-
 src/bin/pg_waldump/pg_waldump.c         |  64 +++++++++-
 src/include/access/xlogreader.h         |  42 +++++++
 5 files changed, 436 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 4de5530b3e..7f77fa95cb 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlogrecord.h"
 #include "access/xlog_internal.h"
@@ -27,6 +29,7 @@
 
 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -1015,6 +1018,160 @@ WALOpenSegmentInit(WALOpenSegment *seg, int size)
 #endif
 }
 
+/*
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'. If
+ * tli_p is passed, get the data from timeline *tli_p. 'seg' is the current
+ * position in the XLOG file and openSegment is a callback that opens the next
+ * segment for reading.
+ *
+ * Returns error information if the data could not be read or NULL if
+ * succeeded.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */ +XLogReadError * +XLogRead(char *buf, XLogRecPtr startptr, Size count, + TimeLineID *tli_p, WALOpenSegment *seg, WALSegmentOpen openSegment) +{ + char *p; + XLogRecPtr recptr; + Size nbytes; + + p = buf; + recptr = startptr; + nbytes = count; + + while (nbytes > 0) + { + int segbytes; + int readbytes; + + seg->off = XLogSegmentOffset(recptr, seg->size); + + if (seg->file < 0 || + !XLByteInSeg(recptr, seg->num, seg->size) || + (tli_p != NULL && *tli_p != seg->tli)) + { + XLogSegNo nextSegNo; + TimeLineID tli = InvalidTimeLineID; + int file; + + /* Switch to another logfile segment */ + if (seg->file >= 0) + close(seg->file); + + XLByteToSeg(recptr, nextSegNo, seg->size); + + /* If we have the TLI, let's pass it to the callback. */ + if (tli_p != NULL) + tli = *tli_p; + + /* Open the next segment in the caller's way. */ + openSegment(nextSegNo, &tli, &file, seg); + + /* + * If we passed InvalidTimeLineID, the callback should have + * determined the correct TLI and returned it. + */ + Assert(tli != InvalidTimeLineID); + + /* Update the open segment info. */ + seg->tli = tli; + seg->file = file; + + /* + * If the function is called by the XLOG reader, the reader will + * eventually set both "num" and "off". However we need to care + * about them too because the function can also be used directly, + * see walsender.c. + */ + seg->num = nextSegNo; + seg->off = 0; + } + + /* How many bytes are within this segment? */ + if (nbytes > (seg->size - seg->off)) + segbytes = seg->size - seg->off; + else + segbytes = nbytes; + +#ifndef FRONTEND + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); +#endif + + /* + * Failure to read the data does not necessarily imply non-zero errno. + * Set it to zero so that caller can distinguish the failure that does + * not affect errno. 
+ */ + errno = 0; + + readbytes = pg_pread(seg->file, p, segbytes, seg->off); + +#ifndef FRONTEND + pgstat_report_wait_end(); +#endif + + if (readbytes <= 0) + { + XLogReadError *errinfo; + + errinfo = (XLogReadError *) palloc(sizeof(XLogReadError)); + errinfo->read_errno = errno; + errinfo->readbytes = readbytes; + errinfo->reqbytes = segbytes; + errinfo->seg = seg; + + return errinfo; + } + + /* Update state for read */ + recptr += readbytes; + nbytes -= readbytes; + p += readbytes; + + /* + * If the function is called by the XLOG reader, the reader will + * eventually set this field. However we need to care about it too + * because the function can also be used directly (see walsender.c). + */ + seg->off += readbytes; + } + + return NULL; +} + +#ifndef FRONTEND +/* + * Backend-specific convenience code to handle read errors encountered by + * XLogRead(). + */ +void +XLogReadProcessError(XLogReadError *errinfo) +{ + WALOpenSegment *seg = errinfo->seg; + + if (errinfo->readbytes < 0) + { + errno = errinfo->read_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from log segment %s, offset %u, length %zu: %m", + XLogFileNameP(seg->tli, seg->num), seg->off, + (Size) errinfo->reqbytes))); + } + else + { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read from log segment %s, offset %u: length %zu", + XLogFileNameP(seg->tli, seg->num), seg->off, + (Size) errinfo->reqbytes))); + } +} +#endif + /* ---------------------------------------- * Functions for decoding the data and block references in a record. * ---------------------------------------- diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 424bb06919..38c3196168 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -653,8 +653,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, * frontend). Probably these should be merged at some point. 
*/ static void -XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, - Size count) +XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, + Size count) { char *p; XLogRecPtr recptr; @@ -896,6 +896,39 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } } +/* + * Callback for XLogRead() to open the next segment. + */ +static void +read_local_xlog_page_segment_open(XLogSegNo nextSegNo, TimeLineID *tli_p, + int *file_p, WALOpenSegment *seg) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + int file; + + Assert(tli != InvalidTimeLineID); + + XLogFilePath(path, tli, nextSegNo, seg->size); + file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + + if (file < 0) + { + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + path))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } + + *file_p = file; +} + /* * read_page callback for reading local xlog files * @@ -915,6 +948,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, loc; int count; TimeLineID pageTLI; + XLogReadError *errinfo; loc = targetPagePtr + reqLen; @@ -1022,10 +1056,10 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. 
*/ - XLogRead(cur_page, state->seg.size, state->seg.tli, targetPagePtr, - XLOG_BLCKSZ); - state->seg.tli = pageTLI; - + if ((errinfo = XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, &pageTLI, + &state->seg, + read_local_xlog_page_segment_open)) != NULL) + XLogReadProcessError(errinfo); /* number of valid bytes in the buffer */ return count; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a617e20ab6..a7b3e0ecbe 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -247,9 +247,12 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +static void WalSndSegmentOpen(XLogSegNo nextSegNo, TimeLineID *tli_p, + int *file_p, WALOpenSegment *seg); +static void XLogReadOld(char *buf, XLogRecPtr startptr, Size count); + /* Initialize walsender process before entering the main command loop */ void InitWalSender(void) @@ -763,6 +766,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req { XLogRecPtr flushptr; int count; + XLogReadError *errinfo; XLogReadDetermineTimeline(state, targetPagePtr, reqLen); sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); @@ -783,7 +787,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); + if ((errinfo = XLogRead(cur_page, + targetPagePtr, + XLOG_BLCKSZ, + NULL, /* WalSndSegmentOpen will determine TLI */ + sendSeg, + WalSndSegmentOpen)) != NULL) + XLogReadProcessError(errinfo); return count; } @@ -2360,7 +2370,7 @@ WalSndKill(int code, Datum arg) * more than one. 
*/ static void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +XLogReadOld(char *buf, XLogRecPtr startptr, Size count) { char *p; XLogRecPtr recptr; @@ -2533,6 +2543,81 @@ retry: } } +/* + * Callback for XLogRead() to open the next segment. + */ +void +WalSndSegmentOpen(XLogSegNo nextSegNo, TimeLineID *tli_p, int *file_p, + WALOpenSegment *seg) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + int file; + + /* + * The timeline is determined below, caller should not pass it. + */ + Assert(tli == InvalidTimeLineID); + + /*------- + * When reading from a historic timeline, and there is a timeline switch + * within this segment, read from the WAL segment belonging to the new + * timeline. + * + * For example, imagine that this server is currently on timeline 5, and + * we're streaming timeline 4. The switch from timeline 4 to 5 happened at + * 0/13002088. In pg_wal, we have these files: + * + * ... + * 000000040000000000000012 + * 000000040000000000000013 + * 000000050000000000000013 + * 000000050000000000000014 + * ... + * + * In this situation, when requested to send the WAL from segment 0x13, on + * timeline 4, we read the WAL from file 000000050000000000000013. Archive + * recovery prefers files from newer timelines, so if the segment was + * restored from the archive on this server, the file belonging to the old + * timeline, 000000040000000000000013, might not exist. Their contents are + * equal up to the switchpoint, because at a timeline switch, the used + * portion of the old segment is copied to the new file. 
+	 *-------
+	 */
+	tli = sendTimeLine;
+	if (sendTimeLineIsHistoric)
+	{
+		XLogSegNo	endSegNo;
+
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, seg->size);
+		if (nextSegNo == endSegNo)
+			tli = sendTimeLineNextTLI;
+	}
+
+	XLogFilePath(path, tli, nextSegNo, seg->size);
+	file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (file < 0)
+	{
+		/*
+		 * If the file is not found, assume it's because the standby asked for
+		 * a too old WAL segment that has already been removed or recycled.
+		 */
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							XLogFileNameP(tli, nextSegNo))));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+
+	*file_p = file;
+	*tli_p = tli;
+}
+
 /*
  * Send out the WAL in its normal physical/stored form.
  *
@@ -2550,6 +2635,8 @@ XLogSendPhysical(void)
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	Size		nbytes;
+	XLogSegNo	segno;
+	XLogReadError *errinfo;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2765,7 +2852,51 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+	if ((errinfo = XLogRead(&output_message.data[output_message.len],
+							startptr,
+							nbytes,
+							NULL,	/* WalSndSegmentOpen will determine TLI */
+							sendSeg,
+							WalSndSegmentOpen)) != NULL)
+		XLogReadProcessError(errinfo);
+
+	/*
+	 * After reading into the buffer, check that what we read was valid. We do
+	 * this after reading, because even though the segment was present when we
+	 * opened it, it might get recycled or removed while we read it. The
+	 * read() succeeds in that case, but the data we tried to read might
+	 * already have been overwritten with new WAL records.
+ */ + XLByteToSeg(startptr, segno, wal_segment_size); + CheckXLogRemoved(segno, ThisTimeLineID); + + /* + * During recovery, the currently-open WAL file might be replaced with the + * file of the same name retrieved from archive. So we always need to + * check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. + */ + if (am_cascading_walsender) + { + WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && sendSeg->file >= 0) + { + close(sendSeg->file); + sendSeg->file = -1; + + goto retry; + } + } + output_message.len += nbytes; output_message.data[output_message.len] = '\0'; diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index a16793bb8b..cd5f589f03 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -296,6 +296,51 @@ identify_target_directory(XLogDumpPrivate *private, char *directory, fatal_error("could not find any WAL file"); } +static void +WALDumpOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli_p, int *file_p, + WALOpenSegment *seg) +{ + TimeLineID tli = *tli_p; + char fname[MAXPGPATH]; + int file; + int tries; + + Assert(tli != InvalidTimeLineID); + + XLogFileName(fname, tli, nextSegNo, seg->size); + + /* + * In follow mode there is a short period of time after the server has + * written the end of the previous file before the new file is available. + * So we loop for 5 seconds looking for the file to appear before giving + * up. 
+	 */
+	for (tries = 0; tries < 10; tries++)
+	{
+		file = open_file_in_directory(seg->dir, fname);
+		if (file >= 0)
+			break;
+		if (errno == ENOENT)
+		{
+			int			save_errno = errno;
+
+			/* File not there yet, try again */
+			pg_usleep(500 * 1000);
+
+			errno = save_errno;
+			continue;
+		}
+		/* Any other error, fall through and fail */
+		break;
+	}
+
+	if (file < 0)
+		fatal_error("could not find file \"%s\": %s",
+					fname, strerror(errno));
+
+	*file_p = file;
+}
+
 /*
  * Read count bytes from a segment file in the specified directory, for the
  * given timeline, containing the specified record pointer; store the data in
@@ -427,6 +472,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 {
 	XLogDumpPrivate *private = state->private_data;
 	int			count = XLOG_BLCKSZ;
+	XLogReadError *errinfo;
 
 	if (private->endptr != InvalidXLogRecPtr)
 	{
@@ -441,8 +487,22 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
-					 readBuff, count);
+	if ((errinfo = XLogRead(readBuff, targetPagePtr, count, &private->timeline,
+							&state->seg, WALDumpOpenSegment)) != NULL)
+	{
+		WALOpenSegment *seg = errinfo->seg;
+		char		fname[MAXPGPATH];
+
+		XLogFileName(fname, seg->tli, seg->num, seg->size);
+
+		if (errinfo->read_errno != 0)
+			fatal_error("could not read from log file %s, offset %u, length %zu: %s",
+						fname, seg->off, (Size) errinfo->reqbytes,
+						strerror(errinfo->read_errno));
+		else
+			fatal_error("could not read from log file %s, offset %u: length: %zu",
+						fname, seg->off, (Size) errinfo->reqbytes);
+	}
 
 	return count;
 }
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index b9d99d524e..3d9742b81b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -227,8 +227,50 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
 
+/* + * Callback to open the specified WAL segment for reading. + * + * "nextSegNo" is the number of the segment to be opened. + * + * "tli_p" is an input/output argument. If *tli_p is valid, it's the timeline + * the new segment should be in. If *tli_p==InvalidTimeLineID, the callback + * needs to determine the timeline itself and put the result into *tli_p. + * + * "file_p" points to an address the segment file descriptor should be stored + * at. + * + * "seg" provides information on the currently open segment. The callback is + * not supposed to change this info. + * + * BasicOpenFile() is the preferred way to open the segment file in backend + * code, whereas open(2) should be used in frontend. + */ +typedef void (*WALSegmentOpen) (XLogSegNo nextSegNo, TimeLineID *tli_p, + int *file_p, WALOpenSegment *seg); + extern void WALOpenSegmentInit(WALOpenSegment *seg, int size); +/* + * Error information that both backend and frontend caller can process. + * + * XXX Should the name be WALReadError? If so, we probably need to rename + * XLogRead() and XLogReadProcessError() too. + */ +typedef struct XLogReadError +{ + int read_errno; /* errno set by the last read(). */ + int readbytes; /* Bytes read by the last read(). */ + int reqbytes; /* Bytes requested to be read. */ + WALOpenSegment *seg; /* Segment we tried to read from. */ +} XLogReadError; + +extern XLogReadError *XLogRead(char *buf, XLogRecPtr startptr, Size count, + TimeLineID *tli_p, WALOpenSegment *seg, + WALSegmentOpen openSegment); +#ifndef FRONTEND +void XLogReadProcessError(XLogReadError *errinfo); +#endif + /* Functions for decoding an XLogRecord */ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, -- 2.20.1
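To make the control flow of the unified XLogRead() in 0005 easier to review, here is a standalone sketch of the same loop shape: split the request at segment boundaries, let a caller-supplied callback open each segment, and hand error details back to the caller instead of reporting them in place. It is a simplification under stated assumptions, not the patch's code. All Demo* names are hypothetical, the callback returns the descriptor instead of using out-parameters, there is no timeline handling, the error struct is supplied by the caller rather than palloc'd, and the "demo_wal_seg<N>" file naming is invented for the example.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical counterpart of WALOpenSegment. */
typedef struct DemoSeg
{
	int			file;		/* open descriptor, or -1 */
	uint64_t	num;		/* number of the open segment */
	uint32_t	off;		/* offset of the next read */
	int			size;		/* segment size, a power of two */
	const char *dir;		/* directory holding the segment files */
} DemoSeg;

/* Hypothetical counterpart of XLogReadError. */
typedef struct DemoReadError
{
	int			read_errno;	/* errno of the failed call, 0 if none */
	int			readbytes;	/* result of the failed call */
	int			reqbytes;	/* bytes requested from the segment */
	const DemoSeg *seg;		/* segment we tried to read from */
} DemoReadError;

/* Callback: open segment 'segno' and return its descriptor, or -1. */
typedef int (*DemoSegmentOpen) (uint64_t segno, const DemoSeg *seg);

/* Example callback; the file naming scheme is an assumption of the demo. */
static int
demo_open_segment(uint64_t segno, const DemoSeg *seg)
{
	char		path[1024];

	snprintf(path, sizeof(path), "%s/demo_wal_seg%llu", seg->dir,
			 (unsigned long long) segno);
	return open(path, O_RDONLY);
}

/* Read 'count' bytes starting at 'startptr'; on failure fill in *err. */
static bool
demo_read(char *buf, uint64_t startptr, size_t count, DemoSeg *seg,
		  DemoSegmentOpen open_cb, DemoReadError *err)
{
	assert(seg->size > 0 && (seg->size & (seg->size - 1)) == 0);

	while (count > 0)
	{
		uint64_t	segno = startptr / (uint64_t) seg->size;
		size_t		segbytes;
		ssize_t		readbytes;

		seg->off = (uint32_t) (startptr % (uint64_t) seg->size);

		/* Switch to another segment if the position left the open one. */
		if (seg->file < 0 || seg->num != segno)
		{
			if (seg->file >= 0)
				close(seg->file);
			errno = 0;
			seg->file = open_cb(segno, seg);
			seg->num = segno;
			if (seg->file < 0)
			{
				*err = (DemoReadError) {errno, -1, 0, seg};
				return false;
			}
		}

		/* Do not read past the end of the current segment. */
		segbytes = (size_t) seg->size - seg->off;
		if (segbytes > count)
			segbytes = count;

		/* A short file yields 0 without touching errno, so reset it. */
		errno = 0;
		readbytes = pread(seg->file, buf, segbytes, (off_t) seg->off);
		if (readbytes <= 0)
		{
			*err = (DemoReadError) {errno, (int) readbytes, (int) segbytes, seg};
			return false;
		}

		startptr += (uint64_t) readbytes;
		count -= (size_t) readbytes;
		buf += readbytes;
		seg->off += (uint32_t) readbytes;
	}

	return true;
}
```

A caller can then decide how to report a failure, which is the split between XLogRead() and XLogReadProcessError() in the patch: a negative readbytes with a nonzero read_errno is an I/O error, while readbytes == 0 means the segment file was shorter than expected.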
From 0a0dc5fcf6fb570eb6ec4d145fe931beb5d21d93 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Mon, 23 Sep 2019 07:40:49 +0200
Subject: [PATCH 6/6] Remove the old implementations of XLogRead().

Done in a separate patch because the diff looks harder to read if one function
(XLogRead) is removed and another one (the WALSegmentOpen callback) is added
nearby at the same time (the addition and removal of code can get mixed in the
diff).
---
 src/backend/access/transam/xlogutils.c | 125 -----------------
 src/backend/replication/walsender.c    | 187 -------------------------
 src/bin/pg_waldump/pg_waldump.c        | 123 ----------------
 3 files changed, 435 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 38c3196168..078d58e34c 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -17,14 +17,11 @@
  */
 #include "postgres.h"
 
-#include <unistd.h>
-
 #include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "miscadmin.h"
-#include "pgstat.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -639,128 +636,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 	forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend). Probably these should be merged at some point.
- */ -static void -XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, - Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - - /* state maintained across calls */ - static int sendFile = -1; - static XLogSegNo sendSegNo = 0; - static TimeLineID sendTLI = 0; - static uint32 sendOff = 0; - - Assert(segsize == wal_segment_size); - - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, segsize); - - /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) || - sendTLI != tli) - { - char path[MAXPGPATH]; - - if (sendFile >= 0) - close(sendFile); - - XLByteToSeg(recptr, sendSegNo, segsize); - - XLogFilePath(path, tli, sendSegNo, segsize); - - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY); - - if (sendFile < 0) - { - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - path))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendOff = 0; - sendTLI = tli; - } - - /* Need to seek in the file? */ - if (sendOff != startoff) - { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) - { - char path[MAXPGPATH]; - int save_errno = errno; - - XLogFilePath(path, tli, sendSegNo, segsize); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - path, startoff))); - } - sendOff = startoff; - } - - /* How many bytes are within this segment? 
*/ - if (nbytes > (segsize - startoff)) - segbytes = segsize - startoff; - else - segbytes = nbytes; - - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendFile, p, segbytes); - pgstat_report_wait_end(); - if (readbytes <= 0) - { - char path[MAXPGPATH]; - int save_errno = errno; - - XLogFilePath(path, tli, sendSegNo, segsize); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %lu: %m", - path, sendOff, (unsigned long) segbytes))); - } - - /* Update state for read */ - recptr += readbytes; - - sendOff += readbytes; - nbytes -= readbytes; - p += readbytes; - } -} - /* * Determine which timeline to read an xlog page from and set the * XLogReaderState's currTLI to that timeline ID. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a7b3e0ecbe..3ea05add53 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -251,8 +251,6 @@ static void WalSndSegmentOpen(XLogSegNo nextSegNo, TimeLineID *tli_p, int *file_p, WALOpenSegment *seg); -static void XLogReadOld(char *buf, XLogRecPtr startptr, Size count); - /* Initialize walsender process before entering the main command loop */ void InitWalSender(void) @@ -2358,191 +2356,6 @@ WalSndKill(int code, Datum arg) SpinLockRelease(&walsnd->mutex); } -/* - * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' - * - * XXX probably this should be improved to suck data directly from the - * WAL buffers when possible. - * - * Will open, and keep open, one WAL segment stored in the global file - * descriptor sendFile. This means if XLogRead is used once, there will - * always be one descriptor left open until the process ends, but never - * more than one. 
- */ -static void -XLogReadOld(char *buf, XLogRecPtr startptr, Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - XLogSegNo segno; - -retry: - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, wal_segment_size); - - if (sendSeg->file < 0 || - !XLByteInSeg(recptr, sendSeg->num, sendSeg->size)) - { - char path[MAXPGPATH]; - - /* Switch to another logfile segment */ - if (sendSeg->file >= 0) - close(sendSeg->file); - - XLByteToSeg(recptr, sendSeg->num, sendSeg->size); - - /*------- - * When reading from a historic timeline, and there is a timeline - * switch within this segment, read from the WAL segment belonging - * to the new timeline. - * - * For example, imagine that this server is currently on timeline - * 5, and we're streaming timeline 4. The switch from timeline 4 - * to 5 happened at 0/13002088. In pg_wal, we have these files: - * - * ... - * 000000040000000000000012 - * 000000040000000000000013 - * 000000050000000000000013 - * 000000050000000000000014 - * ... - * - * In this situation, when requested to send the WAL from - * segment 0x13, on timeline 4, we read the WAL from file - * 000000050000000000000013. Archive recovery prefers files from - * newer timelines, so if the segment was restored from the - * archive on this server, the file belonging to the old timeline, - * 000000040000000000000013, might not exist. Their contents are - * equal up to the switchpoint, because at a timeline switch, the - * used portion of the old segment is copied to the new file. 
- *------- - */ - sendSeg->tli = sendTimeLine; - if (sendTimeLineIsHistoric) - { - XLogSegNo endSegNo; - - XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size); - if (sendSeg->num == endSegNo) - sendSeg->tli = sendTimeLineNextTLI; - } - - XLogFilePath(path, sendSeg->tli, sendSeg->num, wal_segment_size); - - sendSeg->file = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (sendSeg->file < 0) - { - /* - * If the file is not found, assume it's because the standby - * asked for a too old WAL segment that has already been - * removed or recycled. - */ - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(sendSeg->tli, sendSeg->num)))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendSeg->off = 0; - } - - /* Need to seek in the file? */ - if (sendSeg->off != startoff) - { - if (lseek(sendSeg->file, (off_t) startoff, SEEK_SET) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(sendSeg->tli, sendSeg->num), - startoff))); - sendSeg->off = startoff; - } - - /* How many bytes are within this segment? 
*/ - if (nbytes > (wal_segment_size - startoff)) - segbytes = wal_segment_size - startoff; - else - segbytes = nbytes; - - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendSeg->file, p, segbytes); - pgstat_report_wait_end(); - if (readbytes < 0) - { - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %zu: %m", - XLogFileNameP(sendSeg->tli, sendSeg->num), - sendSeg->off, (Size) segbytes))); - } - else if (readbytes == 0) - { - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read from log segment %s, offset %u: read %d of %zu", - XLogFileNameP(sendSeg->tli, sendSeg->num), - sendSeg->off, readbytes, (Size) segbytes))); - } - - /* Update state for read */ - recptr += readbytes; - - sendSeg->off += readbytes; - nbytes -= readbytes; - p += readbytes; - } - - /* - * After reading into the buffer, check that what we read was valid. We do - * this after reading, because even though the segment was present when we - * opened it, it might get recycled or removed while we read it. The - * read() succeeds in that case, but the data we tried to read might - * already have been overwritten with new WAL records. - */ - XLByteToSeg(startptr, segno, wal_segment_size); - CheckXLogRemoved(segno, ThisTimeLineID); - - /* - * During recovery, the currently-open WAL file might be replaced with the - * file of the same name retrieved from archive. So we always need to - * check what we read was valid after reading into the buffer. If it's - * invalid, we try to open and read the file again. - */ - if (am_cascading_walsender) - { - WalSnd *walsnd = MyWalSnd; - bool reload; - - SpinLockAcquire(&walsnd->mutex); - reload = walsnd->needreload; - walsnd->needreload = false; - SpinLockRelease(&walsnd->mutex); - - if (reload && sendSeg->file >= 0) - { - close(sendSeg->file); - sendSeg->file = -1; - - goto retry; - } - } -} - /* * Callback for XLogRead() to open the next segment. 
*/ diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index cd5f589f03..1de8eb350e 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -14,7 +14,6 @@ #include <dirent.h> #include <sys/stat.h> -#include <unistd.h> #include "access/xlogreader.h" #include "access/xlogrecord.h" @@ -341,128 +340,6 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, TimeLineID *tli_p, int *file_p, *file_p = file; } -/* - * Read count bytes from a segment file in the specified directory, for the - * given timeline, containing the specified record pointer; store the data in - * the passed buffer. - */ -static void -XLogDumpXLogRead(const char *directory, TimeLineID timeline_id, - XLogRecPtr startptr, char *buf, Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - - static int sendFile = -1; - static XLogSegNo sendSegNo = 0; - static uint32 sendOff = 0; - - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, WalSegSz); - - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz)) - { - char fname[MAXFNAMELEN]; - int tries; - - /* Switch to another logfile segment */ - if (sendFile >= 0) - close(sendFile); - - XLByteToSeg(recptr, sendSegNo, WalSegSz); - - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); - - /* - * In follow mode there is a short period of time after the server - * has written the end of the previous file before the new file is - * available. So we loop for 5 seconds looking for the file to - * appear before giving up. 
- */ - for (tries = 0; tries < 10; tries++) - { - sendFile = open_file_in_directory(directory, fname); - if (sendFile >= 0) - break; - if (errno == ENOENT) - { - int save_errno = errno; - - /* File not there yet, try again */ - pg_usleep(500 * 1000); - - errno = save_errno; - continue; - } - /* Any other error, fall through and fail */ - break; - } - - if (sendFile < 0) - fatal_error("could not find file \"%s\": %s", - fname, strerror(errno)); - sendOff = 0; - } - - /* Need to seek in the file? */ - if (sendOff != startoff) - { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) - { - int err = errno; - char fname[MAXPGPATH]; - - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); - - fatal_error("could not seek in log file %s to offset %u: %s", - fname, startoff, strerror(err)); - } - sendOff = startoff; - } - - /* How many bytes are within this segment? */ - if (nbytes > (WalSegSz - startoff)) - segbytes = WalSegSz - startoff; - else - segbytes = nbytes; - - readbytes = read(sendFile, p, segbytes); - if (readbytes <= 0) - { - int err = errno; - char fname[MAXPGPATH]; - int save_errno = errno; - - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); - errno = save_errno; - - if (readbytes < 0) - fatal_error("could not read from log file %s, offset %u, length %d: %s", - fname, sendOff, segbytes, strerror(err)); - else if (readbytes == 0) - fatal_error("could not read from log file %s, offset %u: read %d of %zu", - fname, sendOff, readbytes, (Size) segbytes); - } - - /* Update state for read */ - recptr += readbytes; - - sendOff += readbytes; - nbytes -= readbytes; - p += readbytes; - } -} - /* * XLogReader read_page callback */ -- 2.20.1