Rebased onto current master (fb544735f1).
--
Andrey Lepikhov
Postgres Professional
https://postgrespro.com
The Russian Postgres Company
>From 29183c42a8ae31b830ab5af0dfcfdaadd6229700 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <[email protected]>
Date: Tue, 12 May 2020 08:29:54 +0500
Subject: [PATCH 1/3] GlobalCSNLog-SLRU-v3
---
src/backend/access/transam/Makefile | 1 +
src/backend/access/transam/global_csn_log.c | 439 ++++++++++++++++++++
src/backend/access/transam/twophase.c | 1 +
src/backend/access/transam/varsup.c | 2 +
src/backend/access/transam/xlog.c | 12 +
src/backend/storage/ipc/ipci.c | 3 +
src/backend/storage/ipc/procarray.c | 3 +
src/backend/storage/lmgr/lwlocknames.txt | 1 +
src/backend/tcop/postgres.c | 1 +
src/backend/utils/misc/guc.c | 9 +
src/backend/utils/probes.d | 2 +
src/bin/initdb/initdb.c | 3 +-
src/include/access/global_csn_log.h | 30 ++
src/include/storage/lwlock.h | 1 +
src/include/utils/snapshot.h | 3 +
15 files changed, 510 insertions(+), 1 deletion(-)
create mode 100644 src/backend/access/transam/global_csn_log.c
create mode 100644 src/include/access/global_csn_log.h
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..60ff8b141e 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
clog.o \
commit_ts.o \
+ global_csn_log.o \
generic_xlog.o \
multixact.o \
parallel.o \
diff --git a/src/backend/access/transam/global_csn_log.c b/src/backend/access/transam/global_csn_log.c
new file mode 100644
index 0000000000..6f7fded350
--- /dev/null
+++ b/src/backend/access/transam/global_csn_log.c
@@ -0,0 +1,439 @@
+/*-----------------------------------------------------------------------------
+ *
+ * global_csn_log.c
+ * Track global commit sequence numbers of finished transactions
+ *
+ * Implementation of cross-node transaction isolation relies on commit sequence
+ * number (CSN) based visibility rules. This module provides an SLRU to store
+ * the CSN of each transaction. This mapping needs to be kept only for xids
+ * greater than oldestXid, but that can require arbitrarily large amounts of
+ * memory in case of long-lived transactions. Because of the same lifetime and
+ * persistence requirements, this module is quite similar to subtrans.c.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_csn_log.c
+ *
+ *-----------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "utils/snapmgr.h"
+
+bool track_global_snapshots; /* GUC; callers must check it before using this module */
+
+/*
+ * Defines for GlobalCSNLog page sizes. A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * GlobalCSNLog page numbering also wraps around at
+ * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE, and GlobalCSNLog segment numbering at
+ * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateGlobalCSNLog (see GlobalCSNLogPagePrecedes).
+ */
+
+/* We store the commit GlobalCSN (a uint64) for each xid */
+#define GCSNLOG_XACTS_PER_PAGE (BLCKSZ / sizeof(GlobalCSN))
+
+#define TransactionIdToPage(xid) ((xid) / (TransactionId) GCSNLOG_XACTS_PER_PAGE)
+#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) GCSNLOG_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for GlobalCSNLog control
+ */
+static SlruCtlData GlobalCSNLogCtlData;
+#define GlobalCsnlogCtl (&GlobalCSNLogCtlData)
+
+static int ZeroGlobalCSNLogPage(int pageno);
+static bool GlobalCSNLogPagePrecedes(int page1, int page2);
+static void GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids,
+ TransactionId *subxids,
+ GlobalCSN csn, int pageno);
+static void GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn,
+ int slotno);
+
+/*
+ * GlobalCSNLogSetCSN
+ *
+ * Record GlobalCSN of a transaction and its subtransaction tree, page by page.
+ *
+ * xid is a single xid to set status for. This will typically be the top level
+ * transactionid for a top level commit or abort. It can also be a
+ * subtransaction when we record transaction aborts.
+ *
+ * subxids is an array of xids of length nsubxids, representing subtransactions
+ * in the tree of xid. In various cases nsubxids may be zero.
+ *
+ * csn is the commit sequence number of the transaction. It should be
+ * AbortedGlobalCSN for abort cases. Updates are atomic per page only.
+ */
+void
+GlobalCSNLogSetCSN(TransactionId xid, int nsubxids,
+ TransactionId *subxids, GlobalCSN csn)
+{
+ int pageno;
+ int i = 0;
+ int offset = 0;
+
+ /* Callers of GlobalCSNLogSetCSN() must check GUC params */
+ Assert(track_global_snapshots);
+
+ Assert(TransactionIdIsValid(xid));
+
+ pageno = TransactionIdToPage(xid); /* get page of parent */
+ for (;;) /* one iteration per page written */
+ {
+ int num_on_page = 0;
+
+ while (i < nsubxids && TransactionIdToPage(subxids[i]) == pageno) /* consecutive subxids on this page */
+ {
+ num_on_page++;
+ i++;
+ }
+
+ GlobalCSNLogSetPageStatus(xid,
+ num_on_page, subxids + offset,
+ csn, pageno);
+ if (i >= nsubxids) /* every subxid has been recorded */
+ break;
+
+ offset = i;
+ pageno = TransactionIdToPage(subxids[offset]);
+ xid = InvalidTransactionId; /* parent already recorded; later pages get subxids only */
+ }
+}
+
+/*
+ * Record the CSN of transaction entries in the GlobalCSNLog for
+ * all entries on a single page. Atomic only on this page.
+ * xid may be InvalidTransactionId, in which case only subxids are set.
+ * Otherwise API is same as TransactionIdSetTreeStatus()
+ */
+static void
+GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids,
+ TransactionId *subxids,
+ GlobalCSN csn, int pageno)
+{
+ int slotno;
+ int i;
+
+ LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE); /* held across read and all updates */
+
+ slotno = SimpleLruReadPage(GlobalCsnlogCtl, pageno, true, xid);
+
+ /* Subtransactions first, if needed ... */
+ for (i = 0; i < nsubxids; i++)
+ {
+ Assert(GlobalCsnlogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+ GlobalCSNLogSetCSNInSlot(subxids[i], csn, slotno);
+ }
+
+ /* ... then the main transaction */
+ if (TransactionIdIsValid(xid))
+ GlobalCSNLogSetCSNInSlot(xid, csn, slotno);
+
+ GlobalCsnlogCtl->shared->page_dirty[slotno] = true;
+
+ LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Store csn for xid into the page loaded in slotno; caller holds the control lock.
+ */
+static void
+GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn, int slotno)
+{
+ int entryno = TransactionIdToPgIndex(xid);
+ GlobalCSN *ptr;
+ /* entries are sizeof(GlobalCSN) wide, matching GCSNLOG_XACTS_PER_PAGE */
+ Assert(LWLockHeldByMe(GlobalCSNLogControlLock));
+
+ ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN));
+
+ *ptr = csn;
+}
+
+/*
+ * Return the GlobalCSN recorded for xid in the log.
+ *
+ * NB: this is a low-level routine and is NOT the preferred entry point
+ * for most uses; TransactionIdGetGlobalCSN() in global_snapshot.c is the
+ * intended caller.
+ */
+GlobalCSN
+GlobalCSNLogGetCSN(TransactionId xid)
+{
+ int pageno = TransactionIdToPage(xid);
+ int entryno = TransactionIdToPgIndex(xid);
+ int slotno;
+ GlobalCSN *ptr;
+ GlobalCSN global_csn;
+
+ /* Callers of GlobalCSNLogGetCSN() must check GUC params */
+ Assert(track_global_snapshots);
+
+ /* Can't ask about stuff that might not be around anymore */
+ Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
+
+ /* lock is acquired by SimpleLruReadPage_ReadOnly and released below */
+ /* entries are sizeof(GlobalCSN) wide, matching GCSNLOG_XACTS_PER_PAGE */
+ slotno = SimpleLruReadPage_ReadOnly(GlobalCsnlogCtl, pageno, xid);
+ ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN));
+ global_csn = *ptr;
+
+ LWLockRelease(GlobalCSNLogControlLock);
+
+ return global_csn;
+}
+
+/*
+ * Number of shared GlobalCSNLog buffers: NBuffers / 512, clamped to [4, 32].
+ */
+static Size
+GlobalCSNLogShmemBuffers(void)
+{
+ return Min(32, Max(4, NBuffers / 512));
+}
+
+/*
+ * Shared memory needed for GlobalCsnlogCtl; zero when tracking is disabled.
+ */
+Size
+GlobalCSNLogShmemSize(void)
+{
+ if (!track_global_snapshots)
+ return 0;
+
+ return SimpleLruShmemSize(GlobalCSNLogShmemBuffers(), 0);
+}
+
+/*
+ * Initialize shared memory for GlobalCSNLog; skipped when tracking is disabled.
+ */
+void
+GlobalCSNLogShmemInit(void)
+{
+ if (!track_global_snapshots)
+ return;
+
+ GlobalCsnlogCtl->PagePrecedes = GlobalCSNLogPagePrecedes; /* used for truncation decisions */
+ SimpleLruInit(GlobalCsnlogCtl, "GlobalCSNLog Ctl", GlobalCSNLogShmemBuffers(), 0,
+ GlobalCSNLogControlLock, "pg_global_csn", LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS);
+}
+
+/*
+ * This func must be called ONCE on system install. It creates the initial
+ * GlobalCSNLog segment. The pg_global_csn directory is assumed to have been
+ * created by initdb, and GlobalCSNLogShmemInit must have been called already.
+ */
+void
+BootStrapGlobalCSNLog(void)
+{
+ int slotno;
+
+ if (!track_global_snapshots)
+ return; /* no segment is created when tracking is disabled */
+
+ LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+ /* Create and zero the first page of the GlobalCSNLog */
+ slotno = ZeroGlobalCSNLogPage(0);
+
+ /* Make sure it's written out */
+ SimpleLruWritePage(GlobalCsnlogCtl, slotno);
+ Assert(!GlobalCsnlogCtl->shared->page_dirty[slotno]);
+
+ LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Initialize (or reinitialize) a page of GlobalCSNLog to zeroes.
+ *
+ * The page is not actually written, just set up in shared memory.
+ * The slot number of the new page is returned.
+ *
+ * Control lock must be held at entry, and will be held at exit.
+ */
+static int
+ZeroGlobalCSNLogPage(int pageno)
+{
+ Assert(LWLockHeldByMe(GlobalCSNLogControlLock));
+ return SimpleLruZeroPage(GlobalCsnlogCtl, pageno); /* no WAL record is written */
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
+ *
+ * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
+ * if there are none.
+ */
+void
+StartupGlobalCSNLog(TransactionId oldestActiveXID)
+{
+ int startPage;
+ int endPage;
+
+ if (!track_global_snapshots)
+ return;
+
+ /*
+ * Since we don't expect pg_global_csn to be valid across crashes, we
+ * initialize the currently-active page(s) to zeroes during startup.
+ * Whenever we advance into a new page, ExtendGlobalCSNLog will likewise
+ * zero the new page without regard to whatever was previously on disk.
+ */
+ LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+ startPage = TransactionIdToPage(oldestActiveXID);
+ endPage = TransactionIdToPage(XidFromFullTransactionId(ShmemVariableCache->nextFullXid));
+
+ while (startPage != endPage)
+ {
+ (void) ZeroGlobalCSNLogPage(startPage);
+ startPage++;
+ /* must account for wraparound */
+ if (startPage > TransactionIdToPage(MaxTransactionId))
+ startPage = 0;
+ }
+ (void) ZeroGlobalCSNLogPage(startPage); /* endPage itself */
+
+ LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend shutdown.
+ */
+void
+ShutdownGlobalCSNLog(void)
+{
+ if (!track_global_snapshots)
+ return; /* SLRU was never set up */
+
+ /*
+ * Flush dirty GlobalCSNLog pages to disk.
+ *
+ * This is not actually necessary from a correctness point of view. We do
+ * it merely as a debugging aid.
+ */
+ TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(false);
+ SimpleLruFlush(GlobalCsnlogCtl, false);
+ TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(false);
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ */
+void
+CheckPointGlobalCSNLog(void)
+{
+ if (!track_global_snapshots)
+ return; /* SLRU was never set up */
+
+ /*
+ * Flush dirty GlobalCSNLog pages to disk.
+ *
+ * This is not actually necessary from a correctness point of view. We do
+ * it merely to improve the odds that writing of dirty pages is done by
+ * the checkpoint process and not by backends.
+ */
+ TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(true);
+ SimpleLruFlush(GlobalCsnlogCtl, true);
+ TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(true);
+}
+
+/*
+ * Make sure that GlobalCSNLog has room for a newly-allocated XID.
+ *
+ * NB: GetNewTransactionId calls this while holding XidGenLock; during
+ * recovery procarray.c also calls it. We want it to be very fast most of the
+ * time; no actual I/O need happen unless we're forced to write out a dirty
+ * GlobalCSNLog page to make room in shared memory.
+ */
+void
+ExtendGlobalCSNLog(TransactionId newestXact)
+{
+ int pageno;
+
+ if (!track_global_snapshots)
+ return;
+
+ /*
+ * No work except at first XID of a page. But beware: just after
+ * wraparound, the first XID of page zero is FirstNormalTransactionId.
+ */
+ if (TransactionIdToPgIndex(newestXact) != 0 &&
+ !TransactionIdEquals(newestXact, FirstNormalTransactionId))
+ return;
+
+ pageno = TransactionIdToPage(newestXact);
+
+ LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+ /* Zero the page; no XLOG entry is made, contents need not survive a crash */
+ (void) ZeroGlobalCSNLogPage(pageno);
+
+ LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Remove all GlobalCSNLog segments before the one holding the passed
+ * transaction ID.
+ *
+ * This is normally called during checkpoint or restartpoint, with oldestXact
+ * being the result of GetOldestXmin() at that time.
+ */
+void
+TruncateGlobalCSNLog(TransactionId oldestXact)
+{
+ int cutoffPage;
+
+ if (!track_global_snapshots)
+ return;
+
+ /*
+ * The cutoff point is the start of the segment containing oldestXact. We
+ * pass the *page* containing oldestXact to SimpleLruTruncate. We step
+ * back one transaction to avoid passing a cutoff page that hasn't been
+ * created yet in the rare case that oldestXact would be the first item on
+ * a page and oldestXact == next XID. In that case, if we didn't subtract
+ * one, we'd trigger SimpleLruTruncate's wraparound detection.
+ */
+ TransactionIdRetreat(oldestXact);
+ cutoffPage = TransactionIdToPage(oldestXact);
+
+ SimpleLruTruncate(GlobalCsnlogCtl, cutoffPage);
+}
+
+/*
+ * Decide which of two GlobalCSNLog page numbers is "older" for truncation
+ * purposes: returns true if page1 is considered older than page2.
+ *
+ * We need to use comparison of TransactionIds here in order to do the right
+ * thing with wraparound XID arithmetic. However, if we are asked about
+ * page number zero, we don't want to hand InvalidTransactionId to
+ * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
+ * offset both xids by FirstNormalTransactionId to avoid that.
+ */
+static bool
+GlobalCSNLogPagePrecedes(int page1, int page2)
+{
+ TransactionId xid1;
+ TransactionId xid2;
+
+ xid1 = ((TransactionId) page1) * GCSNLOG_XACTS_PER_PAGE;
+ xid1 += FirstNormalTransactionId;
+ xid2 = ((TransactionId) page2) * GCSNLOG_XACTS_PER_PAGE;
+ xid2 += FirstNormalTransactionId;
+
+ return TransactionIdPrecedes(xid1, xid2); /* wraparound-aware comparison */
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2f7d4ed59a..0ecc02a3dd 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
#include <unistd.h>
#include "access/commit_ts.h"
+#include "access/global_csn_log.h"
#include "access/htup_details.h"
#include "access/subtrans.h"
#include "access/transam.h"
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 2570e7086a..882bc66825 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/global_csn_log.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
@@ -173,6 +174,7 @@ GetNewTransactionId(bool isSubXact)
* Extend pg_subtrans and pg_commit_ts too.
*/
ExtendCLOG(xid);
+ ExtendGlobalCSNLog(xid);
ExtendCommitTs(xid);
ExtendSUBTRANS(xid);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0d3d670928..285d9d442e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -24,6 +24,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/global_csn_log.h"
#include "access/heaptoast.h"
#include "access/multixact.h"
#include "access/rewriteheap.h"
@@ -5345,6 +5346,7 @@ BootStrapXLOG(void)
/* Bootstrap the commit log, too */
BootStrapCLOG();
+ BootStrapGlobalCSNLog();
BootStrapCommitTs();
BootStrapSUBTRANS();
BootStrapMultiXact();
@@ -7054,6 +7056,7 @@ StartupXLOG(void)
* maintained during recovery and need not be started yet.
*/
StartupCLOG();
+ StartupGlobalCSNLog(oldestActiveXID);
StartupSUBTRANS(oldestActiveXID);
/*
@@ -7871,6 +7874,7 @@ StartupXLOG(void)
if (standbyState == STANDBY_DISABLED)
{
StartupCLOG();
+ StartupGlobalCSNLog(oldestActiveXID);
StartupSUBTRANS(oldestActiveXID);
}
@@ -8518,6 +8522,7 @@ ShutdownXLOG(int code, Datum arg)
CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
}
ShutdownCLOG();
+ ShutdownGlobalCSNLog();
ShutdownCommitTs();
ShutdownSUBTRANS();
ShutdownMultiXact();
@@ -9090,7 +9095,10 @@ CreateCheckPoint(int flags)
* StartupSUBTRANS hasn't been called yet.
*/
if (!RecoveryInProgress())
+ {
TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+ TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+ }
/* Real work is done, but log and update stats before releasing lock. */
LogCheckpointEnd(false);
@@ -9166,6 +9174,7 @@ static void
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
CheckPointCLOG();
+ CheckPointGlobalCSNLog();
CheckPointCommitTs();
CheckPointSUBTRANS();
CheckPointMultiXact();
@@ -9450,7 +9459,10 @@ CreateRestartPoint(int flags)
* this because StartupSUBTRANS hasn't been called yet.
*/
if (EnableHotStandby)
+ {
TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+ TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+ }
/* Real work is done, but log and update before releasing lock. */
LogCheckpointEnd(true);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..dc2d2959c4 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,6 +16,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/global_csn_log.h"
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/nbtree.h"
@@ -125,6 +126,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize());
size = add_size(size, CLOGShmemSize());
+ size = add_size(size, GlobalCSNLogShmemSize());
size = add_size(size, CommitTsShmemSize());
size = add_size(size, SUBTRANSShmemSize());
size = add_size(size, TwoPhaseShmemSize());
@@ -213,6 +215,7 @@ CreateSharedMemoryAndSemaphores(void)
*/
XLOGShmemInit();
CLOGShmemInit();
+ GlobalCSNLogShmemInit();
CommitTsShmemInit();
SUBTRANSShmemInit();
MultiXactShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 363000670b..8ae4906474 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -46,6 +46,7 @@
#include <signal.h>
#include "access/clog.h"
+#include "access/global_csn_log.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
@@ -835,6 +836,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
{
ExtendSUBTRANS(latestObservedXid);
+ ExtendGlobalCSNLog(latestObservedXid);
TransactionIdAdvance(latestObservedXid);
}
TransactionIdRetreat(latestObservedXid); /* = running->nextXid - 1 */
@@ -3337,6 +3339,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
while (TransactionIdPrecedes(next_expected_xid, xid))
{
TransactionIdAdvance(next_expected_xid);
+ ExtendGlobalCSNLog(next_expected_xid);
ExtendSUBTRANS(next_expected_xid);
}
Assert(next_expected_xid == xid);
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index db47843229..fe18c93b61 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -49,3 +49,4 @@ MultiXactTruncationLock 41
OldSnapshotTimeMapLock 42
LogicalRepWorkerLock 43
CLogTruncationLock 44
+GlobalCSNLogControlLock 45
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 802b1ec22f..d0bb64870c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -42,6 +42,7 @@
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "commands/prepare.h"
+#include "common/hashfn.h"
#include "executor/spi.h"
#include "jit/jit.h"
#include "libpq/libpq.h"
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 53a6cd2436..0ca331c6f9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1175,6 +1175,15 @@ static struct config_bool ConfigureNamesBool[] =
false,
NULL, NULL, NULL
},
+ {
+ {"track_global_snapshots", PGC_POSTMASTER, RESOURCES_MEM,
+ gettext_noop("Enable global snapshot tracking."),
+ gettext_noop("Used to achieve REPEATABLE READ isolation level for postgres_fdw transactions.")
+ },
+ &track_global_snapshots,
+ true, /* XXX: set true to simplify testing. XXX2: Seems that RESOURCES_MEM isn't the best category */
+ NULL, NULL, NULL
+ },
{
{"ssl", PGC_SIGHUP, CONN_AUTH_SSL,
gettext_noop("Enables SSL connections."),
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index a0b0458108..f900e7f3b4 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -77,6 +77,8 @@ provider postgresql {
probe clog__checkpoint__done(bool);
probe subtrans__checkpoint__start(bool);
probe subtrans__checkpoint__done(bool);
+ probe globalcsnlog__checkpoint__start(bool);
+ probe globalcsnlog__checkpoint__done(bool);
probe multixact__checkpoint__start(bool);
probe multixact__checkpoint__done(bool);
probe twophase__checkpoint__start();
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index a6577486ce..d0afab9d33 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -220,7 +220,8 @@ static const char *const subdirs[] = {
"pg_xact",
"pg_logical",
"pg_logical/snapshots",
- "pg_logical/mappings"
+ "pg_logical/mappings",
+ "pg_global_csn"
};
diff --git a/src/include/access/global_csn_log.h b/src/include/access/global_csn_log.h
new file mode 100644
index 0000000000..417c26c8a3
--- /dev/null
+++ b/src/include/access/global_csn_log.h
@@ -0,0 +1,30 @@
+/*
+ * global_csn_log.h
+ *
+ * Global commit-sequence-number (CSN) log.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_csn_log.h
+ */
+#ifndef GLOBAL_CSN_LOG_H
+#define GLOBAL_CSN_LOG_H
+
+#include "access/xlog.h"
+#include "utils/snapshot.h" /* GlobalCSN, track_global_snapshots */
+
+extern void GlobalCSNLogSetCSN(TransactionId xid, int nsubxids,
+ TransactionId *subxids, GlobalCSN csn);
+extern GlobalCSN GlobalCSNLogGetCSN(TransactionId xid);
+
+extern Size GlobalCSNLogShmemSize(void);
+extern void GlobalCSNLogShmemInit(void);
+extern void BootStrapGlobalCSNLog(void);
+extern void StartupGlobalCSNLog(TransactionId oldestActiveXID);
+extern void ShutdownGlobalCSNLog(void);
+extern void CheckPointGlobalCSNLog(void);
+extern void ExtendGlobalCSNLog(TransactionId newestXact);
+extern void TruncateGlobalCSNLog(TransactionId oldestXact);
+
+#endif /* GLOBAL_CSN_LOG_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 8fda8e4f78..c303042663 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -198,6 +198,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_CLOG_BUFFERS = NUM_INDIVIDUAL_LWLOCKS,
LWTRANCHE_COMMITTS_BUFFERS,
LWTRANCHE_SUBTRANS_BUFFERS,
+ LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS,
LWTRANCHE_MXACTOFFSET_BUFFERS,
LWTRANCHE_MXACTMEMBER_BUFFERS,
LWTRANCHE_ASYNC_BUFFERS,
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 4796edb63a..57d2dfaa67 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -20,6 +20,9 @@
#include "storage/buf.h"
+typedef uint64 GlobalCSN;
+extern bool track_global_snapshots;
+
/*
* The different snapshot types. We use SnapshotData structures to represent
* both "regular" (MVCC) snapshots and "special" snapshots that have non-MVCC
--
2.17.1
>From 25a5288764e9e70a0a61a4a1b32111ce8b29c966 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <[email protected]>
Date: Tue, 12 May 2020 08:30:46 +0500
Subject: [PATCH 2/3] Global-snapshots-v3
---
src/backend/access/transam/Makefile | 1 +
src/backend/access/transam/global_snapshot.c | 755 ++++++++++++++++++
src/backend/access/transam/twophase.c | 156 ++++
src/backend/access/transam/xact.c | 29 +
src/backend/access/transam/xlog.c | 2 +
src/backend/storage/ipc/ipci.c | 3 +
src/backend/storage/ipc/procarray.c | 92 ++-
src/backend/storage/lmgr/lwlocknames.txt | 1 +
src/backend/storage/lmgr/proc.c | 5 +
src/backend/utils/misc/guc.c | 13 +-
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/backend/utils/time/snapmgr.c | 167 +++-
src/include/access/global_snapshot.h | 72 ++
src/include/access/twophase.h | 1 +
src/include/catalog/pg_proc.dat | 14 +
src/include/datatype/timestamp.h | 3 +
src/include/fmgr.h | 1 +
src/include/portability/instr_time.h | 10 +
src/include/storage/proc.h | 15 +
src/include/storage/procarray.h | 8 +
src/include/utils/snapmgr.h | 3 +
src/include/utils/snapshot.h | 8 +
22 files changed, 1354 insertions(+), 7 deletions(-)
create mode 100644 src/backend/access/transam/global_snapshot.c
create mode 100644 src/include/access/global_snapshot.h
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 60ff8b141e..6de567a79b 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -16,6 +16,7 @@ OBJS = \
clog.o \
commit_ts.o \
global_csn_log.o \
+ global_snapshot.o \
generic_xlog.o \
multixact.o \
parallel.o \
diff --git a/src/backend/access/transam/global_snapshot.c b/src/backend/access/transam/global_snapshot.c
new file mode 100644
index 0000000000..bac16828bb
--- /dev/null
+++ b/src/backend/access/transam/global_snapshot.c
@@ -0,0 +1,755 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.c
+ * Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_snapshot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "portability/instr_time.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "miscadmin.h"
+
+/* Raise a warning if imported global_csn exceeds ours by this value. */
+#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */
+
+/*
+ * GlobalSnapshotState
+ *
+ * Do not trust local clocks to be strictly monotonic: keep the last acquired
+ * value so that the next timestamp can be compared with it. Accessed through
+ * GlobalSnapshotGenerate() and GlobalSnapshotSync(), under "lock".
+ */
+typedef struct
+{
+ GlobalCSN last_global_csn;
+ volatile slock_t lock;
+} GlobalSnapshotState;
+
+static GlobalSnapshotState *gsState;
+
+
+/*
+ * GUC to delay advance of oldestXid for this many seconds. Also determines
+ * the size (one slot per second) of the GlobalSnapshotXidMap circular buffer.
+ */
+int global_snapshot_defer_time;
+
+/*
+ * GUC that enables this module; declared extern, defined outside this file.
+ */
+extern bool track_global_snapshots;
+
+/*
+ * GlobalSnapshotXidMap
+ *
+ * To be able to install a global snapshot that points to the past we need to
+ * keep old versions of tuples and therefore delay advance of oldestXid. Here
+ * we keep track of correspondence between snapshot's global_csn and oldestXid
+ * that was set at the time when the snapshot was taken, much like the
+ * OldSnapshotControlData of "snapshot too old" does, but with a finer,
+ * one-second granularity.
+ *
+ * Different strategies can be employed to hold oldestXid (e.g. we can track
+ * the oldest global_csn-based snapshot among cluster nodes and map it to
+ * oldestXid on each node), but the one implemented here tries to avoid
+ * cross-node communication, which is tricky in case of postgres_fdw.
+ *
+ * On each snapshot acquisition GlobalSnapshotMapXmin() is called and stores
+ * correspondence between current global_csn and oldestXmin in a sparse way:
+ * global_csn is rounded to seconds (and here we use the fact that global_csn
+ * is just a timestamp) and oldestXmin is stored in the circular buffer where
+ * rounded global_csn acts as an offset from current circular buffer head.
+ * Size of the circular buffer is controlled by global_snapshot_defer_time GUC.
+ *
+ * When a global snapshot arrives from a different node we check that its
+ * global_csn is still in our map, otherwise we'll error out with "snapshot too
+ * old" message. If global_csn is successfully mapped to oldestXid we move
+ * backend's pgxact->xmin to proc->originalXmin and set pgxact->xmin to
+ * the mapped oldestXid. That way GetOldestXmin() takes into account backends
+ * with imported global snapshot and old tuple versions will be preserved.
+ *
+ * Also while calculating oldestXmin for our map in presence of imported
+ * global snapshots we should use proc->originalXmin instead of pgxact->xmin
+ * that was set during import. Otherwise, we can create a feedback loop:
+ * xmins of imported global snapshots were calculated using our map, new
+ * map entries would be calculated based on those xmins, and there is
+ * a risk of getting stuck forever with one non-increasing oldestXmin. All
+ * other callers of GetOldestXmin() are using pgxact->xmin so the old tuple
+ * versions are preserved.
+ */
+typedef struct GlobalSnapshotXidMap
+{
+ int head; /* offset of current freshest value */
+ int size; /* total size of circular buffer */
+ GlobalCSN_atomic last_csn_seconds; /* last rounded global_csn that changed
+ * xmin_by_second[] */
+ TransactionId *xmin_by_second; /* circular buffer of oldestXmin's */
+}
+GlobalSnapshotXidMap;
+
+static GlobalSnapshotXidMap *gsXidMap;
+
+
+/* Estimate shared memory needed for gsState and, if enabled, gsXidMap */
+Size
+GlobalSnapshotShmemSize(void)
+{
+ Size size = 0;
+
+ if (track_global_snapshots || global_snapshot_defer_time > 0)
+ {
+ size += MAXALIGN(sizeof(GlobalSnapshotState));
+ }
+
+ if (global_snapshot_defer_time > 0)
+ {
+ size += sizeof(GlobalSnapshotXidMap);
+ size += global_snapshot_defer_time*sizeof(TransactionId); /* xmin_by_second[] */
+ size = MAXALIGN(size);
+ }
+
+ return size;
+}
+
+/* Create or attach gsState and gsXidMap, as sized by GlobalSnapshotShmemSize */
+void
+GlobalSnapshotShmemInit(void)
+{
+ bool found;
+
+ if (track_global_snapshots || global_snapshot_defer_time > 0)
+ {
+ gsState = ShmemInitStruct("gsState",
+ sizeof(GlobalSnapshotState),
+ &found);
+ if (!found)
+ {
+ gsState->last_global_csn = 0;
+ SpinLockInit(&gsState->lock);
+ }
+ }
+
+ if (global_snapshot_defer_time > 0)
+ {
+ gsXidMap = ShmemInitStruct("gsXidMap",
+ sizeof(GlobalSnapshotXidMap) + global_snapshot_defer_time * sizeof(TransactionId),
+ &found);
+ if (!found)
+ {
+ int i;
+
+ pg_atomic_init_u64(&gsXidMap->last_csn_seconds, 0);
+ gsXidMap->head = 0;
+ gsXidMap->size = global_snapshot_defer_time;
+ gsXidMap->xmin_by_second =
+ (TransactionId *) (gsXidMap + 1); /* array trails the struct */
+
+ for (i = 0; i < gsXidMap->size; i++)
+ gsXidMap->xmin_by_second[i] = InvalidTransactionId;
+ }
+ }
+}
+
+/*
+ * GlobalSnapshotStartup
+ *
+ * Fill gsXidMap and the ProcArray global snapshot xmin with oldestActiveXID.
+ */
+void
+GlobalSnapshotStartup(TransactionId oldestActiveXID)
+{
+ /*
+ * Run only in normal processing mode (shared memory is initialized) and
+ * only when both track_global_snapshots and gsXidMap are enabled.
+ */
+ if (IsNormalProcessingMode() &&
+ track_global_snapshots && global_snapshot_defer_time > 0)
+ {
+ int i;
+
+ Assert(TransactionIdIsValid(oldestActiveXID));
+ for (i = 0; i < gsXidMap->size; i++)
+ gsXidMap->xmin_by_second[i] = oldestActiveXID;
+ ProcArraySetGlobalSnapshotXmin(oldestActiveXID);
+ }
+}
+
+/*
+ * GlobalSnapshotMapXmin
+ *
+ * Maintain circular buffer of oldestXmins for several seconds in past. This
+ * buffer allows shifting oldestXmin into the past when a backend is importing
+ * global transaction. Otherwise old versions of tuples that were needed for
+ * this transaction can be recycled by other processes (vacuum, HOT, etc).
+ *
+ * Locking here is not trivial. Called upon each snapshot creation after
+ * ProcArrayLock is released. Such usage creates several race conditions. It
+ * is possible that backend who got global_csn called GlobalSnapshotMapXmin()
+ * only after other backends managed to get snapshot and complete
+ * GlobalSnapshotMapXmin() call, or even committed. This is safe because
+ *
+ * * We already hold our xmin in MyPgXact, so our snapshot will not be
+ * harmed even though ProcArrayLock is released.
+ *
+ * * snapshot_global_csn is always pessimistically rounded up to the next
+ * second.
+ *
+ * * For performance reasons, xmin value for particular second is filled
+ * only once. Because of that instead of writing to buffer just our
+ * xmin (which is enough for our snapshot), we bump oldestXmin there --
+ * it mitigates the possibility of damaging someone else's snapshot by
+ * writing to the buffer too advanced value in case of slowness of
+ * another backend who generated csn earlier, but didn't manage to
+ * insert it before us.
+ *
+ * * if GlobalSnapshotMapXmin() finds a gap of several seconds between
+ * current call and latest completed call then it should fill that gap
+ * with latest known values instead of new one. Otherwise it is
+ * possible (however highly unlikely) that this gap also happened
+ * between taking snapshot and call to GlobalSnapshotMapXmin() for some
+ * backend. And we are at risk to fill circular buffer with
+ * oldestXmin's that are bigger than they actually were.
+ */
+void
+GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn)
+{
+ int offset, gap, i;
+ GlobalCSN csn_seconds;
+ GlobalCSN last_csn_seconds;
+ volatile TransactionId oldest_deferred_xmin;
+ TransactionId current_oldest_xmin, previous_oldest_xmin;
+
+ /* Callers should check config values */
+ Assert(global_snapshot_defer_time > 0);
+ Assert(gsXidMap != NULL);
+
+ /*
+ * Round up global_csn to the next second -- pessimistically and safely.
+ */
+ csn_seconds = (snapshot_global_csn / NSECS_PER_SEC + 1);
+
+ /*
+ * Fast-path check. Avoid taking exclusive GlobalSnapshotXidMapLock lock
+ * if oldestXid was already written to xmin_by_second[] for this rounded
+ * global_csn.
+ */
+ if (pg_atomic_read_u64(&gsXidMap->last_csn_seconds) >= csn_seconds)
+ return;
+
+ /* Ok, we have new entry (or entries) */
+ LWLockAcquire(GlobalSnapshotXidMapLock, LW_EXCLUSIVE);
+
+ /* Re-check last_csn_seconds under lock */
+ last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+ if (last_csn_seconds >= csn_seconds)
+ {
+ LWLockRelease(GlobalSnapshotXidMapLock);
+ return;
+ }
+ pg_atomic_write_u64(&gsXidMap->last_csn_seconds, csn_seconds);
+
+ /*
+ * Count oldest_xmin.
+ *
+ * It was possible to calculate oldest_xmin during corresponding snapshot
+ * creation, but GetSnapshotData() intentionally reads only PgXact, but not
+ * PgProc. And we need info about originalXmin (see comment to gsXidMap)
+ * which is stored in PgProc because of threats in comments around PgXact
+ * about extending it with new fields. So just calculate oldest_xmin again,
+ * that anyway happens quite rarely.
+ */
+ current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN);
+
+ previous_oldest_xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+
+ Assert(TransactionIdIsNormal(current_oldest_xmin));
+ Assert(TransactionIdIsNormal(previous_oldest_xmin) || !track_global_snapshots);
+
+ offset = csn_seconds % gsXidMap->size;
+
+ /* Sanity check before we update head; see the re-check above */
+ Assert(csn_seconds > last_csn_seconds);
+ Assert((gsXidMap->head + (csn_seconds - last_csn_seconds)) % gsXidMap->size == offset);
+
+ /* Clamp in 64-bit arithmetic: the raw distance may not fit into an int */
+ gap = (int) Min(csn_seconds - last_csn_seconds, (GlobalCSN) gsXidMap->size);
+ gsXidMap->head = offset;
+
+ /* Fill new entry with current_oldest_xmin */
+ gsXidMap->xmin_by_second[offset] = current_oldest_xmin;
+
+ /*
+ * If we have gap then fill it with previous_oldest_xmin for reasons
+ * outlined in comment above this function.
+ */
+ for (i = 1; i < gap; i++)
+ {
+ offset = (offset + gsXidMap->size - 1) % gsXidMap->size;
+ gsXidMap->xmin_by_second[offset] = previous_oldest_xmin;
+ }
+
+ oldest_deferred_xmin =
+ gsXidMap->xmin_by_second[ (gsXidMap->head + 1) % gsXidMap->size ];
+
+ LWLockRelease(GlobalSnapshotXidMapLock);
+
+ /*
+ * Advance procArray->global_snapshot_xmin after we released
+ * GlobalSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it
+ * never goes backwards regardless of how slow we can do that.
+ */
+ Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin,
+ ProcArrayGetGlobalSnapshotXmin()));
+ ProcArraySetGlobalSnapshotXmin(oldest_deferred_xmin);
+}
+
+
+/*
+ * GlobalSnapshotToXmin
+ *
+ * Get oldestXmin for snapshot_global_csn; InvalidTransactionId if too old.
+ */
+TransactionId
+GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn)
+{
+ TransactionId xmin;
+ GlobalCSN csn_seconds;
+ volatile GlobalCSN last_csn_seconds;
+
+ /* Callers should check config values */
+ Assert(global_snapshot_defer_time > 0);
+ Assert(gsXidMap != NULL);
+
+ /* Round down to get conservative estimates */
+ csn_seconds = (snapshot_global_csn / NSECS_PER_SEC);
+
+ LWLockAcquire(GlobalSnapshotXidMapLock, LW_SHARED);
+ last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+ if (csn_seconds > last_csn_seconds)
+ {
+ /* we don't have entry for this global_csn yet, return latest known */
+ xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+ }
+ else if (last_csn_seconds - csn_seconds < gsXidMap->size)
+ {
+ /* we are good, retrieve value from our map */
+ Assert(last_csn_seconds % gsXidMap->size == gsXidMap->head);
+ xmin = gsXidMap->xmin_by_second[csn_seconds % gsXidMap->size];
+ }
+ else
+ {
+ /* requested global_csn is too old, let caller know */
+ xmin = InvalidTransactionId;
+ }
+ LWLockRelease(GlobalSnapshotXidMapLock);
+
+ return xmin;
+}
+
+/*
+ * GlobalSnapshotGenerate
+ *
+ * Generate GlobalCSN, which is actually local time in nanoseconds, forced to
+ * be strictly increasing. Nanoseconds are used since millions of read
+ * transactions per second are not uncommon nowadays. Pass locked = true
+ * only if the caller already holds gsState->lock.
+ */
+GlobalCSN
+GlobalSnapshotGenerate(bool locked)
+{
+ instr_time current_time;
+ GlobalCSN global_csn;
+
+ Assert(track_global_snapshots || global_snapshot_defer_time > 0);
+
+ /*
+ * TODO: create some macro that adds small random shift to current time.
+ */
+ INSTR_TIME_SET_CURRENT(current_time);
+ global_csn = (GlobalCSN) INSTR_TIME_GET_NANOSEC(current_time);
+
+ /* TODO: change to atomics? */
+ if (!locked)
+ SpinLockAcquire(&gsState->lock);
+
+ if (global_csn <= gsState->last_global_csn)
+ global_csn = ++gsState->last_global_csn; /* clock stalled or went back */
+ else
+ gsState->last_global_csn = global_csn;
+
+ if (!locked)
+ SpinLockRelease(&gsState->lock);
+
+ return global_csn;
+}
+
+/*
+ * GlobalSnapshotSync
+ *
+ * Due to time desynchronization on different nodes we can receive global_csn
+ * which is greater than global_csn on this node. To preserve proper isolation
+ * this node needs to wait until such global_csn is reached by the local clock.
+ *
+ * This should happen relatively rarely if nodes run NTP/PTP/etc. Complain
+ * if wait time is more than SNAP_DESYNC_COMPLAIN. The wait is cancellable.
+ */
+void
+GlobalSnapshotSync(GlobalCSN remote_gcsn)
+{
+ GlobalCSN local_gcsn;
+ GlobalCSN delta;
+
+ Assert(track_global_snapshots);
+
+ for(;;)
+ {
+ SpinLockAcquire(&gsState->lock);
+ if (gsState->last_global_csn > remote_gcsn)
+ {
+ /* Everything is fine */
+ SpinLockRelease(&gsState->lock);
+ return;
+ }
+ else if ((local_gcsn = GlobalSnapshotGenerate(true)) >= remote_gcsn)
+ {
+ /*
+ * Everything is fine too, but last_global_csn wasn't updated for
+ * some time.
+ */
+ SpinLockRelease(&gsState->lock);
+ return;
+ }
+ SpinLockRelease(&gsState->lock);
+
+ /* Okay we need to sleep now */
+ delta = remote_gcsn - local_gcsn;
+ if (delta > SNAP_DESYNC_COMPLAIN)
+ ereport(WARNING,
+ (errmsg("remote global snapshot exceeds ours by more than a second"),
+ errhint("Consider running NTPd on servers participating in global transaction")));
+
+ /* TODO: report this sleeptime somewhere? */
+ pg_usleep((long) (delta/NSECS_PER_USEC));
+ CHECK_FOR_INTERRUPTS(); /* remote_gcsn may be far ahead of our clock */
+
+ /*
+ * Loop that checks to ensure that we actually slept for specified
+ * amount of time.
+ */
+ }
+
+ Assert(false); /* Should not happen */
+ return;
+}
+
+/*
+ * TransactionIdGetGlobalCSN
+ *
+ * Get GlobalCSN for specified TransactionId, taking care of special xids,
+ * xids preceding TransactionXmin and the InDoubt state.
+ */
+GlobalCSN
+TransactionIdGetGlobalCSN(TransactionId xid)
+{
+ GlobalCSN global_csn;
+
+ Assert(track_global_snapshots);
+
+ /* Handle permanent TransactionId's for which we don't have mapping */
+ if (!TransactionIdIsNormal(xid))
+ {
+ if (xid == InvalidTransactionId)
+ return AbortedGlobalCSN;
+ if (xid == FrozenTransactionId || xid == BootstrapTransactionId)
+ return FrozenGlobalCSN;
+ Assert(false); /* Should not happen */
+ }
+
+ /*
+ * For xids older than TransactionXmin, GlobalCSNLog may already be
+ * truncated, but we know such a transaction is definitely not concurrently
+ * running according to any snapshot including timetravel ones. Callers
+ * should check TransactionDidCommit after.
+ */
+ if (TransactionIdPrecedes(xid, TransactionXmin))
+ return FrozenGlobalCSN;
+
+ /* Read GlobalCSN from SLRU */
+ global_csn = GlobalCSNLogGetCSN(xid);
+
+ /*
+ * If we see the InDoubt state then the transaction is being committed and
+ * we should wait until GlobalCSN is assigned so that the visibility check
+ * can decide whether the tuple is in the snapshot. See also comments in
+ * GlobalSnapshotPrecommit().
+ */
+ if (GlobalCSNIsInDoubt(global_csn))
+ {
+ XactLockTableWait(xid, NULL, NULL, XLTW_None);
+ global_csn = GlobalCSNLogGetCSN(xid);
+ Assert(GlobalCSNIsNormal(global_csn) ||
+ GlobalCSNIsAborted(global_csn));
+ }
+
+ Assert(GlobalCSNIsNormal(global_csn) ||
+ GlobalCSNIsInProgress(global_csn) ||
+ GlobalCSNIsAborted(global_csn));
+
+ return global_csn;
+}
+
+/*
+ * XidInvisibleInGlobalSnapshot
+ *
+ * Version of XidInMVCCSnapshot for global transactions. Returns false (i.e.
+ * visible) only for frozen xids and xids whose normal CSN precedes the
+ * snapshot's global_csn. For non-imported global snapshots this should give
+ * the same result as XidInLocalMVCCSnapshot (except that aborts are reported
+ * invisible without consulting clog); XidInMVCCSnapshot asserts this
+ * equivalence for ordinary snapshots.
+ */
+bool
+XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot)
+{
+ GlobalCSN csn;
+
+ Assert(track_global_snapshots);
+
+ csn = TransactionIdGetGlobalCSN(xid);
+
+ if (GlobalCSNIsNormal(csn))
+ {
+ if (csn < snapshot->global_csn)
+ return false;
+ else
+ return true;
+ }
+ else if (GlobalCSNIsFrozen(csn))
+ {
+ /* It is bootstrap or frozen transaction */
+ return false;
+ }
+ else
+ {
+ /* It is aborted or in-progress */
+ Assert(GlobalCSNIsAborted(csn) || GlobalCSNIsInProgress(csn));
+ if (GlobalCSNIsAborted(csn))
+ Assert(TransactionIdDidAbort(xid));
+ return true;
+ }
+}
+
+
+/*****************************************************************************
+ * Functions to handle distributed commit on transaction coordinator:
+ * GlobalSnapshotPrepareCurrent() / GlobalSnapshotAssignCsnCurrent().
+ * Corresponding functions for remote nodes are defined in twophase.c:
+ * pg_global_snapshot_prepare/pg_global_snapshot_assign.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotPrepareCurrent
+ *
+ * Set InDoubt state for the currently active transaction (and its committed
+ * subtransactions) and return a new CSN, a candidate for its commit CSN.
+ */
+GlobalCSN
+GlobalSnapshotPrepareCurrent(void)
+{
+ TransactionId xid = GetCurrentTransactionIdIfAny();
+
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not prepare transaction for global commit"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ if (TransactionIdIsValid(xid))
+ {
+ TransactionId *subxids;
+ int nsubxids = xactGetCommittedChildren(&subxids);
+ GlobalCSNLogSetCSN(xid, nsubxids,
+ subxids, InDoubtGlobalCSN);
+ }
+
+ /* Nothing to write if we don't have xid */
+
+ return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * GlobalSnapshotAssignCsnCurrent
+ *
+ * Assign GlobalCSN to the currently active transaction. GlobalCSN is expected
+ * to be the maximum of the values returned by GlobalSnapshotPrepareCurrent and
+ * pg_global_snapshot_prepare.
+ */
+void
+GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn)
+{
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not prepare transaction for global commit"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ if (!GlobalCSNIsNormal(global_csn))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+ /* Skip empty transactions (no xid assigned) */
+ if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+ return;
+
+ /* Set global_csn so that ProcArrayEndTransaction won't assign its own */
+ pg_atomic_write_u64(&MyProc->assignedGlobalCsn, global_csn);
+}
+
+
+/*****************************************************************************
+ * Functions to handle global and local transactions commit.
+ *
+ * For local transactions GlobalSnapshotPrecommit sets InDoubt state before
+ * ProcArrayEndTransaction is called and transaction data potentially becomes
+ * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in
+ * twophase case) then acquires global_csn under ProcArray lock and stores it
+ * in proc->assignedGlobalCsn. It's important that global_csn for commit is
+ * generated under ProcArray lock, otherwise global and local snapshots won't
+ * be equivalent. Consequent call to GlobalSnapshotCommit will write
+ * proc->assignedGlobalCsn to GlobalCSNLog.
+ *
+ * The same rules apply to global transactions, except that global_csn is already
+ * assigned by GlobalSnapshotAssignCsnCurrent/pg_global_snapshot_assign and
+ * GlobalSnapshotPrecommit is basically no-op.
+ *
+ * GlobalSnapshotAbort differs slightly from commit because an abort
+ * can skip the InDoubt phase and can be called for a transaction subtree.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotAbort
+ *
+ * Abort transaction in GlobalCsnLog. We can skip InDoubt state for aborts
+ * since no concurrent transaction is allowed to see aborted data anyway.
+ */
+void
+GlobalSnapshotAbort(PGPROC *proc, TransactionId xid,
+ int nsubxids, TransactionId *subxids)
+{
+ if (!track_global_snapshots)
+ return;
+
+ GlobalCSNLogSetCSN(xid, nsubxids, subxids, AbortedGlobalCSN);
+
+ /*
+ * Reset assignedGlobalCsn unconditionally, as it may have been set by
+ * GlobalSnapshotAssignCsnCurrent() or pg_global_snapshot_assign().
+ */
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
+
+/*
+ * GlobalSnapshotPrecommit
+ *
+ * Set InDoubt status for a local transaction that we are going to commit.
+ * This step is needed to achieve consistency between local snapshots and
+ * global csn-based snapshots. We don't hold ProcArray lock while writing
+ * csn for transaction in SLRU but instead we set InDoubt status before
+ * transaction is deleted from ProcArray so the readers who will read csn
+ * in the gap between ProcArray removal and GlobalCSN assignment can wait
+ * until GlobalCSN is finally assigned. See also TransactionIdGetGlobalCSN().
+ *
+ * For a global transaction (assignedGlobalCsn already normal) this only
+ * sanity-checks state, as InDoubt was written earlier.
+ *
+ * This should be called only from parallel group leader before backend is
+ * deleted from ProcArray.
+ */
+void
+GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid,
+ int nsubxids, TransactionId *subxids)
+{
+ GlobalCSN oldAssignedGlobalCsn = InProgressGlobalCSN;
+ bool in_progress;
+
+ if (!track_global_snapshots)
+ return;
+
+ /* Atomically InProgress -> InDoubt; succeeds only for a local transaction */
+ in_progress = pg_atomic_compare_exchange_u64(&proc->assignedGlobalCsn,
+ &oldAssignedGlobalCsn,
+ InDoubtGlobalCSN);
+ if (in_progress)
+ {
+ Assert(GlobalCSNIsInProgress(oldAssignedGlobalCsn));
+ GlobalCSNLogSetCSN(xid, nsubxids,
+ subxids, InDoubtGlobalCSN);
+ }
+ else
+ {
+ /* Otherwise we should have valid GlobalCSN by this time */
+ Assert(GlobalCSNIsNormal(oldAssignedGlobalCsn));
+ /* Also global transaction should already be in InDoubt state */
+ Assert(GlobalCSNIsInDoubt(GlobalCSNLogGetCSN(xid)));
+ }
+}
+
+/*
+ * GlobalSnapshotCommit
+ *
+ * Write the GlobalCSN acquired earlier (proc->assignedGlobalCsn) to
+ * GlobalCsnLog. Should be preceded by GlobalSnapshotPrecommit() so readers
+ * can wait until we have finished writing to SLRU.
+ *
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, so that TransactionIdGetGlobalCSN can wait on this
+ * lock for GlobalCSN.
+ */
+void
+GlobalSnapshotCommit(PGPROC *proc, TransactionId xid,
+ int nsubxids, TransactionId *subxids)
+{
+ volatile GlobalCSN assigned_global_csn;
+
+ if (!track_global_snapshots)
+ return;
+
+ if (!TransactionIdIsValid(xid))
+ {
+ assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+ Assert(GlobalCSNIsInProgress(assigned_global_csn));
+ return; /* no xid assigned: nothing to record */
+ }
+
+ /* Finally write resulting GlobalCSN into SLRU */
+ assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+ Assert(GlobalCSNIsNormal(assigned_global_csn));
+ GlobalCSNLogSetCSN(xid, nsubxids,
+ subxids, assigned_global_csn);
+
+ /* Reset for next transaction */
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 0ecc02a3dd..c5d59fa2f2 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
#include <unistd.h>
#include "access/commit_ts.h"
+#include "access/global_snapshot.h"
#include "access/global_csn_log.h"
#include "access/htup_details.h"
#include "access/subtrans.h"
@@ -1477,8 +1478,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nabortrels, abortrels,
gid);
+ /*
+ * GlobalSnapshot callbacks that should be called right before we are
+ * going to become visible. Details in comments to this functions.
+ */
+ if (isCommit)
+ GlobalSnapshotPrecommit(proc, xid, hdr->nsubxacts, children);
+ else
+ GlobalSnapshotAbort(proc, xid, hdr->nsubxacts, children);
+
+
ProcArrayRemove(proc, latestXid);
+ /*
+ * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, since TransactionIdGetGlobalCSN relies on
+ * XactLockTableWait to await global_csn.
+ */
+ if (isCommit)
+ {
+ GlobalSnapshotCommit(proc, xid, hdr->nsubxacts, children);
+ }
+ else
+ {
+ Assert(GlobalCSNIsInProgress(
+ pg_atomic_read_u64(&proc->assignedGlobalCsn)));
+ }
+
/*
* In case we fail while running the callbacks, mark the gxact invalid so
* no one else will try to commit/rollback, and so it will be recycled if
@@ -2439,3 +2466,132 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
RemoveTwoPhaseFile(xid, giveWarning);
RemoveGXact(gxact);
}
+
+/*
+ * GlobalSnapshotPrepareTwophase
+ *
+ * Set InDoubt state for the prepared transaction identified by gid and
+ * return a new global CSN, a candidate for its commit CSN.
+ *
+ * This function is a counterpart of GlobalSnapshotPrepareCurrent() for
+ * twophase transactions.
+ */
+static GlobalCSN
+GlobalSnapshotPrepareTwophase(const char *gid)
+{
+ GlobalTransaction gxact;
+ PGXACT *pgxact;
+ char *buf;
+ TransactionId xid;
+ xl_xact_parsed_prepare parsed;
+
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not prepare transaction for global commit"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ /*
+ * Validate the GID, and lock the GXACT to ensure that two backends do not
+ * try to access the same GID at once.
+ */
+ gxact = LockGXact(gid, GetUserId());
+ pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+ xid = pgxact->xid;
+
+ if (gxact->ondisk)
+ buf = ReadTwoPhaseFile(xid, false); /* must exist: never NULL here */
+ else
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
+ ParsePrepareRecord(0, (xl_xact_prepare *) buf, &parsed);
+
+ GlobalCSNLogSetCSN(xid, parsed.nsubxacts,
+ parsed.subxacts, InDoubtGlobalCSN);
+
+ /* Unlock our GXACT */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ gxact->locking_backend = InvalidBackendId;
+ LWLockRelease(TwoPhaseStateLock);
+
+ pfree(buf);
+
+ return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * SQL interface to GlobalSnapshotPrepareTwophase(): takes a gid, returns CSN.
+ *
+ * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT
+ */
+Datum
+pg_global_snapshot_prepare(PG_FUNCTION_ARGS)
+{
+ const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ GlobalCSN global_csn;
+
+ global_csn = GlobalSnapshotPrepareTwophase(gid);
+
+ PG_RETURN_INT64(global_csn);
+}
+
+
+/*
+ * GlobalSnapshotAssignCsnTwoPhase
+ *
+ * Assign GlobalCSN to the prepared transaction identified by gid. GlobalCSN
+ * is expected to be the maximum of the values returned by
+ * GlobalSnapshotPrepareCurrent and pg_global_snapshot_prepare.
+ *
+ * This function is a counterpart of GlobalSnapshotAssignCsnCurrent() for
+ * twophase transactions.
+ */
+static void
+GlobalSnapshotAssignCsnTwoPhase(const char *gid, GlobalCSN global_csn)
+{
+ GlobalTransaction gxact;
+ PGPROC *proc;
+
+ if (!track_global_snapshots)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not prepare transaction for global commit"),
+ errhint("Make sure the configuration parameter \"%s\" is enabled.",
+ "track_global_snapshots")));
+
+ if (!GlobalCSNIsNormal(global_csn))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+ /*
+ * Validate the GID, and lock the GXACT to ensure that two backends do not
+ * try to access the same GID at once.
+ */
+ gxact = LockGXact(gid, GetUserId());
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+
+ /* Set global_csn so that ProcArrayRemove won't assign its own. */
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, global_csn);
+
+ /* Unlock our GXACT */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ gxact->locking_backend = InvalidBackendId;
+ LWLockRelease(TwoPhaseStateLock);
+}
+
+/*
+ * SQL interface to GlobalSnapshotAssignCsnTwoPhase(): takes a gid and a CSN.
+ *
+ * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn'
+ */
+Datum
+pg_global_snapshot_assign(PG_FUNCTION_ARGS)
+{
+ const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ GlobalCSN global_csn = PG_GETARG_INT64(1);
+
+ GlobalSnapshotAssignCsnTwoPhase(gid, global_csn);
+ PG_RETURN_VOID();
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3984dd3e1a..8fddb6edaf 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
#include <unistd.h>
#include "access/commit_ts.h"
+#include "access/global_snapshot.h"
#include "access/multixact.h"
#include "access/parallel.h"
#include "access/subtrans.h"
@@ -1433,6 +1434,14 @@ RecordTransactionCommit(void)
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
+
+ /*
+ * Mark our transaction as InDoubt in GlobalCsnLog and get ready for
+ * commit.
+ */
+ if (markXidCommitted)
+ GlobalSnapshotPrecommit(MyProc, xid, nchildren, children);
+
cleanup:
/* Clean up local data */
if (rels)
@@ -1694,6 +1703,11 @@ RecordTransactionAbort(bool isSubXact)
*/
TransactionIdAbortTree(xid, nchildren, children);
+ /*
+ * Mark our transaction as Aborted in GlobalCsnLog.
+ */
+ GlobalSnapshotAbort(MyProc, xid, nchildren, children);
+
END_CRIT_SECTION();
/* Compute latestXid while we have the child XIDs handy */
@@ -2183,6 +2197,21 @@ CommitTransaction(void)
*/
ProcArrayEndTransaction(MyProc, latestXid);
+ /*
+ * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks.
+ */
+ if (!is_parallel_worker)
+ {
+ TransactionId xid = GetTopTransactionIdIfAny();
+ TransactionId *subxids;
+ int nsubxids;
+
+ nsubxids = xactGetCommittedChildren(&subxids);
+ GlobalSnapshotCommit(MyProc, xid, nsubxids, subxids);
+ }
+
/*
* This is all post-commit cleanup. Note that if an error is raised here,
* it's too late to abort the transaction. This should be just
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 285d9d442e..b485db5456 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7058,6 +7058,7 @@ StartupXLOG(void)
StartupCLOG();
StartupGlobalCSNLog(oldestActiveXID);
StartupSUBTRANS(oldestActiveXID);
+ GlobalSnapshotStartup(oldestActiveXID);
/*
* If we're beginning at a shutdown checkpoint, we know that
@@ -7876,6 +7877,7 @@ StartupXLOG(void)
StartupCLOG();
StartupGlobalCSNLog(oldestActiveXID);
StartupSUBTRANS(oldestActiveXID);
+ GlobalSnapshotStartup(oldestActiveXID);
}
/*
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index dc2d2959c4..d1819dc2c8 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/nbtree.h"
#include "access/subtrans.h"
#include "access/twophase.h"
+#include "access/global_snapshot.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -145,6 +146,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, ApplyLauncherShmemSize());
+ size = add_size(size, GlobalSnapshotShmemSize());
size = add_size(size, SnapMgrShmemSize());
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
@@ -266,6 +268,7 @@ CreateSharedMemoryAndSemaphores(void)
BTreeShmemInit();
SyncScanShmemInit();
AsyncShmemInit();
+ GlobalSnapshotShmemInit();
#ifdef EXEC_BACKEND
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8ae4906474..23db5039a4 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -47,6 +47,7 @@
#include "access/clog.h"
#include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
@@ -95,6 +96,8 @@ typedef struct ProcArrayStruct
TransactionId replication_slot_xmin;
/* oldest catalog xmin of any replication slot */
TransactionId replication_slot_catalog_xmin;
+ /* xmin of oldest active global snapshot */
+ TransactionId global_snapshot_xmin;
/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
int pgprocnos[FLEXIBLE_ARRAY_MEMBER];
@@ -250,6 +253,7 @@ CreateSharedProcArray(void)
procArray->lastOverflowedXid = InvalidTransactionId;
procArray->replication_slot_xmin = InvalidTransactionId;
procArray->replication_slot_catalog_xmin = InvalidTransactionId;
+ procArray->global_snapshot_xmin = InvalidTransactionId;
}
allProcs = ProcGlobal->allProcs;
@@ -355,6 +359,17 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
latestXid))
ShmemVariableCache->latestCompletedXid = latestXid;
+
+ /*
+ * Assign global csn while holding ProcArrayLock for non-global
+ * COMMIT PREPARED. After lock is released consequent
+ * GlobalSnapshotCommit() will write this value to GlobalCsnLog.
+ *
+ * In case of global commit proc->assignedGlobalCsn is already set
+ * by prior AssignGlobalCsn().
+ */
+ if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
}
else
{
@@ -435,6 +450,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
proc->lxid = InvalidLocalTransactionId;
pgxact->xmin = InvalidTransactionId;
+ proc->originalXmin = InvalidTransactionId;
+
/* must be cleared with xid/xmin: */
pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
proc->delayChkpt = false; /* be sure this is cleared in abort */
@@ -457,6 +474,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
pgxact->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
pgxact->xmin = InvalidTransactionId;
+ proc->originalXmin = InvalidTransactionId;
+
/* must be cleared with xid/xmin: */
pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
proc->delayChkpt = false; /* be sure this is cleared in abort */
@@ -470,6 +489,20 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
latestXid))
ShmemVariableCache->latestCompletedXid = latestXid;
+
+	/*
+	 * Assign global csn while holding ProcArrayLock for non-global
+	 * COMMIT. After the lock is released, the subsequent
+	 * GlobalSnapshotCommit() will write this value to GlobalCsnLog.
+	 *
+	 * In case of global commit proc->assignedGlobalCsn is already set
+	 * by prior AssignGlobalCsn().
+	 *
+	 * TODO: in case of group commit we can generate one GlobalSnapshot for
+	 * the whole group to save time on timestamp acquisition.
+	 */
+ if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+ pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
}
/*
@@ -613,6 +646,7 @@ ProcArrayClearTransaction(PGPROC *proc)
pgxact->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
pgxact->xmin = InvalidTransactionId;
+ proc->originalXmin = InvalidTransactionId;
proc->recoveryConflictPending = false;
/* redundant, but just in case */
@@ -1315,6 +1349,7 @@ GetOldestXmin(Relation rel, int flags)
TransactionId replication_slot_xmin = InvalidTransactionId;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+ TransactionId global_snapshot_xmin = InvalidTransactionId;
/*
* If we're not computing a relation specific limit, or if a shared
@@ -1351,8 +1386,9 @@ GetOldestXmin(Relation rel, int flags)
proc->databaseId == MyDatabaseId ||
proc->databaseId == 0) /* always include WalSender */
{
- /* Fetch xid just once - see GetNewTransactionId */
+ /* Fetch both xids just once - see GetNewTransactionId */
TransactionId xid = UINT32_ACCESS_ONCE(pgxact->xid);
+ TransactionId original_xmin = UINT32_ACCESS_ONCE(proc->originalXmin);
/* First consider the transaction's own Xid, if any */
if (TransactionIdIsNormal(xid) &&
@@ -1365,8 +1401,17 @@ GetOldestXmin(Relation rel, int flags)
* We must check both Xid and Xmin because a transaction might
* have an Xmin but not (yet) an Xid; conversely, if it has an
* Xid, that could determine some not-yet-set Xmin.
+			 *
+			 * When computing oldestXmin for GlobalSnapshotMapXmin() (flag
+			 * PROCARRAY_NON_IMPORTED_XMIN), proc->originalXmin is used instead
+			 * of pgxact->xmin if it is set. See the comments on
+			 * GlobalSnapshotMapXmin for details.
*/
- xid = UINT32_ACCESS_ONCE(pgxact->xmin);
+ if ((flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+ TransactionIdIsValid(original_xmin))
+ xid = original_xmin;
+ else
+ xid = UINT32_ACCESS_ONCE(pgxact->xmin);
+
if (TransactionIdIsNormal(xid) &&
TransactionIdPrecedes(xid, result))
result = xid;
@@ -1380,6 +1425,7 @@ GetOldestXmin(Relation rel, int flags)
*/
replication_slot_xmin = procArray->replication_slot_xmin;
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+ global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin();
if (RecoveryInProgress())
{
@@ -1421,6 +1467,11 @@ GetOldestXmin(Relation rel, int flags)
result = FirstNormalTransactionId;
}
+ if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+ TransactionIdIsValid(global_snapshot_xmin) &&
+ NormalTransactionIdPrecedes(global_snapshot_xmin, result))
+ result = global_snapshot_xmin;
+
/*
* Check whether there are replication slots requiring an older xmin.
*/
@@ -1515,8 +1566,10 @@ GetSnapshotData(Snapshot snapshot)
int count = 0;
int subcount = 0;
bool suboverflowed = false;
+ GlobalCSN global_csn = FrozenGlobalCSN;
TransactionId replication_slot_xmin = InvalidTransactionId;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+ TransactionId global_snapshot_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
@@ -1708,10 +1761,18 @@ GetSnapshotData(Snapshot snapshot)
*/
replication_slot_xmin = procArray->replication_slot_xmin;
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+ global_snapshot_xmin = ProcArrayGetGlobalSnapshotXmin();
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
+ /*
+ * Take GlobalCSN under ProcArrayLock so the local/global snapshot stays
+ * synchronized.
+ */
+ if (track_global_snapshots)
+ global_csn = GlobalSnapshotGenerate(false);
+
LWLockRelease(ProcArrayLock);
/*
@@ -1727,6 +1788,10 @@ GetSnapshotData(Snapshot snapshot)
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
+ if (/*track_global_snapshots && */TransactionIdIsValid(global_snapshot_xmin) &&
+ TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin))
+ RecentGlobalXmin = global_snapshot_xmin;
+
/* Check whether there's a replication slot requiring an older xmin. */
if (TransactionIdIsValid(replication_slot_xmin) &&
NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
@@ -1782,6 +1847,11 @@ GetSnapshotData(Snapshot snapshot)
MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
}
+ snapshot->imported_global_csn = false;
+ snapshot->global_csn = global_csn;
+ if (global_snapshot_defer_time > 0 && IsUnderPostmaster)
+ GlobalSnapshotMapXmin(snapshot->global_csn);
+
return snapshot;
}
@@ -3129,6 +3199,24 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
LWLockRelease(ProcArrayLock);
}
+/*
+ * ProcArraySetGlobalSnapshotXmin
+ *		Publish the xmin of the oldest active global snapshot.
+ *
+ * No lock is taken: a TransactionId is 32 bits wide and its store is
+ * relied upon to be atomic.  Readers fetch the value through
+ * ProcArrayGetGlobalSnapshotXmin().
+ */
+void
+ProcArraySetGlobalSnapshotXmin(TransactionId xmin)
+{
+	ProcArrayStruct *arrayP = procArray;
+
+	arrayP->global_snapshot_xmin = xmin;
+}
+
+/*
+ * ProcArrayGetGlobalSnapshotXmin
+ *		Return the xmin of the oldest active global snapshot.
+ *
+ * The field starts out as InvalidTransactionId (see CreateSharedProcArray)
+ * and is read without a lock, relying on an atomic 32-bit fetch.
+ */
+TransactionId
+ProcArrayGetGlobalSnapshotXmin(void)
+{
+	ProcArrayStruct *arrayP = procArray;
+
+	return arrayP->global_snapshot_xmin;
+}
#define XidCacheRemove(i) \
do { \
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index fe18c93b61..86d0a0acae 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42
LogicalRepWorkerLock 43
CLogTruncationLock 44
GlobalCSNLogControlLock 45
+GlobalSnapshotXidMapLock 46
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 5aa19d3f78..8a47a2d375 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -37,6 +37,7 @@
#include "access/transam.h"
#include "access/twophase.h"
+#include "access/global_snapshot.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -441,6 +442,9 @@ InitProcess(void)
MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO);
+ MyProc->originalXmin = InvalidTransactionId;
+ pg_atomic_init_u64(&MyProc->assignedGlobalCsn, InProgressGlobalCSN);
+
/*
* Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
* on it. That allows us to repoint the process latch, which so far
@@ -584,6 +588,7 @@ InitAuxiliaryProcess(void)
MyProc->lwWaitMode = 0;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
+ MyProc->originalXmin = InvalidTransactionId;
#ifdef USE_ASSERT_CHECKING
{
int i;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0ca331c6f9..7154c3499e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,6 +28,7 @@
#include "access/commit_ts.h"
#include "access/gin.h"
+#include "access/global_snapshot.h"
#include "access/rmgr.h"
#include "access/tableam.h"
#include "access/transam.h"
@@ -1181,7 +1182,7 @@ static struct config_bool ConfigureNamesBool[] =
gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.")
},
&track_global_snapshots,
- true, /* XXX: set true to simplify tesing. XXX2: Seems that RESOURCES_MEM isn't the best catagory */
+ false, /* XXX: Seems that RESOURCES_MEM isn't the best catagory */
NULL, NULL, NULL
},
{
@@ -2495,6 +2496,16 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"global_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."),
+ NULL
+ },
+ &global_snapshot_defer_time,
+ 5, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
/*
* See also CheckRequiredParameterValues() if this parameter changes
*/
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 995b6ca155..0fd7d8501c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -306,6 +306,8 @@
# and comma-separated list of application_name
# from standby(s); '*' = all
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
+#global_snapshot_defer_time = 5	# minimum age, in seconds, of records that
+					# are allowed to be vacuumed
# - Standby Servers -
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 1c063c592c..3d925a7866 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -48,6 +48,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/global_snapshot.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
@@ -247,6 +248,8 @@ typedef struct SerializedSnapshotData
CommandId curcid;
TimestampTz whenTaken;
XLogRecPtr lsn;
+ GlobalCSN global_csn;
+ bool imported_global_csn;
} SerializedSnapshotData;
Size
@@ -1024,7 +1027,9 @@ SnapshotResetXmin(void)
pairingheap_first(&RegisteredSnapshots));
if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
+ {
MyPgXact->xmin = minSnapshot->xmin;
+ }
}
/*
@@ -2115,6 +2120,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
serialized_snapshot.curcid = snapshot->curcid;
serialized_snapshot.whenTaken = snapshot->whenTaken;
serialized_snapshot.lsn = snapshot->lsn;
+ serialized_snapshot.global_csn = snapshot->global_csn;
+ serialized_snapshot.imported_global_csn = snapshot->imported_global_csn;
/*
* Ignore the SubXID array if it has overflowed, unless the snapshot was
@@ -2189,6 +2196,8 @@ RestoreSnapshot(char *start_address)
snapshot->curcid = serialized_snapshot.curcid;
snapshot->whenTaken = serialized_snapshot.whenTaken;
snapshot->lsn = serialized_snapshot.lsn;
+ snapshot->global_csn = serialized_snapshot.global_csn;
+ snapshot->imported_global_csn = serialized_snapshot.imported_global_csn;
/* Copy XIDs, if present. */
if (serialized_snapshot.xcnt > 0)
@@ -2228,8 +2237,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
}
/*
- * XidInMVCCSnapshot
- * Is the given XID still-in-progress according to the snapshot?
+ * XidInLocalMVCCSnapshot
+ * Is the given XID still-in-progress according to the local snapshot?
*
* Note: GetSnapshotData never stores either top xid or subxids of our own
* backend into a snapshot, so these xids will not be reported as "running"
@@ -2237,8 +2246,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
* TransactionIdIsCurrentTransactionId first, except when it's known the
* XID could not be ours anyway.
*/
-bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+static bool
+XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot)
{
uint32 i;
@@ -2348,3 +2357,153 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
return false;
}
+
+/*
+ * XidInMVCCSnapshot
+ *
+ * Decide whether xid must be treated as still in progress for the given
+ * snapshot, taking into account that the snapshot may be global.
+ *
+ * - A snapshot whose global_csn was imported from another node is judged
+ *   solely by XidInvisibleInGlobalSnapshot(); its local data is not used.
+ * - With track_global_snapshots off, the result of XidInLocalMVCCSnapshot()
+ *   is returned as is.
+ * - Otherwise an xid reported as running by the local snapshot is re-checked
+ *   with XidInvisibleInGlobalSnapshot(); an xid the local snapshot considers
+ *   finished is reported as not in the snapshot.
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	bool		running_locally;
+
+	if (snapshot->imported_global_csn)
+	{
+		Assert(track_global_snapshots);
+		return XidInvisibleInGlobalSnapshot(xid, snapshot);
+	}
+
+	running_locally = XidInLocalMVCCSnapshot(xid, snapshot);
+
+	if (!track_global_snapshots)
+	{
+		/* Local-only snapshots must carry FrozenGlobalCSN. */
+		Assert(GlobalCSNIsFrozen(snapshot->global_csn));
+		return running_locally;
+	}
+
+	if (!running_locally)
+	{
+#ifdef USE_ASSERT_CHECKING
+		/*
+		 * Cross-check: the global snapshot may disagree with the local one
+		 * only for transactions recorded as aborted.
+		 */
+		if (XidInvisibleInGlobalSnapshot(xid, snapshot))
+		{
+			GlobalCSN	gcsn = TransactionIdGetGlobalCSN(xid);
+
+			Assert(GlobalCSNIsAborted(gcsn));
+		}
+#endif
+		return false;
+	}
+
+	/*
+	 * The xid may already be in an unknown state, in which case we must wait
+	 * and recheck.
+	 *
+	 * TODO: this check can be skipped if we know for sure that there were
+	 * no global transactions when this snapshot was taken. That requires
+	 * some changes to mechanisms of global snapshots export/import (if
+	 * backend set xmin then we should have a-priori knowledge that this
+	 * transaction going to be global or local -- right now this is not
+	 * enforced). Leave that for future and don't complicate this patch.
+	 */
+	return XidInvisibleInGlobalSnapshot(xid, snapshot);
+}
+
+/*
+ * ExportGlobalSnapshot
+ *
+ * Return the global_csn of the current snapshot so that the caller can
+ * expand this transaction to other nodes.
+ *
+ * Raises an error if track_global_snapshots is disabled, or if no snapshot
+ * has been taken yet in the current transaction.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that transaction did not yet acquired xid, but
+ * for current iteration of this patch I don't want to hack on parser.
+ */
+GlobalCSN
+ExportGlobalSnapshot(void)
+{
+	if (!track_global_snapshots)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not export global snapshot"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "track_global_snapshots")));
+
+	/*
+	 * CurrentSnapshot is NULL until a transaction snapshot has been taken;
+	 * C callers could reach here before that, so fail cleanly instead of
+	 * dereferencing a NULL pointer.
+	 */
+	if (CurrentSnapshot == NULL)
+		elog(ERROR, "could not export global snapshot: no active snapshot");
+
+	return CurrentSnapshot->global_csn;
+}
+
+/*
+ * pg_global_snapshot_export
+ *		SQL-callable wrapper around ExportGlobalSnapshot().
+ *
+ * Returns the global CSN of the current snapshot as a 64-bit value.
+ */
+Datum
+pg_global_snapshot_export(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_UINT64(ExportGlobalSnapshot());
+}
+
+/*
+ * ImportGlobalSnapshot
+ *
+ * Import global_csn and retract this backend's xmin to the value that was
+ * current when this node had such a global_csn.
+ *
+ * The local xmin is obtained from GlobalSnapshotToXmin(); if it returns
+ * InvalidTransactionId the mapping is gone and an error is raised.  The
+ * xmin this backend had before its first import in the current transaction
+ * is kept in MyProc->originalXmin (reset at transaction end), which
+ * GetOldestXmin() consults when PROCARRAY_NON_IMPORTED_XMIN is passed.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that transaction did not yet acquired xid, but
+ * for current iteration of this patch I don't want to hack on parser.
+ */
+void
+ImportGlobalSnapshot(GlobalCSN snap_global_csn)
+{
+	TransactionId xmin;
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not import global snapshot"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "track_global_snapshots")));
+
+	if (global_snapshot_defer_time <= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not import global snapshot"),
+				 errhint("Make sure the configuration parameter \"%s\" is positive.",
+						 "global_snapshot_defer_time")));
+
+	/*
+	 * Call GlobalSnapshotToXmin under ProcArrayLock to avoid situation that
+	 * resulting xmin will be evicted from map before we will set it into our
+	 * backend's xmin.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	xmin = GlobalSnapshotToXmin(snap_global_csn);
+	if (!TransactionIdIsValid(xmin))
+	{
+		LWLockRelease(ProcArrayLock);
+		ereport(ERROR,
+				(errcode(ERRCODE_SNAPSHOT_TOO_OLD),
+				 errmsg("GlobalSnapshotToXmin: global snapshot too old")));
+	}
+
+	/*
+	 * Remember the pre-import xmin only once per transaction.  A repeated
+	 * import must not overwrite it with an already-imported xmin, otherwise
+	 * PROCARRAY_NON_IMPORTED_XMIN would no longer ignore imported values.
+	 */
+	if (!TransactionIdIsValid(MyProc->originalXmin))
+		MyProc->originalXmin = MyPgXact->xmin;
+	MyPgXact->xmin = TransactionXmin = xmin;
+	LWLockRelease(ProcArrayLock);
+
+	CurrentSnapshot->xmin = xmin;	/* defuse SnapshotResetXmin() */
+	CurrentSnapshot->global_csn = snap_global_csn;
+	CurrentSnapshot->imported_global_csn = true;
+	GlobalSnapshotSync(snap_global_csn);
+
+	Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin));
+	Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin));
+}
+
+/*
+ * pg_global_snapshot_import
+ *		SQL-callable wrapper around ImportGlobalSnapshot().
+ *
+ * The single int8 argument is the global CSN to import.
+ */
+Datum
+pg_global_snapshot_import(PG_FUNCTION_ARGS)
+{
+	ImportGlobalSnapshot(PG_GETARG_UINT64(0));
+
+	PG_RETURN_VOID();
+}
diff --git a/src/include/access/global_snapshot.h b/src/include/access/global_snapshot.h
new file mode 100644
index 0000000000..246b180cfd
--- /dev/null
+++ b/src/include/access/global_snapshot.h
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.h
+ *	  Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_snapshot.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef GLOBAL_SNAPSHOT_H
+#define GLOBAL_SNAPSHOT_H
+
+#include "port/atomics.h"
+#include "storage/lock.h"
+#include "utils/snapshot.h"
+#include "utils/guc.h"
+
+/*
+ * snapshot.h is used in frontend code, so the atomic flavour of the
+ * GlobalCSN type is defined here instead.
+ */
+typedef pg_atomic_uint64 GlobalCSN_atomic;
+
+/*
+ * Reserved GlobalCSN values; anything >= FirstNormalGlobalCSN is an
+ * ordinary commit sequence number.
+ */
+#define InProgressGlobalCSN		UINT64CONST(0x0)
+#define AbortedGlobalCSN		UINT64CONST(0x1)
+#define FrozenGlobalCSN			UINT64CONST(0x2)
+#define InDoubtGlobalCSN		UINT64CONST(0x3)
+#define FirstNormalGlobalCSN	UINT64CONST(0x4)
+
+#define GlobalCSNIsInProgress(csn)	((csn) == InProgressGlobalCSN)
+#define GlobalCSNIsAborted(csn)		((csn) == AbortedGlobalCSN)
+#define GlobalCSNIsFrozen(csn)		((csn) == FrozenGlobalCSN)
+#define GlobalCSNIsInDoubt(csn)		((csn) == InDoubtGlobalCSN)
+#define GlobalCSNIsNormal(csn)		((csn) >= FirstNormalGlobalCSN)
+
+/* GUC variable: global_snapshot_defer_time, in seconds */
+extern int	global_snapshot_defer_time;
+
+/* Shared memory and startup */
+extern Size GlobalSnapshotShmemSize(void);
+extern void GlobalSnapshotShmemInit(void);
+extern void GlobalSnapshotStartup(TransactionId oldestActiveXID);
+
+/* Mapping between global CSNs and local xmins */
+extern void GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn);
+extern TransactionId GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn);
+
+/* CSN generation and visibility */
+extern GlobalCSN GlobalSnapshotGenerate(bool locked);
+extern bool XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot);
+extern void GlobalSnapshotSync(GlobalCSN remote_gcsn);
+extern GlobalCSN TransactionIdGetGlobalCSN(TransactionId xid);
+
+/* Distributed commit: prepare a transaction and assign its global CSN */
+extern GlobalCSN GlobalSnapshotPrepareGlobal(const char *gid);
+extern void GlobalSnapshotAssignCsnGlobal(const char *gid,
+										  GlobalCSN global_csn);
+extern GlobalCSN GlobalSnapshotPrepareCurrent(void);
+extern void GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn);
+
+/* Transaction end hooks, called from xact.c */
+extern void GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids,
+								TransactionId *subxids);
+extern void GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+extern void GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids,
+								 TransactionId *subxids);
+
+#endif							/* GLOBAL_SNAPSHOT_H */
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 2ca71c3445..b4899f3754 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
#include "access/xlogdefs.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
+#include "utils/snapshot.h"
/*
* GlobalTransactionData is defined in twophase.c; other places have no
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a85c78e796..64c3c71df3 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10945,4 +10945,18 @@
proname => 'is_normalized', prorettype => 'bool',
proargtypes => 'text text', prosrc => 'unicode_is_normalized' },
+# global transaction handling
+{ oid => '4388', descr => 'export global transaction snapshot',
+ proname => 'pg_global_snapshot_export', provolatile => 'v', proparallel => 'u',
+ prorettype => 'int8', proargtypes => '', prosrc => 'pg_global_snapshot_export' },
+{ oid => '4389', descr => 'import global transaction snapshot',
+ proname => 'pg_global_snapshot_import', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_global_snapshot_import' },
+{ oid => '4390', descr => 'prepare distributed transaction for commit, get global_csn',
+ proname => 'pg_global_snapshot_prepare', provolatile => 'v', proparallel => 'u',
+ prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_global_snapshot_prepare' },
+{ oid => '4391', descr => 'assign global_csn to distributed transaction',
+ proname => 'pg_global_snapshot_assign', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_global_snapshot_assign' },
+
]
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index 6be6d35d1e..583b1beea5 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -93,6 +93,9 @@ typedef struct
#define USECS_PER_MINUTE INT64CONST(60000000)
#define USECS_PER_SEC INT64CONST(1000000)
+#define NSECS_PER_SEC INT64CONST(1000000000)
+#define NSECS_PER_USEC INT64CONST(1000)
+
/*
* We allow numeric timezone offsets up to 15:59:59 either way from Greenwich.
* Currently, the record holders for wackiest offsets in actual use are zones
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index d349510b7c..5cdf2e17cb 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -280,6 +280,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
#define PG_GETARG_FLOAT4(n) DatumGetFloat4(PG_GETARG_DATUM(n))
#define PG_GETARG_FLOAT8(n) DatumGetFloat8(PG_GETARG_DATUM(n))
#define PG_GETARG_INT64(n) DatumGetInt64(PG_GETARG_DATUM(n))
+#define PG_GETARG_UINT64(n) DatumGetUInt64(PG_GETARG_DATUM(n))
/* use this if you want the raw, possibly-toasted input datum: */
#define PG_GETARG_RAW_VARLENA_P(n) ((struct varlena *) PG_GETARG_POINTER(n))
/* use this if you want the input datum de-toasted: */
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index d6459327cc..4ac23da654 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -141,6 +141,9 @@ typedef struct timespec instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000))
+#define INSTR_TIME_GET_NANOSEC(t) \
+ (((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec))
+
#else /* !HAVE_CLOCK_GETTIME */
/* Use gettimeofday() */
@@ -205,6 +208,10 @@ typedef struct timeval instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec)
+#define INSTR_TIME_GET_NANOSEC(t) \
+ (((uint64) (t).tv_sec * (uint64) 1000000000) + \
+ (uint64) (t).tv_usec * (uint64) 1000)
+
#endif /* HAVE_CLOCK_GETTIME */
#else /* WIN32 */
@@ -237,6 +244,9 @@ typedef LARGE_INTEGER instr_time;
#define INSTR_TIME_GET_MICROSEC(t) \
((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency()))
+#define INSTR_TIME_GET_NANOSEC(t) \
+ ((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency()))
+
static inline double
GetTimerFrequency(void)
{
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index ae4f573ab4..da84dbf04c 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,8 +15,10 @@
#define _PROC_H_
#include "access/clog.h"
+#include "access/global_snapshot.h"
#include "access/xlogdefs.h"
#include "lib/ilist.h"
+#include "utils/snapshot.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/pg_sema.h"
@@ -57,6 +59,7 @@ struct XidCache
#define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical
* decoding outside xact */
#define PROC_RESERVED 0x20 /* reserved for procarray */
+#define PROC_RESERVED2 0x40 /* reserved for procarray */
/* flags reset at EOXact */
#define PROC_VACUUM_STATE_MASK \
@@ -205,6 +208,18 @@ struct PGPROC
PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */
dlist_head lockGroupMembers; /* list of members, if I'm a leader */
dlist_node lockGroupLink; /* my member link, if I'm a member */
+
+	/*
+	 * assignedGlobalCsn holds the GlobalCSN of this transaction. It is
+	 * generated under ProcArrayLock and later written to the GlobalCSNLog.
+	 * The variable is atomic only because of group commit, where another
+	 * backend may set it; in all other scenarios only the backend owning
+	 * this proc entry works with it.
+	 */
+ GlobalCSN_atomic assignedGlobalCsn;
+
+ /* Original xmin of this backend before global snapshot was imported */
+ TransactionId originalXmin;
};
/* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index a5c7d0c064..452ae5d547 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -36,6 +36,10 @@
#define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin,
* catalog_xmin */
+
+#define PROCARRAY_NON_IMPORTED_XMIN 0x40 /* use originalXmin instead
+ * of xmin to properly
+ * maintain gsXidMap */
/*
* Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
* PGXACT->vacuumFlags. Other flags are used for different purposes and
@@ -125,4 +129,8 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
TransactionId *catalog_xmin);
+extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin);
+
+extern TransactionId ProcArrayGetGlobalSnapshotXmin(void);
+
#endif /* PROCARRAY_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index b28d13ce84..f4768bc6d4 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -127,6 +127,9 @@ extern void AtSubCommit_Snapshot(int level);
extern void AtSubAbort_Snapshot(int level);
extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
+extern GlobalCSN ExportGlobalSnapshot(void);
+extern void ImportGlobalSnapshot(GlobalCSN snap_global_csn);
+
extern void ImportSnapshot(const char *idstr);
extern bool XactHasExportedSnapshots(void);
extern void DeleteAllExportedSnapshotFiles(void);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 57d2dfaa67..71c92c69f4 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -204,6 +204,14 @@ typedef struct SnapshotData
TimestampTz whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */
+
+ /*
+ * GlobalCSN for cross-node snapshot isolation support.
+ * Will be used only if track_global_snapshots is enabled.
+ */
+ GlobalCSN global_csn;
+ /* Did we have our own global_csn or imported one from different node */
+ bool imported_global_csn;
} SnapshotData;
#endif /* SNAPSHOT_H */
--
2.17.1
>From d3b8cb68ca1bb8721f738c4993083ea4cca3d255 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <[email protected]>
Date: Tue, 12 May 2020 08:31:59 +0500
Subject: [PATCH 3/3] postgres_fdw-support-for-global-snapshots-v3
---
contrib/postgres_fdw/Makefile | 9 +
contrib/postgres_fdw/connection.c | 290 ++++++++++++++++--
contrib/postgres_fdw/postgres_fdw.c | 12 +
contrib/postgres_fdw/postgres_fdw.h | 2 +
.../postgres_fdw/t/001_bank_coordinator.pl | 264 ++++++++++++++++
.../postgres_fdw/t/002_bank_participant.pl | 240 +++++++++++++++
src/test/perl/PostgresNode.pm | 35 +++
7 files changed, 826 insertions(+), 26 deletions(-)
create mode 100644 contrib/postgres_fdw/t/001_bank_coordinator.pl
create mode 100644 contrib/postgres_fdw/t/002_bank_participant.pl
diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index ee8a80a392..07091f630e 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -29,3 +29,12 @@ top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
+
+# The global makefile will do temp-install for 'check'. Since REGRESS is
+# defined, PGXS (included from contrib-global.mk or directly) will take care
+# of adding postgres_fdw to it as EXTRA_INSTALL and of building pg_regress.
+# It will also run pg_regress itself, so the only thing left is the TAP tests.
+check: tapcheck
+
+tapcheck: temp-install
+ $(prove_check)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index e45647f3ea..8e33ae0af7 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -12,8 +12,10 @@
*/
#include "postgres.h"
+#include "access/global_snapshot.h"
#include "access/htup_details.h"
#include "access/xact.h"
+#include "access/xlog.h" /* GetSystemIdentifier() */
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "mb/pg_wchar.h"
@@ -25,6 +27,8 @@
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+#include "utils/snapshot.h"
#include "utils/syscache.h"
/*
@@ -65,6 +69,21 @@ typedef struct ConnCacheEntry
*/
static HTAB *ConnectionHash = NULL;
+/*
+ * FdwTransactionState
+ *
+ * Holds the number of remote transactions participating in the current
+ * local transaction, plus state shared by all connection entries.
+ *
+ * A single instance is allocated lazily by GetConnection() in
+ * CacheMemoryContext, so it survives across transactions.
+ */
+typedef struct FdwTransactionState
+{
+ char *gid;			/* presumably the global transaction id used for
+					 * two-phase commit -- TODO confirm */
+ int nparticipants;	/* number of remote participants */
+ GlobalCSN global_csn;	/* global CSN for the distributed transaction
+						 * (assumed; verify against commit code) */
+ bool two_phase_commit;	/* presumably: whether two-phase commit is used */
+} FdwTransactionState;
+static FdwTransactionState *fdwTransState;
+
/* for assigning cursor numbers and prepared statement numbers */
static unsigned int cursor_number = 0;
static unsigned int prep_stmt_number = 0;
@@ -72,6 +91,9 @@ static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
+/* counter of prepared tx made by this backend */
+static int two_phase_xact_count = 0;
+
/* prototypes of private functions */
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
@@ -80,6 +102,7 @@ static void configure_remote_session(PGconn *conn);
static void do_sql_command(PGconn *conn, const char *sql);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void deallocate_prepared_stmts(ConnCacheEntry *entry);
static void pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid,
@@ -136,6 +159,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
pgfdw_inval_callback, (Datum) 0);
}
+ /* allocate FdwTransactionState */
+ if (fdwTransState == NULL)
+ {
+ MemoryContext oldcxt;
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ fdwTransState = palloc0(sizeof(FdwTransactionState));
+ MemoryContextSwitchTo(oldcxt);
+ }
+
/* Set flag that we did GetConnection during the current transaction */
xact_got_connection = true;
@@ -446,7 +478,8 @@ configure_remote_session(PGconn *conn)
}
/*
- * Convenience subroutine to issue a non-data-returning SQL command to remote
+ * Convenience subroutine to issue a non-data-returning SQL command or
+ * statement to remote node.
*/
static void
do_sql_command(PGconn *conn, const char *sql)
@@ -456,7 +489,8 @@ do_sql_command(PGconn *conn, const char *sql)
if (!PQsendQuery(conn, sql))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
}
@@ -484,6 +518,10 @@ begin_remote_xact(ConnCacheEntry *entry)
elog(DEBUG3, "starting remote transaction on connection %p",
entry->conn);
+ if (UseGlobalSnapshots && (!IsolationUsesXactSnapshot() ||
+ IsolationIsSerializable()))
+ elog(ERROR, "Global snapshots support only REPEATABLE READ");
+
if (IsolationIsSerializable())
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
else
@@ -492,6 +530,23 @@ begin_remote_xact(ConnCacheEntry *entry)
do_sql_command(entry->conn, sql);
entry->xact_depth = 1;
entry->changing_xact_state = false;
+
+ if (UseGlobalSnapshots)
+ {
+ char import_sql[128];
+
+ /* Export our snapshot */
+ if (fdwTransState->global_csn == 0)
+ fdwTransState->global_csn = ExportGlobalSnapshot();
+
+ snprintf(import_sql, sizeof(import_sql),
+ "SELECT pg_global_snapshot_import("UINT64_FORMAT")",
+ fdwTransState->global_csn);
+
+ do_sql_command(entry->conn, import_sql);
+ }
+
+ fdwTransState->nparticipants += 1;
}
/*
@@ -699,6 +754,94 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
PG_END_TRY();
}
+/* Callback typedef for BroadcastStmt */
+typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg);
+
+/*
+ * BroadcastStmt
+ *		Send "sql" in parallel to every connection that has an open remote
+ *		transaction, then collect one result from each of them.
+ *
+ * All statements are sent before any result is read, so the remote nodes
+ * work concurrently.  A node succeeds when its result has status
+ * expectedStatus and, if a handler is supplied, handler(result, arg) returns
+ * true.
+ *
+ * Failures are reported as WARNINGs and signalled through the return value
+ * (false if any node failed) instead of throwing an ERROR.  The 2PC code in
+ * pgfdw_xact_callback needs control back to broadcast ABORT PREPARED, and
+ * COMMIT PREPARED is broadcast from the XACT_EVENT_COMMIT callback, after the
+ * local transaction has already committed.
+ */
+static bool
+BroadcastStmt(char const * sql, unsigned expectedStatus,
+			  BroadcastCmdResHandler handler, void *arg)
+{
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+	List	   *pending = NIL;	/* entries the statement was sent to */
+	ListCell   *lc;
+	bool		allOk = true;
+
+	/* Broadcast sql */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		pgfdw_reject_incomplete_xact_state_change(entry);
+
+		if (entry->xact_depth <= 0 || entry->conn == NULL)
+			continue;
+
+		if (PQsendQuery(entry->conn, sql))
+			pending = lappend(pending, entry);
+		else
+		{
+			PGresult   *res = PQgetResult(entry->conn);
+
+			elog(WARNING, "Failed to send command %s", sql);
+			/* clear = true: pgfdw_report_error frees res, so no PQclear here */
+			pgfdw_report_error(WARNING, res, entry->conn, true, sql);
+			allOk = false;
+		}
+	}
+
+	/* Collect responses, only from connections that received the statement */
+	foreach(lc, pending)
+	{
+		PGresult   *result;
+		PGresult   *extra;
+
+		entry = (ConnCacheEntry *) lfirst(lc);
+		result = PQgetResult(entry->conn);
+
+		if (PQresultStatus(result) != expectedStatus ||
+			(handler && !handler(result, arg)))
+		{
+			elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), (int) expectedStatus);
+			/* clear = false: result is freed exactly once, below */
+			pgfdw_report_error(WARNING, result, entry->conn, false, sql);
+			allOk = false;
+		}
+		PQclear(result);
+
+		/* Consume the terminating NULL (and anything unexpected before it) */
+		while ((extra = PQgetResult(entry->conn)) != NULL)
+			PQclear(extra);
+	}
+
+	list_free(pending);
+	return allOk;
+}
+
+/*
+ * BroadcastCmd
+ *		Broadcast a utility command that returns no rows: every participant
+ *		must answer PGRES_COMMAND_OK.  Returns BroadcastStmt's result.
+ */
+static bool
+BroadcastCmd(char const *sql)
+{
+	BroadcastCmdResHandler no_handler = NULL;
+
+	return BroadcastStmt(sql, PGRES_COMMAND_OK, no_handler, NULL);
+}
+
+/*
+ * BroadcastFunc
+ *		Broadcast a row-returning statement such as "SELECT func(...)": every
+ *		participant must answer PGRES_TUPLES_OK; the rows are not inspected.
+ */
+static bool
+BroadcastFunc(char const *sql)
+{
+	const unsigned expected = PGRES_TUPLES_OK;
+
+	return BroadcastStmt(sql, expected, NULL, NULL);
+}
+
+/*
+ * MaxCsnCB
+ *		BroadcastStmt result handler: parse the CSN in the first column of the
+ *		first row and raise the running maximum *(GlobalCSN *) arg to it.
+ *
+ * Returns false when the value is missing, empty, or cannot be parsed with
+ * UINT64_FORMAT; true otherwise.
+ */
+static bool
+MaxCsnCB(PGresult *result, void *arg)
+{
+	GlobalCSN  *running_max = (GlobalCSN *) arg;
+	const char *value = PQgetvalue(result, 0, 0);
+	GlobalCSN	remote_csn = 0;
+
+	if (value == NULL || *value == '\0')
+		return false;
+	if (sscanf(value, UINT64_FORMAT, &remote_csn) != 1)
+		return false;
+
+	if (remote_csn > *running_max)
+		*running_max = remote_csn;
+	return true;
+}
+
/*
* pgfdw_xact_callback --- cleanup at main-transaction end.
*/
@@ -712,6 +855,86 @@ pgfdw_xact_callback(XactEvent event, void *arg)
if (!xact_got_connection)
return;
+ /* Handle possible two-phase commit */
+ if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT)
+ {
+ bool include_local_tx = false;
+
+ /* Should we take into account this node? */
+ if (TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+ {
+ include_local_tx = true;
+ fdwTransState->nparticipants += 1;
+ }
+
+ /* Switch to 2PC mode there were more than one participant */
+ if (UseGlobalSnapshots && fdwTransState->nparticipants > 1)
+ fdwTransState->two_phase_commit = true;
+
+ if (fdwTransState->two_phase_commit)
+ {
+ GlobalCSN max_csn = InProgressGlobalCSN,
+ my_csn = InProgressGlobalCSN;
+ bool res;
+ char *sql;
+
+ fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
+ (long long) GetCurrentTimestamp(),
+ (long long) GetSystemIdentifier(),
+ MyProcPid,
+ GetCurrentTransactionIdIfAny(),
+ ++two_phase_xact_count,
+ fdwTransState->nparticipants);
+
+ /* Broadcast PREPARE */
+ sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
+ res = BroadcastCmd(sql);
+ if (!res)
+ goto error;
+
+ /* Broadcast pg_global_snapshot_prepare() */
+ if (include_local_tx)
+ my_csn = GlobalSnapshotPrepareCurrent();
+
+ sql = psprintf("SELECT pg_global_snapshot_prepare('%s')",
+ fdwTransState->gid);
+ res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn);
+ if (!res)
+ goto error;
+
+ /* select maximal global csn */
+ if (include_local_tx && my_csn > max_csn)
+ max_csn = my_csn;
+
+ /* Broadcast pg_global_snapshot_assign() */
+ if (include_local_tx)
+ GlobalSnapshotAssignCsnCurrent(max_csn);
+ sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
+ fdwTransState->gid, max_csn);
+ res = BroadcastFunc(sql);
+
+error:
+ if (!res)
+ {
+ sql = psprintf("ABORT PREPARED '%s'", fdwTransState->gid);
+ BroadcastCmd(sql);
+ elog(ERROR, "Failed to PREPARE transaction on remote node");
+ }
+
+ /*
+ * Do not fall down. Consequent COMMIT event will clean thing up.
+ */
+ return;
+ }
+ }
+
+ /* COMMIT open transaction of we were doing 2PC */
+ if (fdwTransState->two_phase_commit &&
+ (event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT))
+ {
+ BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid));
+ }
+
/*
* Scan all connection cache entries to find open remote transactions, and
* close them.
@@ -719,8 +942,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- PGresult *res;
-
/* Ignore cache entry if no open connection right now */
if (entry->conn == NULL)
continue;
@@ -737,6 +958,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
+ Assert(!fdwTransState->two_phase_commit);
/*
* If abort cleanup previously failed for this connection,
@@ -749,28 +971,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
do_sql_command(entry->conn, "COMMIT TRANSACTION");
entry->changing_xact_state = false;
- /*
- * If there were any errors in subtransactions, and we
- * made prepared statements, do a DEALLOCATE ALL to make
- * sure we get rid of all prepared statements. This is
- * annoying and not terribly bulletproof, but it's
- * probably not worth trying harder.
- *
- * DEALLOCATE ALL only exists in 8.3 and later, so this
- * constrains how old a server postgres_fdw can
- * communicate with. We intentionally ignore errors in
- * the DEALLOCATE, so that we can hobble along to some
- * extent with older servers (leaking prepared statements
- * as we go; but we don't really support update operations
- * pre-8.3 anyway).
- */
- if (entry->have_prep_stmt && entry->have_error)
- {
- res = PQexec(entry->conn, "DEALLOCATE ALL");
- PQclear(res);
- }
- entry->have_prep_stmt = false;
- entry->have_error = false;
+ deallocate_prepared_stmts(entry);
break;
case XACT_EVENT_PRE_PREPARE:
@@ -789,6 +990,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
break;
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_COMMIT:
+ if (fdwTransState->two_phase_commit)
+ deallocate_prepared_stmts(entry);
+ else /* Pre-commit should have closed the open transaction */
+ elog(ERROR, "missed cleaning up connection during pre-commit");
+ break;
case XACT_EVENT_PREPARE:
/* Pre-commit should have closed the open transaction */
elog(ERROR, "missed cleaning up connection during pre-commit");
@@ -884,6 +1090,38 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Also reset cursor numbering for next transaction */
cursor_number = 0;
+
+ /* Reset fdwTransState */
+ memset(fdwTransState, '\0', sizeof(FdwTransactionState));
+}
+
+/*
+ * deallocate_prepared_stmts
+ *		Reset a connection's prepared-statement bookkeeping at transaction end.
+ *
+ * If there were any errors in subtransactions and we made prepared
+ * statements, issue DEALLOCATE ALL to make sure we get rid of all prepared
+ * statements.  This is annoying and not terribly bulletproof, but it's
+ * probably not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old a
+ * server postgres_fdw can communicate with.  We intentionally ignore errors
+ * in the DEALLOCATE, so that we can hobble along to some extent with older
+ * servers (leaking prepared statements as we go; but we don't really support
+ * update operations pre-8.3 anyway).
+ *
+ * Either way, the entry's have_prep_stmt and have_error flags are cleared.
+ */
+static void
+deallocate_prepared_stmts(ConnCacheEntry *entry)
+{
+	bool		need_deallocate = entry->have_prep_stmt && entry->have_error;
+
+	if (need_deallocate)
+		PQclear(PQexec(entry->conn, "DEALLOCATE ALL"));	/* result ignored */
+
+	entry->have_prep_stmt = false;
+	entry->have_error = false;
+}
/*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..03c5b0093a 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -301,6 +301,9 @@ typedef struct
List *already_used; /* expressions already dealt with */
} ec_member_foreign_arg;
+bool UseGlobalSnapshots;	/* GUC postgres_fdw.use_global_snapshots, defined in _PG_init() */
+void _PG_init(void);
+
/*
* SQL functions
*/
@@ -6584,3 +6587,12 @@ find_em_expr_for_input_target(PlannerInfo *root,
elog(ERROR, "could not find pathkey item to sort");
return NULL; /* keep compiler quiet */
}
+
+/*
+ * _PG_init
+ *		Module load callback: define postgres_fdw's custom GUCs.
+ *
+ * postgres_fdw.use_global_snapshots (bool, default off, PGC_USERSET) makes
+ * remote transactions import a global snapshot and makes transactions with
+ * more than one participant commit through two-phase commit.
+ */
+void
+_PG_init(void)
+{
+	DefineCustomBoolVariable("postgres_fdw.use_global_snapshots",
+							 "Use global snapshots for FDW transactions", NULL,
+							 &UseGlobalSnapshots, false, PGC_USERSET, 0, NULL,
+							 NULL, NULL);
+
+	/* Warn about misspelled or unknown postgres_fdw.* settings */
+	EmitWarningsOnPlaceholders("postgres_fdw");
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..9d3ea077a1 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -208,4 +208,6 @@ extern const char *get_jointype_name(JoinType jointype);
extern bool is_builtin(Oid objectId);
extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
+extern bool UseGlobalSnapshots;
+
#endif /* POSTGRES_FDW_H */
diff --git a/contrib/postgres_fdw/t/001_bank_coordinator.pl b/contrib/postgres_fdw/t/001_bank_coordinator.pl
new file mode 100644
index 0000000000..1e31f33349
--- /dev/null
+++ b/contrib/postgres_fdw/t/001_bank_coordinator.pl
@@ -0,0 +1,264 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Coordinator node: runs the global transactions through postgres_fdw, so it
+# enables postgres_fdw.use_global_snapshots and uses REPEATABLE READ by
+# default (the only isolation level global snapshots support).
+my $master = get_new_node('master');
+$master->init();
+$master->append_conf('postgresql.conf', q(
+ max_prepared_transactions = 30
+ log_checkpoints = true
+ postgres_fdw.use_global_snapshots = on
+ track_global_snapshots = on
+ default_transaction_isolation = 'REPEATABLE READ'
+));
+$master->start();
+
+# Shard nodes: only need prepared transactions and global snapshot settings;
+# the coordinator reaches them through postgres_fdw.
+my $shard1 = get_new_node('shard1');
+my $shard2 = get_new_node('shard2');
+foreach my $shard ($shard1, $shard2)
+{
+	$shard->init();
+	$shard->append_conf('postgresql.conf', q(
+ max_prepared_transactions = 30
+ global_snapshot_defer_time = 15
+ track_global_snapshots = on
+));
+	$shard->start();
+}
+
+###############################################################################
+# Prepare nodes
+###############################################################################
+
+$master->safe_psql('postgres', qq[
+ CREATE EXTENSION postgres_fdw;
+ CREATE TABLE accounts(id integer primary key, amount integer);
+ CREATE TABLE global_transactions(tx_time timestamp);
+]);
+
+for my $shard ($shard1, $shard2)
+{
+	my ($port, $host) = ($shard->port, $shard->host);
+
+	$shard->safe_psql('postgres',
+		"CREATE TABLE accounts(id integer primary key, amount integer)");
+
+	# the coordinator's accounts table inherits one foreign table per shard
+	$master->safe_psql('postgres', qq[
+ CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port');
+ CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts');
+ CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+ ]);
+}
+
+# shard1 holds the odd account ids, shard2 the even ones
+$shard1->safe_psql('postgres', qq[
+ insert into accounts select 2*id-1, 0 from generate_series(1, 10010) as id;
+ CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+$shard2->safe_psql('postgres', qq[
+ insert into accounts select 2*id, 0 from generate_series(1, 10010) as id;
+ CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+# print connection strings to ease manual debugging
+diag($_->name . ": " . $_->connstr('postgres')) foreach ($master, $shard1, $shard2);
+
+###############################################################################
+# pgbench scripts
+###############################################################################
+
+my $bank = File::Temp->new();	# global transfer: ids :id and :id+1 live on different shards
+append_to_file($bank, q{
+ \set id random(1, 20000)
+ BEGIN;
+ WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+ INSERT into global_transactions SELECT now() FROM upd;
+ UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+ COMMIT;
+});
+
+my $bank1 = File::Temp->new();	# local to shard1: both ids are odd
+append_to_file($bank1, q{
+ \set id random(1, 10000)
+ BEGIN;
+ WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = (2*:id + 1) RETURNING *)
+ INSERT into local_transactions SELECT now() FROM upd;
+ UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 3);
+ COMMIT;
+});
+
+my $bank2 = File::Temp->new();	# local to shard2: both ids are even
+append_to_file($bank2, q{
+ \set id random(1, 10000)
+
+ BEGIN;
+ WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = 2*:id RETURNING *)
+ INSERT into local_transactions SELECT now() FROM upd;
+ UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 2);
+ COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+# Return the row count of $table on $node, then empty the table.
+sub count_and_delete_rows
+{
+	my ($node, $table) = @_;
+	my $count = $node->safe_psql('postgres', "select count(*) from $table");
+
+	$node->safe_psql('postgres', "delete from $table");
+	diag($node->name, ": completed $count transactions");
+	return $count;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+# Every account starts with amount 0 and each transfer moves 1 between two
+# accounts, so any change of sum(amount) exposes an inconsistent snapshot.
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+
+my $pgb_handle;
+
+$pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+while (time() - $started < $seconds)
+{
+	$total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+	if ( ($total ne $oldtotal) and ($total ne '') )
+	{
+		$isolation_errors++;
+		$oldtotal = $total;
+		diag("Isolation error. Total = $total");
+	}
+	if ($total ne '') { $selects++; }
+}
+
+$master->pgbench_await($pgb_handle);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# Concurrent global and local transactions
+###############################################################################
+
+my ($pgb_handle1, $pgb_handle2, $pgb_handle3);
+
+# global txses
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+# concurrent local
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$started = time();
+$selects = 0;
+($oldtotal, $isolation_errors) = (0, 0);	# count only this phase's anomalies
+while (time() - $started < $seconds)
+{
+	$total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+	if ( ($total ne $oldtotal) and ($total ne '') )
+	{
+		$isolation_errors++;
+		$oldtotal = $total;
+		diag("Isolation error. Total = $total");
+	}
+	if ($total ne '') { $selects++; }
+}
+
+diag("selects = $selects");
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global and local transactions');
+
+
+###############################################################################
+# Snapshot stability
+###############################################################################
+
+my ($hash1, $hash2);	# md5 of accounts at the start and end of one transaction
+my $stability_errors = 0;
+
+# global txses
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+# concurrent local
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$selects = 0;
+$started = time();
+while (time() - $started < $seconds)
+{
+	foreach my $node ($master, $shard1, $shard2)
+	{
+		($hash1, undef, $hash2) = split "\n", $node->safe_psql('postgres', qq[
+ begin isolation level repeatable read;
+ select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+ select pg_sleep(3);
+ select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+ commit;
+ ]);
+
+		if ($hash1 ne $hash2)
+		{
+			diag("snapshot changed within a transaction on " . $node->name);
+			$stability_errors++;
+		}
+		elsif ($hash1 eq '' or $hash2 eq '')
+		{
+			die "empty query result on " . $node->name;
+		}
+		else
+		{
+			$selects++;
+		}
+	}
+}
+
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($stability_errors, 0, 'snapshot is stable during concurrent global and local transactions');
+
+$master->stop;
+$shard1->stop;
+$shard2->stop;
diff --git a/contrib/postgres_fdw/t/002_bank_participant.pl b/contrib/postgres_fdw/t/002_bank_participant.pl
new file mode 100644
index 0000000000..04a2f1ba85
--- /dev/null
+++ b/contrib/postgres_fdw/t/002_bank_participant.pl
@@ -0,0 +1,240 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Both nodes act as coordinators (they run global transactions through
+# postgres_fdw, hence global snapshots on and REPEATABLE READ by default)
+# and as participants serving each other's postgres_fdw connections, so
+# they share one configuration.
+my $shard_conf = q(
+ max_prepared_transactions = 30
+ postgres_fdw.use_global_snapshots = on
+ global_snapshot_defer_time = 15
+ track_global_snapshots = on
+ default_transaction_isolation = 'REPEATABLE READ'
+);
+
+my $shard1 = get_new_node('shard1');
+my $shard2 = get_new_node('shard2');
+
+foreach my $shard ($shard1, $shard2)
+{
+	$shard->init();
+	$shard->append_conf('postgresql.conf', $shard_conf);
+	$shard->start();
+}
+
+###############################################################################
+# Prepare nodes
+###############################################################################
+
+my @shards = ($shard1, $shard2);
+
+foreach my $node (@shards)
+{
+	$node->safe_psql('postgres', qq[
+ CREATE EXTENSION postgres_fdw;
+ CREATE TABLE accounts(id integer primary key, amount integer);
+ CREATE TABLE accounts_local() inherits(accounts);
+ CREATE TABLE global_transactions(tx_time timestamp);
+ CREATE TABLE local_transactions(tx_time timestamp);
+ ]);
+
+	foreach my $neighbor (@shards)
+	{
+		next if ($neighbor eq $node);
+
+		my $port = $neighbor->port;
+		my $host = $neighbor->host;
+
+		$node->safe_psql('postgres', qq[
+ CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw
+ options(dbname 'postgres', host '$host', port '$port');
+ CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts)
+ server shard_$port options(table_name 'accounts_local');
+ CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+ ]);
+	}
+}
+
+$shard1->safe_psql('postgres', "insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;");
+$shard2->safe_psql('postgres', "insert into accounts_local select 2*id, 0 from generate_series(1, 10010) as id;");
+
+###############################################################################
+# pgbench scripts
+###############################################################################
+
+my $bank = File::Temp->new();	# transfer between ids :id and :id+1, which live on different shards
+append_to_file($bank, q{
+ \set id random(1, 20000)
+ BEGIN;
+ WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+ INSERT into global_transactions SELECT now() FROM upd;
+ UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+ COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+# Return the row count of $table on $node, then empty the table.
+sub count_and_delete_rows
+{
+	my ($node, $table) = @_;
+	my $count = $node->safe_psql('postgres', "select count(*) from $table");
+
+	$node->safe_psql('postgres', "delete from $table");
+	diag($node->name, ": completed $count transactions");
+	return $count;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+# Every account starts with amount 0 and each transfer moves 1 between two
+# accounts, so any change of sum(amount) exposes an inconsistent snapshot.
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+my $i;
+
+my ($pgb_handle1, $pgb_handle2);
+
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+while (time() - $started < $seconds)
+{
+	# each shard sees all accounts (local + foreign) through its accounts table
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# And do the same after soft restart
+###############################################################################
+
+$shard1->restart;
+$shard2->restart;
+$shard1->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard1 to become online";
+$shard2->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard2 to become online";
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+($oldtotal, $isolation_errors) = ('0', 0);	# count only this phase's anomalies
+while (time() - $started < $seconds)
+{
+	# each shard sees all accounts (local + foreign) through its accounts table
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction after restart');
+
+###############################################################################
+# And do the same after hard restart
+###############################################################################
+
+$shard1->teardown_node;
+$shard2->teardown_node;
+$shard1->start;
+$shard2->start;
+$shard1->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard1 to become online";
+$shard2->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard2 to become online";
+
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+($oldtotal, $isolation_errors) = ('0', 0);	# count only this phase's anomalies
+while (time() - $started < $seconds)
+{
+	# each shard sees all accounts (local + foreign) through its accounts table
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction after hard restart');
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 1d5450758e..ef4472170c 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2115,6 +2115,41 @@ sub pg_recvlogical_upto
}
}
+# Run pgbench against this node with the given arguments and wait for it to
+# finish.  Returns pgbench_await()'s result; the exit status is not checked.
+sub pgbench
+{
+	my ($self, @args) = @_;
+	my $pgbench_handle = $self->pgbench_async(@args);
+	return $self->pgbench_await($pgbench_handle);
+}
+
+# Start pgbench against this node in the background.  -h and -p are taken
+# from the node; @args are passed through unchanged.  Returns the IPC::Run
+# harness, to be passed to pgbench_await().
+sub pgbench_async
+{
+	my ($self, @args) = @_;
+
+	my $in = '';
+	my $out = '';
+
+	my @pgbench_command = (
+		'pgbench',
+		-h => $self->host,
+		-p => $self->port,
+		@args
+	);
+	my $handle = IPC::Run::start(\@pgbench_command, $in, $out);
+	return $handle;
+}
+
+# Wait for a pgbench started by pgbench_async() to finish.
+#
+# During run some pgbench threads can exit (for example due to
+# serialization error). That will set non-zero returning code.
+# So don't check return code here and leave it to a caller.
+sub pgbench_await
+{
+	my ($self, $pgbench_handle) = @_;
+
+	# hand IPC::Run::finish()'s result back so the caller can inspect it
+	return IPC::Run::finish($pgbench_handle);
+}
+
=pod
=back
--
2.17.1