From: Andres Freund <and...@anarazel.de> For that, add a 'node_id' parameter to most commands dealing with WAL segments. A node_id that is 'InvalidMultimasterNodeId' references local WAL; every other node_id refers to WAL in a new pg_lcr directory.
Using duplicated code would reduce the impact of that change but the long-term code-maintenance burden outweighs that by a far bit. Besides the decision to add a 'node_id' parameter to several functions the changes in this patch are fairly mechanical. --- src/backend/access/transam/xlog.c | 54 ++++++++++++++++----------- src/backend/replication/basebackup.c | 4 +- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 9 +++-- src/bin/initdb/initdb.c | 1 + src/bin/pg_resetxlog/pg_resetxlog.c | 2 +- src/include/access/xlog.h | 2 +- src/include/access/xlog_internal.h | 13 +++++-- src/include/replication/logical.h | 2 + src/include/replication/walsender_private.h | 2 +- 10 files changed, 56 insertions(+), 35 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 504b4d0..0622726 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -635,8 +635,8 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, static bool AdvanceXLInsertBuffer(bool new_segment); static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); -static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, - bool find_free, int *max_advance, +static bool InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg, + char *tmppath, bool find_free, int *max_advance, bool use_lock); static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, int source, bool notexistOk); @@ -1736,8 +1736,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) /* create/use new log file */ use_existent = true; - openLogFile = XLogFileInit(openLogId, openLogSeg, - &use_existent, true); + openLogFile = XLogFileInit(InvalidMultimasterNodeId, openLogId, + openLogSeg, &use_existent, true); openLogOff = 0; } @@ -2376,6 +2376,9 @@ XLogNeedsFlush(XLogRecPtr record) * place. 
This should be TRUE except during bootstrap log creation. The * caller must *not* hold the lock at call. * + * node_id: if != InvalidMultimasterNodeId this xlog file is actually a LCR + * file + * * Returns FD of opened file. * * Note: errors here are ERROR not PANIC because we might or might not be @@ -2384,8 +2387,8 @@ XLogNeedsFlush(XLogRecPtr record) * in a critical section. */ int -XLogFileInit(uint32 log, uint32 seg, - bool *use_existent, bool use_lock) +XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg, + bool *use_existent, bool use_lock) { char path[MAXPGPATH]; char tmppath[MAXPGPATH]; @@ -2396,7 +2399,7 @@ XLogFileInit(uint32 log, uint32 seg, int fd; int nbytes; - XLogFilePath(path, ThisTimeLineID, log, seg); + XLogFilePath(path, ThisTimeLineID, node_id, log, seg); /* * Try to use existent file (checkpoint maker may have created it already) @@ -2425,6 +2428,11 @@ XLogFileInit(uint32 log, uint32 seg, */ elog(DEBUG2, "creating and filling new WAL file"); + /* + * FIXME: to be safe we need to create tempfile in the pg_lcr directory if + * its actually an lcr file because pg_lcr might be in a different + * partition. + */ snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid()); unlink(tmppath); @@ -2493,7 +2501,7 @@ XLogFileInit(uint32 log, uint32 seg, installed_log = log; installed_seg = seg; max_advance = XLOGfileslop; - if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath, + if (!InstallXLogFileSegment(node_id, &installed_log, &installed_seg, tmppath, *use_existent, &max_advance, use_lock)) { @@ -2548,7 +2556,7 @@ XLogFileCopy(uint32 log, uint32 seg, /* * Open the source file */ - XLogFilePath(path, srcTLI, srclog, srcseg); + XLogFilePath(path, srcTLI, InvalidMultimasterNodeId, srclog, srcseg); srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (srcfd < 0) ereport(ERROR, @@ -2619,7 +2627,8 @@ XLogFileCopy(uint32 log, uint32 seg, /* * Now move the segment into place with its final name. 
*/ - if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false)) + if (!InstallXLogFileSegment(InvalidMultimasterNodeId, &log, &seg, tmppath, + false, NULL, false)) elog(ERROR, "InstallXLogFileSegment should not have failed"); } @@ -2653,14 +2662,14 @@ XLogFileCopy(uint32 log, uint32 seg, * file into place. */ static bool -InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, +InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock) { char path[MAXPGPATH]; struct stat stat_buf; - XLogFilePath(path, ThisTimeLineID, *log, *seg); + XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg); /* * We want to be sure that only one process does this at a time. @@ -2687,7 +2696,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, } NextLogSeg(*log, *seg); (*max_advance)--; - XLogFilePath(path, ThisTimeLineID, *log, *seg); + XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg); } } @@ -2736,7 +2745,7 @@ XLogFileOpen(uint32 log, uint32 seg) char path[MAXPGPATH]; int fd; - XLogFilePath(path, ThisTimeLineID, log, seg); + XLogFilePath(path, ThisTimeLineID, InvalidMultimasterNodeId, log, seg); fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method), S_IRUSR | S_IWUSR); @@ -2783,7 +2792,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, case XLOG_FROM_PG_XLOG: case XLOG_FROM_STREAM: - XLogFilePath(path, tli, log, seg); + XLogFilePath(path, tli, InvalidMultimasterNodeId, log, seg); restoredFromArchive = false; break; @@ -2804,7 +2813,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, bool reload = false; struct stat statbuf; - XLogFilePath(xlogfpath, tli, log, seg); + XLogFilePath(xlogfpath, tli, InvalidMultimasterNodeId, log, seg); if (stat(xlogfpath, &statbuf) == 0) { if (unlink(xlogfpath) != 0) @@ -2922,7 +2931,7 @@ XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources) } /* Couldn't find it. 
For simplicity, complain about front timeline */ - XLogFilePath(path, recoveryTargetTLI, log, seg); + XLogFilePath(path, recoveryTargetTLI, InvalidMultimasterNodeId, log, seg); errno = ENOENT; ereport(emode, (errcode_for_file_access(), @@ -3366,7 +3375,8 @@ PreallocXlogFiles(XLogRecPtr endptr) { NextLogSeg(_logId, _logSeg); use_existent = true; - lf = XLogFileInit(_logId, _logSeg, &use_existent, true); + lf = XLogFileInit(InvalidMultimasterNodeId, _logId, _logSeg, + &use_existent, true); close(lf); if (!use_existent) CheckpointStats.ckpt_segs_added++; @@ -3486,8 +3496,9 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) * separate archive directory. */ if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) && - InstallXLogFileSegment(&endlogId, &endlogSeg, path, - true, &max_advance, true)) + InstallXLogFileSegment(InvalidMultimasterNodeId, &endlogId, + &endlogSeg, path, true, + &max_advance, true)) { ereport(DEBUG2, (errmsg("recycled transaction log file \"%s\"", @@ -5255,7 +5266,8 @@ BootStrapXLOG(void) /* Create first XLOG segment file */ use_existent = false; - openLogFile = XLogFileInit(0, 1, &use_existent, false); + openLogFile = XLogFileInit(InvalidMultimasterNodeId, 0, 1, + &use_existent, false); /* Write the first page with the initial record */ errno = 0; diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 0bc88a4..47e4641 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -245,7 +245,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) char fn[MAXPGPATH]; int i; - XLogFilePath(fn, ThisTimeLineID, logid, logseg); + XLogFilePath(fn, ThisTimeLineID, InvalidMultimasterNodeId, logid, logseg); _tarWriteHeader(fn, NULL, &statbuf); /* Send the actual WAL file contents, block-by-block */ @@ -264,7 +264,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) * http://lists.apple.com/archives/xcode-users/2003/Dec//msg000 * 51.html */ - 
XLogRead(buf, ptr, TAR_SEND_SIZE); + XLogRead(buf, InvalidMultimasterNodeId, ptr, TAR_SEND_SIZE); if (pq_putmessage('d', buf, TAR_SEND_SIZE)) ereport(ERROR, (errmsg("base backup could not send data, aborting backup"))); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 650b74f..e97196b 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -509,7 +509,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) /* Create/use new log file */ XLByteToSeg(recptr, recvId, recvSeg); use_existent = true; - recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true); + recvFile = XLogFileInit(InvalidMultimasterNodeId, recvId, recvSeg, &use_existent, true); recvOff = 0; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e44c734..8cd3a00 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -977,7 +977,7 @@ WalSndKill(int code, Datum arg) * more than one. */ void -XLogRead(char *buf, XLogRecPtr startptr, Size count) +XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count) { char *p; XLogRecPtr recptr; @@ -1009,8 +1009,8 @@ retry: close(sendFile); XLByteToSeg(recptr, sendId, sendSeg); - XLogFilePath(path, ThisTimeLineID, sendId, sendSeg); - + XLogFilePath(path, ThisTimeLineID, node_id, + sendId, sendSeg); sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (sendFile < 0) { @@ -1215,7 +1215,8 @@ XLogSend(char *msgbuf, bool *caughtup) * Read the log directly into the output buffer to avoid extra memcpy * calls. 
*/ - XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); + XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), InvalidMultimasterNodeId, + startptr, nbytes); /* * We fill the message header last so that the send timestamp is taken as diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 3789948..1f26382 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -2637,6 +2637,7 @@ main(int argc, char *argv[]) "global", "pg_xlog", "pg_xlog/archive_status", + "pg_lcr", "pg_clog", "pg_notify", "pg_serial", diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c index 65ba910..7ee3a3a 100644 --- a/src/bin/pg_resetxlog/pg_resetxlog.c +++ b/src/bin/pg_resetxlog/pg_resetxlog.c @@ -973,7 +973,7 @@ WriteEmptyXLOG(void) /* Write the first page */ XLogFilePath(path, ControlFile.checkPointCopy.ThisTimeLineID, - newXlogId, newXlogSeg); + InvalidMultimasterNodeId, newXlogId, newXlogSeg); unlink(path); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index dd89cff..3b02c0b 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -268,7 +268,7 @@ extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); extern void XLogFlush(XLogRecPtr RecPtr); extern bool XLogBackgroundFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr); -extern int XLogFileInit(uint32 log, uint32 seg, +extern int XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg, bool *use_existent, bool use_lock); extern int XLogFileOpen(uint32 log, uint32 seg); diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 3328a50..deadddf 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -19,6 +19,7 @@ #include "access/xlog.h" #include "fmgr.h" #include "pgtime.h" +#include "replication/logical.h" #include "storage/block.h" #include "storage/relfilenode.h" @@ -216,14 +217,11 @@ typedef XLogLongPageHeaderData 
*XLogLongPageHeader; #define MAXFNAMELEN 64 #define XLogFileName(fname, tli, log, seg) \ - snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg) + snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg); #define XLogFromFileName(fname, tli, log, seg) \ sscanf(fname, "%08X%08X%08X", tli, log, seg) -#define XLogFilePath(path, tli, log, seg) \ - snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg) - #define TLHistoryFileName(fname, tli) \ snprintf(fname, MAXFNAMELEN, "%08X.history", tli) @@ -239,6 +237,13 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader; #define BackupHistoryFilePath(path, tli, log, seg, offset) \ snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X.%08X.backup", tli, log, seg, offset) +/* FIXME: move to xlogutils.c, needs to fix sharing with receivexlog.c first though */ +static inline int XLogFilePath(char* path, TimeLineID tli, RepNodeId node_id, uint32 log, uint32 seg){ + if(node_id == InvalidMultimasterNodeId) + return snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg); + else + return snprintf(path, MAXPGPATH, LCRDIR "/%d/%08X%08X%08X", node_id, tli, log, seg); +} /* * Method table for resource managers. 
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 0698b61..8f44fad 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -19,4 +19,6 @@ extern XLogRecPtr current_replication_origin_lsn; #define InvalidMultimasterNodeId 0 #define MaxMultimasterNodeId (2<<3) + +#define LCRDIR "pg_lcr" #endif diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 66234cd..bc58ff4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -95,7 +95,7 @@ extern WalSndCtlData *WalSndCtl; extern void WalSndSetState(WalSndState state); -extern void XLogRead(char *buf, XLogRecPtr startptr, Size count); +extern void XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count); /* * Internal functions for parsing the replication grammar, in repl_gram.y and -- 1.7.10.rc3.3.g19a6c.dirty -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers