From: Andres Freund <and...@anarazel.de>

In order to have restartable replication with minimal additional writes, it's
very useful to know up to which point we have replayed/received changes from a
foreign node.

One representation of that is the lsn of changes at the originating cluster.

We need to keep track of both the point up to which we have received data and
the point up to which we have applied it.

For that we added a field 'origin_lsn' to commit records. This allows us to
keep track of the apply position across crash recovery with minimal additional
I/O. We only added the field to non-compact commit records to reduce the
overhead in case logical replication is not used.

Checkpoints need to keep track of the apply/receive positions as well, because
otherwise it would be hard to determine the lsn from which to restart
receive/apply after a shutdown/crash if no changes happened since the last
shutdown/crash.

While running, the startup process, the walreceiver and a (future) apply process
will all need a coherent picture of those two states, so add shared memory state
to keep track of them. Currently this is represented in the walreceiver's shared
memory segment. This will likely need to change.

During crash recovery/physical replication the origin_lsn field of commit
records is used to update the shared memory, and thus the next checkpoint's,
notion of the apply state.

Missing:

- For correct crash recovery we need more state than the 'apply lsn' because
  transactions on the originating side can overlap. At the lsn we just applied,
  many other transactions can still be in progress. To correctly handle that we
  need to keep track of the oldest start lsn of any transaction currently being
  reassembled (cf. ApplyCache). Then we can start to reassemble the ApplyCache
  from that point and throw away any transaction which committed before the
  recorded/recovered apply lsn.
  It should be sufficient to store that knowledge in shared memory and
  checkpoint records.
---
 src/backend/access/transam/xact.c          |   22 ++++++++-
 src/backend/access/transam/xlog.c          |   73 ++++++++++++++++++++++++++++
 src/backend/replication/walreceiverfuncs.c |    8 +++
 src/include/access/xact.h                  |    1 +
 src/include/catalog/pg_control.h           |   13 ++++-
 src/include/replication/walreceiver.h      |   13 +++++
 6 files changed, 128 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xact.c 
b/src/backend/access/transam/xact.c
index dc30a17..40ac965 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -39,11 +39,13 @@
 #include "replication/logical.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
+#include "replication/walreceiver.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "utils/combocid.h"
 #include "utils/guc.h"
 #include "utils/inval.h"
@@ -1015,7 +1017,8 @@ RecordTransactionCommit(void)
                /*
                 * Do we need the long commit record? If not, use the compact 
format.
                 */
-               if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || 
forceSyncCommit)
+               if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || 
forceSyncCommit ||
+                   (wal_level == WAL_LEVEL_LOGICAL && 
current_replication_origin_id != guc_replication_origin_id))
                {
                        XLogRecData rdata[4];
                        int                     lastrdata = 0;
@@ -1037,6 +1040,8 @@ RecordTransactionCommit(void)
                        xlrec.nrels = nrels;
                        xlrec.nsubxacts = nchildren;
                        xlrec.nmsgs = nmsgs;
+                       xlrec.origin_lsn = current_replication_origin_lsn;
+
                        rdata[0].data = (char *) (&xlrec);
                        rdata[0].len = MinSizeOfXactCommit;
                        rdata[0].buffer = InvalidBuffer;
@@ -4575,6 +4580,21 @@ xact_redo_commit_internal(TransactionId xid, RepNodeId 
originating_node,
                LWLockRelease(XidGenLock);
        }
 
+       /*
+        * record where were at wrt to recovery. We need that to know from 
where on
+        * to restart applying logical change records
+        */
+       if(LogicalWalReceiverActive() && !XLByteEQ(origin_lsn, zeroRecPtr))
+       {
+               /*
+                * probably we don't need the locking because no lcr receiver 
can run
+                * yet.
+                */
+               SpinLockAcquire(&WalRcv->mutex);
+               WalRcv->mm_applyState[originating_node] = origin_lsn;
+               SpinLockRelease(&WalRcv->mutex);
+       }
+
        if (standbyState == STANDBY_DISABLED)
        {
                /*
diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 0622726..20a4611 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5183,6 +5183,7 @@ BootStrapXLOG(void)
        uint64          sysidentifier;
        struct timeval tv;
        pg_crc32        crc;
+       int i;
 
        /*
         * Select a hopefully-unique system identifier code for this 
installation.
@@ -5229,6 +5230,13 @@ BootStrapXLOG(void)
        checkPoint.time = (pg_time_t) time(NULL);
        checkPoint.oldestActiveXid = InvalidTransactionId;
 
+       for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId;
+           i++){
+               checkPoint.logicalReceiveState[i] = zeroRecPtr;
+               checkPoint.logicalApplyState[i] = zeroRecPtr;
+       }
+
+
        ShmemVariableCache->nextXid = checkPoint.nextXid;
        ShmemVariableCache->nextOid = checkPoint.nextOid;
        ShmemVariableCache->oidCount = 0;
@@ -6314,6 +6322,53 @@ StartupXLOG(void)
                InRecovery = true;
        }
 
+       /*
+        * setup shared memory state for logical wal receiver
+        *
+        * Do this unconditionally so enabling/disabling/enabling logical replay
+        * doesn't loose information due to rewriting pg_control
+        */
+       {
+               int i;
+
+               Assert(WalRcv);
+               /* locking is not really required here afaics, but ... */
+               SpinLockAcquire(&WalRcv->mutex);
+
+               for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId 
- 1;
+                   i++)
+               {
+                       XLogRecPtr* receiveState = 
&ControlFile->checkPointCopy.logicalReceiveState[i];
+                       XLogRecPtr* applyState = 
&ControlFile->checkPointCopy.logicalApplyState[i];
+                       if(i == guc_replication_origin_id && (
+                                  !XLByteEQ(*receiveState, zeroRecPtr) ||
+                                  !XLByteEQ(*applyState, zeroRecPtr))
+                               )
+                       {
+                               elog(WARNING, "logical recovery state for own 
db. apply: %X/%X, receive %X/%X, origin %d",
+                                    applyState->xlogid, applyState->xrecoff,
+                                    receiveState->xlogid, 
receiveState->xrecoff,
+                                    guc_replication_origin_id);
+                               WalRcv->mm_receiveState[i] = zeroRecPtr;
+                               WalRcv->mm_applyState[i] = zeroRecPtr;
+                       }
+                       else{
+                               WalRcv->mm_receiveState[i] = *receiveState;
+                               WalRcv->mm_applyState[i] = *applyState;
+                       }
+               }
+               SpinLockRelease(&WalRcv->mutex);
+
+               /* FIXME: remove at some point */
+               for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId 
- 1;
+                   i++){
+                       elog(LOG, "restored apply state for node %d to %X/%X, 
receive %X/%X",
+                            i,
+                            WalRcv->mm_applyState[i].xlogid, 
WalRcv->mm_applyState[i].xrecoff,
+                            WalRcv->mm_receiveState[i].xlogid, 
WalRcv->mm_receiveState[i].xrecoff);
+               }
+       }
+
        /* REDO */
        if (InRecovery)
        {
@@ -7906,6 +7961,24 @@ CreateCheckPoint(int flags)
                                                         
&checkPoint.nextMultiOffset);
 
        /*
+        * fill out where are at wrt logical replay. Do this unconditionally so 
we
+        * don't loose information due to rewriting pg_control when toggling
+        * logical replay
+        */
+       {
+               int i;
+               SpinLockAcquire(&WalRcv->mutex);
+
+               for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId 
- 1;
+                   i++){
+                       checkPoint.logicalApplyState[i] = 
WalRcv->mm_applyState[i];
+                       checkPoint.logicalReceiveState[i] = 
WalRcv->mm_receiveState[i];
+               }
+               SpinLockRelease(&WalRcv->mutex);
+               elog(LOG, "updated logical checkpoint data");
+       }
+
+       /*
         * Having constructed the checkpoint record, ensure all shmem disk 
buffers
         * and commit-log buffers are flushed to disk.
         *
diff --git a/src/backend/replication/walreceiverfuncs.c 
b/src/backend/replication/walreceiverfuncs.c
index 876196f..cb49282 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,6 +64,14 @@ WalRcvShmemInit(void)
                MemSet(WalRcv, 0, WalRcvShmemSize());
                WalRcv->walRcvState = WALRCV_STOPPED;
                SpinLockInit(&WalRcv->mutex);
+
+               memset(&WalRcv->mm_receiveState,
+                      0, sizeof(WalRcv->mm_receiveState));
+               memset(&WalRcv->mm_applyState,
+                      0, sizeof(WalRcv->mm_applyState));
+
+               memset(&WalRcv->mm_receiveLatch,
+                      0, sizeof(WalRcv->mm_receiveLatch));
        }
 }
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b12d2a0..2757782 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,6 +137,7 @@ typedef struct xl_xact_commit
        int                     nmsgs;                  /* number of shared 
inval msgs */
        Oid                     dbId;                   /* MyDatabaseId */
        Oid                     tsId;                   /* MyDatabaseTableSpace 
*/
+       XLogRecPtr      origin_lsn;     /* location of originating commit */
        /* Array of RelFileNode(s) to drop at commit */
        RelFileNode xnodes[1];          /* VARIABLE LENGTH ARRAY */
        /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 5cff396..bc6316e 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -16,12 +16,13 @@
 #define PG_CONTROL_H
 
 #include "access/xlogdefs.h"
+#include "replication/logical.h"
 #include "pgtime.h"                            /* for pg_time_t */
 #include "utils/pg_crc.h"
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION     922
+#define PG_CONTROL_VERSION     923
 
 /*
  * Body of CheckPoint XLOG records.  This is declared here because we keep
@@ -50,6 +51,13 @@ typedef struct CheckPoint
         * it's set to InvalidTransactionId.
         */
        TransactionId oldestActiveXid;
+
+       /*
+        * The replay state from every other node. This is only needed if 
wal_level
+        * >= logical and thus is only filled then.
+        */
+       XLogRecPtr logicalApplyState[MaxMultimasterNodeId - 1];
+       XLogRecPtr logicalReceiveState[MaxMultimasterNodeId - 1];
 } CheckPoint;
 
 /* XLOG info values for XLOG rmgr */
@@ -85,6 +93,9 @@ typedef enum DBState
  * NOTE: try to keep this under 512 bytes so that it will fit on one physical
  * sector of typical disk drives.  This reduces the odds of corruption due to
  * power failure midway through a write.
+ *
+ * FIXME: in order to allow many nodes in mm (which increases checkpoint size)
+ * we should change the writing of this to 
write(temp_file);fsync();rename();fsync();
  */
 
 typedef struct ControlFileData
diff --git a/src/include/replication/walreceiver.h 
b/src/include/replication/walreceiver.h
index d21ec94..c9ab1be 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -14,6 +14,8 @@
 
 #include "access/xlog.h"
 #include "access/xlogdefs.h"
+#include "replication/logical.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 #include "pgtime.h"
 
@@ -90,6 +92,17 @@ typedef struct
        char            conninfo[MAXCONNINFO];
 
        slock_t         mutex;                  /* locks shared variables shown 
above */
+
+       /*
+        * replay point up to which we replayed for every node
+        * XXX: should possibly be dynamically sized?
+        * FIXME: should go to its own shm segment?
+        */
+       XLogRecPtr  mm_receiveState[MaxMultimasterNodeId - 1];
+       XLogRecPtr  mm_applyState[MaxMultimasterNodeId - 1];
+
+       Latch*       mm_receiveLatch[MaxMultimasterNodeId - 1];
+
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
-- 
1.7.10.rc3.3.g19a6c.dirty


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to