On 29 March 2017 at 08:11, Craig Ringer <[email protected]> wrote:
> On 29 March 2017 at 08:01, Craig Ringer <[email protected]> wrote:
>
>> I just noticed that I failed to remove the docs changes regarding
>> dropping slots becoming db-specific, so I'll post a follow-up for that
>> in a sec.
>
> Attached.
... and here's the next in the patch series. Both this and the
immediately prior minor patch fix-drop-slot-docs.patch are pending
now.
Notable changes in this patch since review:
* Split oldestCatalogXmin tracking into separate patch
* Critically, fix GetSnapshotData's setting of RecentGlobalXmin and
RecentGlobalDataXmin so it uses ShmemVariableCache->oldestCatalogXmin
instead of procArray->replication_slot_catalog_xmin. The old code
could've led to tuples newer than oldestCatalogXmin being removed.
* Memory barrier in UpdateOldestCatalogXmin and SetOldestCatalogXmin.
UpdateOldestCatalogXmin still does an unlocked pre-check before deciding
whether it needs to take ProcArrayLock, recheck, and advance, since we
don't want to contend for ProcArrayLock unnecessarily.
* Remove unnecessary volatile usage (retained in
UpdateOldestCatalogXmin due to barrier)
* Remove unnecessary test for XLogInsertAllowed() in XactLogCatalogXminUpdate
* EnsureActiveLogicalSlotValid(void) - add (void)
* pgindented changes in this diff; have left unrelated changes alone
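
To make the pre-check / recheck flow from the list above concrete, here is a
minimal standalone model in C. It is only a sketch, not the patch code: the
model_* names are hypothetical, the WAL write is replaced by a counter, and
the points where ProcArrayLock would be held are only marked by comments
since the model is single-threaded. xid_precedes() is assumed to mirror the
usual modulo-2^32 comparison done by TransactionIdPrecedes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

static bool
xid_is_valid(TransactionId xid)
{
	return xid != InvalidTransactionId;
}

/* Wraparound-aware "a is older than b"; special xids compare numerically. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
	if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
		return a < b;
	return (int32_t) (a - b) < 0;
}

/* Same predicate as CatalogXminNeedsUpdate() in the patch. */
static bool
catalog_xmin_needs_update(TransactionId vacuum_xmin, TransactionId slots_xmin)
{
	return xid_precedes(vacuum_xmin, slots_xmin)
		|| (xid_is_valid(vacuum_xmin) != xid_is_valid(slots_xmin));
}

/* Hypothetical stand-ins for the shared-memory state and the WAL stream. */
static TransactionId model_oldest_catalog_xmin = InvalidTransactionId;
static TransactionId model_slots_catalog_xmin = InvalidTransactionId;
static int	model_wal_records = 0;

/* Returns true if the effective (vacuum) catalog_xmin was changed. */
static bool
model_update_oldest_catalog_xmin(void)
{
	TransactionId slots_xmin = model_slots_catalog_xmin;
	bool		changed = false;

	/* Unlocked pre-check: the common case is "nothing to do". */
	if (!catalog_xmin_needs_update(model_oldest_catalog_xmin, slots_xmin))
		return false;

	/* Log the new value first (stand-in for the WAL record). */
	model_wal_records++;

	/* -- ProcArrayLock would be taken here -- */
	/* Recheck against the value we logged, not a freshly read one. */
	if (catalog_xmin_needs_update(model_oldest_catalog_xmin, slots_xmin))
	{
		model_oldest_catalog_xmin = slots_xmin;
		changed = true;
	}
	/* -- ProcArrayLock would be released here -- */

	return changed;
}
```

In this model a slot appearing, advancing, or being dropped each produce one
logged update, while a repeated call with unchanged slot state produces none.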
Re:
> what does
>
> + TransactionId oldestCatalogXmin; /* oldest xid where complete catalog state
> + * is guaranteed to still exist */
>
> mean? I complained about the overall justification in the commit
> already, but looking at this commit alone, the justification for this
> part of the change is quite hard to understand.
The patch now contains
TransactionId oldestCatalogXmin; /* oldest xid it is guaranteed to be safe
* to create a historic snapshot for; see
* also
* procArray->replication_slot_catalog_xmin
* */
which I think is an improvement.
I've also sought to explain the purpose of this change better with
/*
* If necessary, copy the current catalog_xmin needed by replication slots to
* the effective catalog_xmin used for dead tuple removal and write a WAL
* record recording the change.
*
* This allows standbys to know the oldest xid for which it is safe to create
* a historic snapshot for logical decoding. VACUUM or other cleanup may have
* removed catalog tuple versions needed to correctly decode transactions older
* than this threshold. Standbys can use this information to cancel conflicting
* decoding sessions and invalidate slots that need discarded information.
*
* (We can't use the transaction IDs in WAL records emitted by VACUUM etc for
* this, since they don't identify the relation as a catalog or not. Nor can a
* standby look up the relcache to get the Relation for the affected
* relfilenode to check if it is a catalog. The standby would also have no way
* to know the oldest safe position at startup if it wasn't in the control
* file.)
*/
void
UpdateOldestCatalogXmin(void)
{
...
Does that help?
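
For the standby side, the check a decoding session makes against the
replayed threshold can be sketched roughly as below. This is a simplified
standalone model of the test in EnsureActiveLogicalSlotValid, not the patch
code itself; model_slot_is_usable and xid_follows are hypothetical names, and
xid_follows is assumed to behave like TransactionIdFollows.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

/* Wraparound-aware "a is newer than b"; special xids compare numerically. */
static bool
xid_follows(TransactionId a, TransactionId b)
{
	if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
		return a > b;
	return (int32_t) (a - b) > 0;
}

/*
 * A slot on a standby is usable only if the upstream is still retaining
 * catalog tuples at least as far back as the slot's catalog_xmin.
 */
static bool
model_slot_is_usable(TransactionId oldest_catalog_xmin,
					 TransactionId slot_catalog_xmin)
{
	/* No catalog_xmin is being retained upstream at all. */
	if (oldest_catalog_xmin == InvalidTransactionId)
		return false;

	/* The retained threshold has moved past what the slot needs. */
	return !xid_follows(oldest_catalog_xmin, slot_catalog_xmin);
}
```

A slot whose catalog_xmin equals the threshold is still usable; only a
threshold strictly newer than the slot's catalog_xmin makes it unsafe.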
(Sidenote for later: ResolveRecoveryConflictWithLogicalDecoding will
need a read barrier too, when the next patch adds it.)
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
From 4b8e3aaa52539ef8cf3c79d1ed0319cc44800a32 Mon Sep 17 00:00:00 2001
From: Craig Ringer <[email protected]>
Date: Wed, 22 Mar 2017 13:36:49 +0800
Subject: [PATCH] Log catalog_xmin advances before removing catalog tuples
Write a WAL record before advancing the oldest catalog_xmin preserved by
VACUUM and other tuple removal.
Previously GetOldestXmin would use procArray->replication_slot_catalog_xmin as
the xid limit for vacuuming catalog tuples, so it was not possible for standbys
to determine whether all catalog tuples needed for a catalog snapshot for a
given xid would still exist.
Logging catalog_xmin advances allows standbys to determine if a logical slot on
the standby has become unsafe to use. It can then refuse to start logical
decoding on that slot or, if decoding is in progress, raise a conflict with
recovery.
Note that we only emit new WAL records if catalog_xmin changes, which happens
due to changes in slot state. So this won't generate WAL whenever oldestXmin
advances.
---
src/backend/access/heap/rewriteheap.c | 3 +-
src/backend/access/rmgrdesc/xactdesc.c | 9 +++
src/backend/access/transam/varsup.c | 14 ++++
src/backend/access/transam/xact.c | 35 ++++++++
src/backend/access/transam/xlog.c | 12 ++-
src/backend/commands/vacuum.c | 9 +++
src/backend/postmaster/bgwriter.c | 10 +++
src/backend/replication/logical/decode.c | 11 +++
src/backend/replication/logical/logical.c | 38 +++++++++
src/backend/replication/walreceiver.c | 2 +-
src/backend/replication/walsender.c | 13 +++
src/backend/storage/ipc/procarray.c | 119 +++++++++++++++++++++++++---
src/bin/pg_controldata/pg_controldata.c | 2 +
src/include/access/transam.h | 6 ++
src/include/access/xact.h | 12 ++-
src/include/catalog/pg_control.h | 1 +
src/include/storage/procarray.h | 5 +-
src/test/recovery/t/006_logical_decoding.pl | 12 ++-
18 files changed, 294 insertions(+), 19 deletions(-)
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..36bbb98 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
if (!state->rs_logical_rewrite)
return;
- ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+ /* Use the catalog_xmin being retained by vacuum */
+ ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
/*
* If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 735f8c5..96ea163 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -297,6 +297,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
xact_desc_assignment(buf, xlrec);
}
+ else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+ {
+ xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+ appendStringInfo(buf, "catalog_xmin %u", xlrec->new_catalog_xmin);
+ }
}
const char *
@@ -324,6 +330,9 @@ xact_identify(uint8 info)
case XLOG_XACT_ASSIGNMENT:
id = "ASSIGNMENT";
break;
+ case XLOG_XACT_CATALOG_XMIN_ADV:
+ id = "CATALOG_XMIN";
+ break;
}
return id;
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 5efbfbd..6cf939f 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -414,6 +414,20 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
}
}
+/*
+ * Set the global oldest catalog_xmin used to determine when tuples
+ * may be removed from catalogs and user-catalogs accessible from logical
+ * decoding.
+ *
+ * Only to be called from the startup process or by UpdateOldestCatalogXmin(),
+ * which ensures the update is properly written to xlog first.
+ */
+void
+SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
+{
+ pg_write_barrier();
+ ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin;
+}
/*
* ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c8751c6..0e3b870 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5652,6 +5652,41 @@ xact_redo(XLogReaderState *record)
ProcArrayApplyXidAssignment(xlrec->xtop,
xlrec->nsubxacts, xlrec->xsub);
}
+ else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+ {
+ xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+ /*
+ * Apply the new catalog_xmin limit immediately. New decoding sessions
+ * will refuse to start if their slot is past it, and old ones will
+ * notice when we signal them with a recovery conflict. There's no
+ * effect on the catalogs themselves yet, so it's safe for backends
+ * with older catalog_xmins to still exist.
+ *
+ * We don't have to take ProcArrayLock since only the startup process
+ * is allowed to change oldestCatalogXmin when we're in recovery.
+ *
+ * Existing sessions are not notified and must check the safe xmin.
+ */
+ SetOldestCatalogXmin(xlrec->new_catalog_xmin);
+
+ }
else
elog(PANIC, "xact_redo: unknown op code %u", info);
}
+
+/*
+ * Record when we advance the catalog_xmin used for tuple removal
+ * so standbys find out before we remove catalog tuples they might
+ * need for logical decoding.
+ */
+XLogRecPtr
+XactLogCatalogXminUpdate(TransactionId new_catalog_xmin)
+{
+ xl_xact_catalog_xmin_advance xlrec;
+ xlrec.new_catalog_xmin = new_catalog_xmin;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance);
+ return XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..a3ac2c1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5008,6 +5008,7 @@ BootStrapXLOG(void)
checkPoint.nextMultiOffset = 0;
checkPoint.oldestXid = FirstNormalTransactionId;
checkPoint.oldestXidDB = TemplateDbOid;
+ checkPoint.oldestCatalogXmin = InvalidTransactionId;
checkPoint.oldestMulti = FirstMultiXactId;
checkPoint.oldestMultiDB = TemplateDbOid;
checkPoint.oldestCommitTsXid = InvalidTransactionId;
@@ -5021,6 +5022,7 @@ BootStrapXLOG(void)
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
AdvanceOldestClogXid(checkPoint.oldestXid);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+ SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
@@ -6611,6 +6613,9 @@ StartupXLOG(void)
(errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
checkPoint.oldestXid, checkPoint.oldestXidDB)));
ereport(DEBUG1,
+ (errmsg_internal("oldest catalog-only transaction ID: %u",
+ checkPoint.oldestCatalogXmin)));
+ ereport(DEBUG1,
(errmsg_internal("oldest MultiXactId: %u, in database %u",
checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
ereport(DEBUG1,
@@ -6628,6 +6633,7 @@ StartupXLOG(void)
MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
AdvanceOldestClogXid(checkPoint.oldestXid);
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+ SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
SetCommitTsLimit(checkPoint.oldestCommitTsXid,
checkPoint.newestCommitTsXid);
@@ -8702,6 +8708,7 @@ CreateCheckPoint(int flags)
checkPoint.nextXid = ShmemVariableCache->nextXid;
checkPoint.oldestXid = ShmemVariableCache->oldestXid;
checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
+ checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
LWLockRelease(XidGenLock);
LWLockAcquire(CommitTsLock, LW_SHARED);
@@ -9631,6 +9638,7 @@ xlog_redo(XLogReaderState *record)
* redo an xl_clog_truncate if it changed since initialization.
*/
SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+ SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
/*
* If we see a shutdown checkpoint while waiting for an end-of-backup
@@ -9729,8 +9737,8 @@ xlog_redo(XLogReaderState *record)
checkPoint.oldestMultiDB);
if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
checkPoint.oldestXid))
- SetTransactionIdLimit(checkPoint.oldestXid,
- checkPoint.oldestXidDB);
+ SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+ SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 9fbb0eb..ae41dc3 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -518,6 +518,15 @@ vacuum_set_xid_limits(Relation rel,
MultiXactId safeMxactLimit;
/*
+ * When logical decoding is enabled, we must write any advance of
+ * catalog_xmin to xlog before we allow VACUUM to remove those tuples.
+ * This ensures that any standbys doing logical decoding can cancel
+ * decoding sessions and invalidate slots if we remove tuples they
+ * still need.
+ */
+ UpdateOldestCatalogXmin();
+
+ /*
* We can always ignore processes running lazy vacuum. This is because we
* use these values only for deciding which tuples we must keep in the
* tables. Since lazy vacuum doesn't write its XID anywhere, it's safe to
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index dcb4cf2..df239e0 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -51,6 +51,7 @@
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "storage/spin.h"
@@ -295,6 +296,15 @@ BackgroundWriterMain(void)
}
/*
+ * Eagerly advance the catalog_xmin used by vacuum if we're not
+ * a standby. This ensures that standbys waiting for catalog_xmin
+ * confirmation receive it promptly, even if we haven't had a
+ * recent vacuum run.
+ */
+ if (!RecoveryInProgress())
+ UpdateOldestCatalogXmin();
+
+ /*
* Log a new xl_running_xacts every now and then so replication can
* get into a consistent state faster (think of suboverflowed
* snapshots) and clean up resources (locks, KnownXids*) more
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5c13d26..07a120d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -288,6 +288,17 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
break;
+ case XLOG_XACT_CATALOG_XMIN_ADV:
+ /*
+ * The global catalog_xmin has been advanced. By the time we see
+ * this in logical decoding it no longer matters, since it's
+ * guaranteed that all later records will be consistent with the
+ * advanced catalog_xmin, so we ignore it here. If we were running
+ * on a standby and it applied a catalog xmin advance past our
+ * needed catalog_xmin we would've already been terminated with a
+ * conflict with standby error.
+ */
+ break;
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..28d04d1 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -68,6 +68,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
+static void EnsureActiveLogicalSlotValid(void);
+
/*
* Make sure the current settings & environment are capable of doing logical
* decoding.
@@ -126,6 +128,8 @@ StartupDecodingContext(List *output_plugin_options,
/* shorter lines... */
slot = MyReplicationSlot;
+ EnsureActiveLogicalSlotValid();
+
context = AllocSetContextCreate(CurrentMemoryContext,
"Logical decoding context",
ALLOCSET_DEFAULT_SIZES);
@@ -963,3 +967,37 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
SpinLockRelease(&MyReplicationSlot->mutex);
}
}
+
+/*
+ * Test to see if the active logical slot is usable.
+ */
+static void
+EnsureActiveLogicalSlotValid(void)
+{
+ Assert(MyReplicationSlot != NULL);
+
+ /*
+ * Currently a logical slot can only become unusable if we're doing logical
+ * decoding on standby and the master advanced its catalog_xmin past the
+ * threshold we need, removing tuples that we'll require to start decoding
+ * at our restart_lsn.
+ */
+ if (RecoveryInProgress())
+ {
+ /*
+ * Check if enough catalog is retained for this slot. No locking is
+ * needed here since oldestCatalogXmin can only advance, so if it's
+ * past what we need that's not going to change. We have marked our
+ * slot as active so redo won't replay past our catalog_xmin without
+ * first terminating our session.
+ */
+ TransactionId shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+ if (!TransactionIdIsValid(shmem_catalog_xmin) ||
+ TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot '%s' requires catalogs removed by master",
+ NameStr(MyReplicationSlot->data.name))));
+ }
+}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 771ac30..c2ad791 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1233,7 +1233,7 @@ XLogWalRcvSendHSFeedback(bool immed)
xmin = GetOldestXmin(NULL,
PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
- ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+ ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin);
if (TransactionIdIsValid(slot_xmin) &&
TransactionIdPrecedes(slot_xmin, xmin))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cfc3fba..cdc5f95 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1658,6 +1658,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
* be energy wasted - the worst lost information can do here is give us
* wrong information in a statistics view - we'll just potentially be more
* conservative in removing files.
+ *
+ * We don't have to do any effective_xmin / effective_catalog_xmin testing
+ * here either, like for LogicalConfirmReceivedLocation. If we received
+ * the xmin and catalog_xmin from downstream replication slots we know they
+ * were already confirmed there.
*/
}
@@ -1778,6 +1783,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
slot->data.xmin = feedbackXmin;
slot->effective_xmin = feedbackXmin;
}
+ /*
+ * If the physical slot is relaying catalog_xmin for logical replication
+ * slots on the replica it's safe to act on catalog_xmin advances
+ * immediately too. The replica will only send a new catalog_xmin via
+ * feedback when it advances its effective_catalog_xmin, so it's done the
+ * delay-until-confirmed dance for us and knows it won't need the data
+ * we're protecting from vacuum again.
+ */
if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
!TransactionIdIsNormal(feedbackCatalogXmin) ||
TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7c2e1e1..381c230 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -87,7 +87,11 @@ typedef struct ProcArrayStruct
/* oldest xmin of any replication slot */
TransactionId replication_slot_xmin;
- /* oldest catalog xmin of any replication slot */
+ /*
+ * Oldest catalog xmin of any replication slot
+ *
+ * See also ShmemVariableCache->oldestCatalogXmin
+ */
TransactionId replication_slot_catalog_xmin;
/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
@@ -1306,6 +1310,9 @@ TransactionIdIsActive(TransactionId xid)
* The return value is also adjusted with vacuum_defer_cleanup_age, so
* increasing that setting on the fly is another easy way to make
* GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * When changing GetOldestXmin, check to see whether RecentGlobalXmin
+ * computation in GetSnapshotData also needs changing.
*/
TransactionId
GetOldestXmin(Relation rel, int flags)
@@ -1444,6 +1451,79 @@ GetOldestXmin(Relation rel, int flags)
}
/*
+ * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated
+ * to reflect an advance in procArray->replication_slot_catalog_xmin or
+ * it becoming newly set or unset.
+ *
+ */
+static bool
+CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin)
+{
+ return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin)
+ || (TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin)));
+}
+
+/*
+ * If necessary, copy the current catalog_xmin needed by replication slots to
+ * the effective catalog_xmin used for dead tuple removal and write a WAL
+ * record recording the change.
+ *
+ * This allows standbys to know the oldest xid for which it is safe to create
+ * a historic snapshot for logical decoding. VACUUM or other cleanup may have
+ * removed catalog tuple versions needed to correctly decode transactions older
+ * than this threshold. Standbys can use this information to cancel conflicting
+ * decoding sessions and invalidate slots that need discarded information.
+ *
+ * (We can't use the transaction IDs in WAL records emitted by VACUUM etc for
+ * this, since they don't identify the relation as a catalog or not. Nor can a
+ * standby look up the relcache to get the Relation for the affected
+ * relfilenode to check if it is a catalog. The standby would also have no way
+ * to know the oldest safe position at startup if it wasn't in the control
+ * file.)
+ */
+void
+UpdateOldestCatalogXmin(void)
+{
+ TransactionId vacuum_catalog_xmin;
+ TransactionId slots_catalog_xmin;
+
+ Assert(XLogInsertAllowed());
+
+ /*
+ * Do an unlocked check to see if there's a new catalog_xmin in procarray,
+ * so we can avoid taking a lock and writing xlog if they're unchanged,
+ * as is most likely.
+ *
+ * The read barrier is for oldestCatalogXmin, we don't care whether we see
+ * the very latest replication_slot_catalog_xmin or not.
+ */
+ pg_read_barrier();
+ vacuum_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+ slots_catalog_xmin = procArray->replication_slot_catalog_xmin;
+
+ if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+ {
+ XactLogCatalogXminUpdate(slots_catalog_xmin);
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * A concurrent updater could've changed the oldestCatalogXmin so we
+ * need to re-check under ProcArrayLock before updating. The LWLock
+ * provides a barrier.
+ *
+ * We must not re-read replication_slot_catalog_xmin even if it has
+ * advanced, since we xlog'd the older value. A later check will
+ * advance it again.
+ */
+ vacuum_catalog_xmin = *((volatile TransactionId *) &ShmemVariableCache->oldestCatalogXmin);
+ if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+ SetOldestCatalogXmin(slots_catalog_xmin);
+ LWLockRelease(ProcArrayLock);
+ }
+}
+
+/*
* GetMaxSnapshotXidCount -- get max size for snapshot XID array
*
* We have to export this for use by snapmgr.c.
@@ -1493,7 +1573,8 @@ GetMaxSnapshotSubxidCount(void)
* older than this are known not running any more.
* RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
* running transactions, except those running LAZY VACUUM). This is
- * the same computation done by GetOldestXmin(true, true).
+ * the same computation done by
+ * GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT|PROCARRAY_FLAGS_VACUUM)
* RecentGlobalDataXmin: the global xmin for non-catalog tables
* >= RecentGlobalXmin
*
@@ -1700,7 +1781,7 @@ GetSnapshotData(Snapshot snapshot)
/* fetch into volatile var while ProcArrayLock is held */
replication_slot_xmin = procArray->replication_slot_xmin;
- replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+ replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
@@ -1711,6 +1792,9 @@ GetSnapshotData(Snapshot snapshot)
* Update globalxmin to include actual process xids. This is a slightly
* different way of computing it than GetOldestXmin uses, but should give
* the same result.
+ *
+ * If you change computation of RecentGlobalXmin here you may need to
+ * change GetOldestXmin(...) as well.
*/
if (TransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;
@@ -2168,14 +2252,14 @@ GetOldestSafeDecodingTransactionId(void)
oldestSafeXid = ShmemVariableCache->nextXid;
/*
- * If there's already a slot pegging the xmin horizon, we can start with
- * that value, it's guaranteed to be safe since it's computed by this
- * routine initially and has been enforced since.
+ * If there's already an effectiveCatalogXmin held down by an existing
+ * replication slot it's definitely safe to start there, and it can't
+ * advance while we hold ProcArrayLock.
*/
- if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
- TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
+ if (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) &&
+ TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin,
oldestSafeXid))
- oldestSafeXid = procArray->replication_slot_catalog_xmin;
+ oldestSafeXid = ShmemVariableCache->oldestCatalogXmin;
/*
* If we're not in recovery, we walk over the procarray and collect the
@@ -2965,18 +3049,29 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
*
* Return the current slot xmin limits. That's useful to be able to remove
* data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmin, we return both the effective
+ * catalog_xmin being used for tuple removal (retained catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ * retained_catalog_xmin should be older than needed_catalog_xmin but is not
+ * guaranteed to be if there are replication slots on a replica currently
+ * attempting to start up and reserve catalogs.
*/
void
ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
- TransactionId *catalog_xmin)
+ TransactionId *retained_catalog_xmin,
+ TransactionId *needed_catalog_xmin)
{
LWLockAcquire(ProcArrayLock, LW_SHARED);
if (xmin != NULL)
*xmin = procArray->replication_slot_xmin;
- if (catalog_xmin != NULL)
- *catalog_xmin = procArray->replication_slot_catalog_xmin;
+ if (retained_catalog_xmin != NULL)
+ *retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+ if (needed_catalog_xmin != NULL)
+ *needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
LWLockRelease(ProcArrayLock);
}
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..5c7eb77 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -248,6 +248,8 @@ main(int argc, char *argv[])
ControlFile->checkPointCopy.oldestCommitTsXid);
printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
ControlFile->checkPointCopy.newestCommitTsXid);
+ printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+ ControlFile->checkPointCopy.oldestCatalogXmin);
printf(_("Time of latest checkpoint: %s\n"),
ckpttime_str);
printf(_("Fake LSN counter for unlogged rels: %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index d25a2dd..fe5e67c 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -134,6 +134,11 @@ typedef struct VariableCacheData
*/
TransactionId latestCompletedXid; /* newest XID that has committed or
* aborted */
+ TransactionId oldestCatalogXmin; /* oldest xid it is guaranteed to be safe
+ * to create a historic snapshot for; see
+ * also
+ * procArray->replication_slot_catalog_xmin
+ * */
/*
* These fields are protected by CLogTruncationLock
@@ -180,6 +185,7 @@ extern TransactionId ReadNewTransactionId(void);
extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
Oid oldest_datoid);
extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
+extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin);
extern bool ForceTransactionIdLimitUpdate(void);
extern Oid GetNewObjectId(void);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5b37c05..6d18d18 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,7 +137,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_CATALOG_XMIN_ADV 0x60
/* free opcode 0x70 */
/* mask for filtering opcodes out of xl_info */
@@ -187,6 +187,13 @@ typedef struct xl_xact_assignment
#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
+typedef struct xl_xact_catalog_xmin_advance
+{
+ TransactionId new_catalog_xmin;
+} xl_xact_catalog_xmin_advance;
+
+#define SizeOfXactCatalogXminAdvance (offsetof(xl_xact_catalog_xmin_advance, new_catalog_xmin) + sizeof(TransactionId))
+
/*
* Commit and abort records can contain a lot of information. But a large
* portion of the records won't need all possible pieces of information. So we
@@ -391,6 +398,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
int xactflags, TransactionId twophase_xid);
+
+extern XLogRecPtr XactLogCatalogXminUpdate(TransactionId new_catalog_xmin);
+
extern void xact_redo(XLogReaderState *record);
/* xactdesc.c */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 3a25cc8..1fe89ae 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -45,6 +45,7 @@ typedef struct CheckPoint
MultiXactOffset nextMultiOffset; /* next free MultiXact offset */
TransactionId oldestXid; /* cluster-wide minimum datfrozenxid */
Oid oldestXidDB; /* database with minimum datfrozenxid */
+ TransactionId oldestCatalogXmin; /* catalog retained after this xid */
MultiXactId oldestMulti; /* cluster-wide minimum datminmxid */
Oid oldestMultiDB; /* database with minimum datminmxid */
pg_time_t time; /* time stamp of checkpoint */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9b42e49..69a82d7 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -120,6 +120,9 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
TransactionId catalog_xmin, bool already_locked);
extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
- TransactionId *catalog_xmin);
+ TransactionId *retained_catalog_xmin,
+ TransactionId *needed_catalog_xmin);
+
+extern void UpdateOldestCatalogXmin(void);
#endif /* PROCARRAY_H */
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index bf9b50a..f38b38a 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 25;
# Initialize master node
my $node_master = get_new_node('master');
@@ -17,6 +17,10 @@ $node_master->append_conf(
wal_level = logical
));
$node_master->start;
+
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+ "pg_controldata's oldestCatalogXmin is zero after start");
+
my $backup_name = 'master_backup';
$node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
@@ -96,9 +100,15 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
'restored slot catalog_xmin is nonzero');
is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
'reading from slot with wal_level < logical fails');
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+ "pg_controldata's oldestCatalogXmin is nonzero");
is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
'can drop logical slot while wal_level = replica');
is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+$node_master->safe_psql('postgres', 'VACUUM;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+ "pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint");
# done with the node
$node_master->stop;
--
2.5.5