From: Andres Freund <and...@anarazel.de>

For that, add a 'node_id' parameter to most functions dealing with WAL
segments. A node_id that is 'InvalidMultimasterNodeId' references local WAL;
every other node_id refers to WAL in a new pg_lcr directory.

Using duplicated code would reduce the impact of that change, but the long-term
code-maintenance burden outweighs that by a fair bit.

Besides the decision to add a 'node_id' parameter to several functions the
changes in this patch are fairly mechanical.
---
 src/backend/access/transam/xlog.c           |   54 ++++++++++++++++-----------
 src/backend/replication/basebackup.c        |    4 +-
 src/backend/replication/walreceiver.c       |    2 +-
 src/backend/replication/walsender.c         |    9 +++--
 src/bin/initdb/initdb.c                     |    1 +
 src/bin/pg_resetxlog/pg_resetxlog.c         |    2 +-
 src/include/access/xlog.h                   |    2 +-
 src/include/access/xlog_internal.h          |   13 +++++--
 src/include/replication/logical.h           |    2 +
 src/include/replication/walsender_private.h |    2 +-
 10 files changed, 56 insertions(+), 35 deletions(-)

diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 504b4d0..0622726 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -635,8 +635,8 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool 
doPageWrites,
 static bool AdvanceXLInsertBuffer(bool new_segment);
 static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
-static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
-                                          bool find_free, int *max_advance,
+static bool InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg,
+                                          char *tmppath, bool find_free, int 
*max_advance,
                                           bool use_lock);
 static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
                         int source, bool notexistOk);
@@ -1736,8 +1736,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool 
xlog_switch)
 
                        /* create/use new log file */
                        use_existent = true;
-                       openLogFile = XLogFileInit(openLogId, openLogSeg,
-                                                                          
&use_existent, true);
+                       openLogFile = XLogFileInit(InvalidMultimasterNodeId, 
openLogId,
+                                                  openLogSeg, &use_existent, 
true);
                        openLogOff = 0;
                }
 
@@ -2376,6 +2376,9 @@ XLogNeedsFlush(XLogRecPtr record)
  * place.  This should be TRUE except during bootstrap log creation.  The
  * caller must *not* hold the lock at call.
  *
+ * node_id: if != InvalidMultimasterNodeId this xlog file is actually a LCR
+ * file
+ *
  * Returns FD of opened file.
  *
  * Note: errors here are ERROR not PANIC because we might or might not be
@@ -2384,8 +2387,8 @@ XLogNeedsFlush(XLogRecPtr record)
  * in a critical section.
  */
 int
-XLogFileInit(uint32 log, uint32 seg,
-                        bool *use_existent, bool use_lock)
+XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg,
+             bool *use_existent, bool use_lock)
 {
        char            path[MAXPGPATH];
        char            tmppath[MAXPGPATH];
@@ -2396,7 +2399,7 @@ XLogFileInit(uint32 log, uint32 seg,
        int                     fd;
        int                     nbytes;
 
-       XLogFilePath(path, ThisTimeLineID, log, seg);
+       XLogFilePath(path, ThisTimeLineID, node_id, log, seg);
 
        /*
         * Try to use existent file (checkpoint maker may have created it 
already)
@@ -2425,6 +2428,11 @@ XLogFileInit(uint32 log, uint32 seg,
         */
        elog(DEBUG2, "creating and filling new WAL file");
 
+       /*
+        * FIXME: to be safe we need to create the tempfile in the pg_lcr
+        * directory if it's actually an LCR file, because pg_lcr might be on a
+        * different partition.
+        */
        snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
 
        unlink(tmppath);
@@ -2493,7 +2501,7 @@ XLogFileInit(uint32 log, uint32 seg,
        installed_log = log;
        installed_seg = seg;
        max_advance = XLOGfileslop;
-       if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
+       if (!InstallXLogFileSegment(node_id, &installed_log, &installed_seg, 
tmppath,
                                                                *use_existent, 
&max_advance,
                                                                use_lock))
        {
@@ -2548,7 +2556,7 @@ XLogFileCopy(uint32 log, uint32 seg,
        /*
         * Open the source file
         */
-       XLogFilePath(path, srcTLI, srclog, srcseg);
+       XLogFilePath(path, srcTLI, InvalidMultimasterNodeId, srclog, srcseg);
        srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
        if (srcfd < 0)
                ereport(ERROR,
@@ -2619,7 +2627,8 @@ XLogFileCopy(uint32 log, uint32 seg,
        /*
         * Now move the segment into place with its final name.
         */
-       if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
+       if (!InstallXLogFileSegment(InvalidMultimasterNodeId, &log, &seg, 
tmppath,
+                                   false, NULL, false))
                elog(ERROR, "InstallXLogFileSegment should not have failed");
 }
 
@@ -2653,14 +2662,14 @@ XLogFileCopy(uint32 log, uint32 seg,
  * file into place.
  */
 static bool
-InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
+InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg, char 
*tmppath,
                                           bool find_free, int *max_advance,
                                           bool use_lock)
 {
        char            path[MAXPGPATH];
        struct stat stat_buf;
 
-       XLogFilePath(path, ThisTimeLineID, *log, *seg);
+       XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg);
 
        /*
         * We want to be sure that only one process does this at a time.
@@ -2687,7 +2696,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char 
*tmppath,
                        }
                        NextLogSeg(*log, *seg);
                        (*max_advance)--;
-                       XLogFilePath(path, ThisTimeLineID, *log, *seg);
+                       XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg);
                }
        }
 
@@ -2736,7 +2745,7 @@ XLogFileOpen(uint32 log, uint32 seg)
        char            path[MAXPGPATH];
        int                     fd;
 
-       XLogFilePath(path, ThisTimeLineID, log, seg);
+       XLogFilePath(path, ThisTimeLineID, InvalidMultimasterNodeId, log, seg);
 
        fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
                                           S_IRUSR | S_IWUSR);
@@ -2783,7 +2792,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, 
TimeLineID tli,
 
                case XLOG_FROM_PG_XLOG:
                case XLOG_FROM_STREAM:
-                       XLogFilePath(path, tli, log, seg);
+                       XLogFilePath(path, tli, InvalidMultimasterNodeId, log, 
seg);
                        restoredFromArchive = false;
                        break;
 
@@ -2804,7 +2813,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, 
TimeLineID tli,
                bool            reload = false;
                struct stat statbuf;
 
-               XLogFilePath(xlogfpath, tli, log, seg);
+               XLogFilePath(xlogfpath, tli, InvalidMultimasterNodeId, log, 
seg);
                if (stat(xlogfpath, &statbuf) == 0)
                {
                        if (unlink(xlogfpath) != 0)
@@ -2922,7 +2931,7 @@ XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int 
sources)
        }
 
        /* Couldn't find it.  For simplicity, complain about front timeline */
-       XLogFilePath(path, recoveryTargetTLI, log, seg);
+       XLogFilePath(path, recoveryTargetTLI, InvalidMultimasterNodeId, log, 
seg);
        errno = ENOENT;
        ereport(emode,
                        (errcode_for_file_access(),
@@ -3366,7 +3375,8 @@ PreallocXlogFiles(XLogRecPtr endptr)
        {
                NextLogSeg(_logId, _logSeg);
                use_existent = true;
-               lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
+               lf = XLogFileInit(InvalidMultimasterNodeId, _logId, _logSeg,
+                                 &use_existent, true);
                close(lf);
                if (!use_existent)
                        CheckpointStats.ckpt_segs_added++;
@@ -3486,8 +3496,9 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr 
endptr)
                                 * separate archive directory.
                                 */
                                if (lstat(path, &statbuf) == 0 && 
S_ISREG(statbuf.st_mode) &&
-                                       InstallXLogFileSegment(&endlogId, 
&endlogSeg, path,
-                                                                               
   true, &max_advance, true))
+                                   
InstallXLogFileSegment(InvalidMultimasterNodeId, &endlogId,
+                                                          &endlogSeg, path, 
true,
+                                                          &max_advance, true))
                                {
                                        ereport(DEBUG2,
                                                        (errmsg("recycled 
transaction log file \"%s\"",
@@ -5255,7 +5266,8 @@ BootStrapXLOG(void)
 
        /* Create first XLOG segment file */
        use_existent = false;
-       openLogFile = XLogFileInit(0, 1, &use_existent, false);
+       openLogFile = XLogFileInit(InvalidMultimasterNodeId, 0, 1,
+                                  &use_existent, false);
 
        /* Write the first page with the initial record */
        errno = 0;
diff --git a/src/backend/replication/basebackup.c 
b/src/backend/replication/basebackup.c
index 0bc88a4..47e4641 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -245,7 +245,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                        char            fn[MAXPGPATH];
                        int                     i;
 
-                       XLogFilePath(fn, ThisTimeLineID, logid, logseg);
+                       XLogFilePath(fn, ThisTimeLineID, 
InvalidMultimasterNodeId, logid, logseg);
                        _tarWriteHeader(fn, NULL, &statbuf);
 
                        /* Send the actual WAL file contents, block-by-block */
@@ -264,7 +264,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                                 * 
http://lists.apple.com/archives/xcode-users/2003/Dec//msg000
                                 * 51.html
                                 */
-                               XLogRead(buf, ptr, TAR_SEND_SIZE);
+                               XLogRead(buf, InvalidMultimasterNodeId, ptr, 
TAR_SEND_SIZE);
                                if (pq_putmessage('d', buf, TAR_SEND_SIZE))
                                        ereport(ERROR,
                                                        (errmsg("base backup 
could not send data, aborting backup")));
diff --git a/src/backend/replication/walreceiver.c 
b/src/backend/replication/walreceiver.c
index 650b74f..e97196b 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -509,7 +509,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
                        /* Create/use new log file */
                        XLByteToSeg(recptr, recvId, recvSeg);
                        use_existent = true;
-                       recvFile = XLogFileInit(recvId, recvSeg, &use_existent, 
true);
+                       recvFile = XLogFileInit(InvalidMultimasterNodeId, 
recvId, recvSeg, &use_existent, true);
                        recvOff = 0;
                }
 
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index e44c734..8cd3a00 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -977,7 +977,7 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count)
 {
        char       *p;
        XLogRecPtr      recptr;
@@ -1009,8 +1009,8 @@ retry:
                                close(sendFile);
 
                        XLByteToSeg(recptr, sendId, sendSeg);
-                       XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
-
+                       XLogFilePath(path, ThisTimeLineID, node_id,
+                                    sendId, sendSeg);
                        sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
                        if (sendFile < 0)
                        {
@@ -1215,7 +1215,8 @@ XLogSend(char *msgbuf, bool *caughtup)
         * Read the log directly into the output buffer to avoid extra memcpy
         * calls.
         */
-       XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+       XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), 
InvalidMultimasterNodeId,
+                startptr, nbytes);
 
        /*
         * We fill the message header last so that the send timestamp is taken 
as
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 3789948..1f26382 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -2637,6 +2637,7 @@ main(int argc, char *argv[])
                "global",
                "pg_xlog",
                "pg_xlog/archive_status",
+               "pg_lcr",
                "pg_clog",
                "pg_notify",
                "pg_serial",
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c 
b/src/bin/pg_resetxlog/pg_resetxlog.c
index 65ba910..7ee3a3a 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -973,7 +973,7 @@ WriteEmptyXLOG(void)
 
        /* Write the first page */
        XLogFilePath(path, ControlFile.checkPointCopy.ThisTimeLineID,
-                                newXlogId, newXlogSeg);
+                    InvalidMultimasterNodeId, newXlogId, newXlogSeg);
 
        unlink(path);
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index dd89cff..3b02c0b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -268,7 +268,7 @@ extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, 
XLogRecData *rdata);
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
-extern int XLogFileInit(uint32 log, uint32 seg,
+extern int XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg,
                         bool *use_existent, bool use_lock);
 extern int     XLogFileOpen(uint32 log, uint32 seg);
 
diff --git a/src/include/access/xlog_internal.h 
b/src/include/access/xlog_internal.h
index 3328a50..deadddf 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -19,6 +19,7 @@
 #include "access/xlog.h"
 #include "fmgr.h"
 #include "pgtime.h"
+#include "replication/logical.h"
 #include "storage/block.h"
 #include "storage/relfilenode.h"
 
@@ -216,14 +217,11 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define MAXFNAMELEN            64
 
 #define XLogFileName(fname, tli, log, seg)     \
        snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)
 
 #define XLogFromFileName(fname, tli, log, seg) \
        sscanf(fname, "%08X%08X%08X", tli, log, seg)
 
-#define XLogFilePath(path, tli, log, seg)      \
-       snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg)
-
 #define TLHistoryFileName(fname, tli)  \
        snprintf(fname, MAXFNAMELEN, "%08X.history", tli)
 
@@ -239,6 +237,28 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define BackupHistoryFilePath(path, tli, log, seg, offset)     \
        snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X.%08X.backup", tli, 
log, seg, offset)
 
+/*
+ * Build the path of an xlog segment file into 'path' (MAXPGPATH bytes).
+ *
+ * If node_id is InvalidMultimasterNodeId the path points to local WAL in
+ * XLOGDIR; every other node_id selects that node's subdirectory of LCRDIR.
+ * Returns snprintf()'s result.
+ *
+ * FIXME: move to xlogutils.c, needs to fix sharing with receivexlog.c first
+ * though
+ */
+static inline int
+XLogFilePath(char *path, TimeLineID tli, RepNodeId node_id,
+             uint32 log, uint32 seg)
+{
+       if (node_id == InvalidMultimasterNodeId)
+               return snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X",
+                               tli, log, seg);
+
+       /* cast so the argument matches %d whatever RepNodeId's width is */
+       return snprintf(path, MAXPGPATH, LCRDIR "/%d/%08X%08X%08X",
+                       (int) node_id, tli, log, seg);
+}
 
 /*
  * Method table for resource managers.
diff --git a/src/include/replication/logical.h 
b/src/include/replication/logical.h
index 0698b61..8f44fad 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -19,4 +19,6 @@ extern XLogRecPtr current_replication_origin_lsn;
 
 #define InvalidMultimasterNodeId 0
 #define MaxMultimasterNodeId (2<<3)
+
+#define LCRDIR                         "pg_lcr"
 #endif
diff --git a/src/include/replication/walsender_private.h 
b/src/include/replication/walsender_private.h
index 66234cd..bc58ff4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -95,7 +95,7 @@ extern WalSndCtlData *WalSndCtl;
 
 
 extern void WalSndSetState(WalSndState state);
-extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+extern void XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size 
count);
 
 /*
  * Internal functions for parsing the replication grammar, in repl_gram.y and
-- 
1.7.10.rc3.3.g19a6c.dirty


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to