The patch in the previous letter is full of faults. Please use the new version.
Also, here we fixed the problem with losing the CSN value in a parallel worker (TAP test 003_parallel_safe.pl). Thanks to a.pyhalov for detecting the problem and providing a bugfix.

--
regards,
Andrey Lepikhov
Postgres Professional
>From 7aa57724fc42b8ca7054f9b6edfa33c0cffb24bf Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Wed, 17 Nov 2021 11:13:37 +0500
Subject: [PATCH] Add Commit Sequence Number (CSN) machinery into MVCC
 implementation for a timestamp-based resolving of visibility conflicts.

It makes it possible to achieve proper snapshot isolation semantics in the
case of distributed transactions involving more than one Postgres instance.

Authors: K.Knizhnik, S.Kelvich, A.Sher, A.Lepikhov, M.Usama.

Discussion:
(2020/05/21 -)
https://www.postgresql.org/message-id/flat/CA%2Bfd4k6HE8xLGEvqWzABEg8kkju5MxU%2Bif7bf-md0_2pjzXp9Q%40mail.gmail.com#ed1359340871688bed2e643921f73365
(2018/05/01 - 2019/04/21)
https://www.postgresql.org/message-id/flat/21BC916B-80A1-43BF-8650-3363CCDAE09C%40postgrespro.ru
---
 doc/src/sgml/config.sgml                      |  50 +-
 src/backend/access/rmgrdesc/Makefile          |   1 +
 src/backend/access/rmgrdesc/csnlogdesc.c      |  95 +++
 src/backend/access/rmgrdesc/xlogdesc.c        |   6 +-
 src/backend/access/transam/Makefile           |   2 +
 src/backend/access/transam/csn_log.c          | 748 ++++++++++++++++++
 src/backend/access/transam/csn_snapshot.c     | 687 ++++++++++++++++
 src/backend/access/transam/rmgr.c             |   1 +
 src/backend/access/transam/twophase.c         | 154 ++++
 src/backend/access/transam/varsup.c           |   2 +
 src/backend/access/transam/xact.c             |  32 +
 src/backend/access/transam/xlog.c             |  23 +-
 src/backend/access/transam/xloginsert.c       |   2 +
 src/backend/commands/vacuum.c                 |   3 +-
 src/backend/replication/logical/snapbuild.c   |   4 +
 src/backend/storage/ipc/ipci.c                |   6 +
 src/backend/storage/ipc/procarray.c           |  85 ++
 src/backend/storage/lmgr/lwlock.c             |   2 +
 src/backend/storage/lmgr/lwlocknames.txt      |   2 +
 src/backend/storage/lmgr/proc.c               |   6 +
 src/backend/storage/sync/sync.c               |   5 +
 src/backend/utils/misc/guc.c                  |  37 +
 src/backend/utils/probes.d                    |   2 +
 src/backend/utils/time/snapmgr.c              | 183 ++++-
 src/bin/initdb/initdb.c                       |   3 +-
 src/bin/pg_controldata/pg_controldata.c       |   2 +
 src/bin/pg_upgrade/pg_upgrade.c               |   5 +
 src/bin/pg_upgrade/pg_upgrade.h               |   2 +
 src/bin/pg_waldump/rmgrdesc.c                 |   1 +
 src/include/access/csn_log.h                  |  98 +++
 src/include/access/csn_snapshot.h             |  54 ++
 src/include/access/rmgrlist.h                 |   1 +
 src/include/access/xlog_internal.h            |   2 +
 src/include/catalog/pg_control.h              |   1 +
 src/include/catalog/pg_proc.dat               |  17 +
 src/include/datatype/timestamp.h              |   3 +
 src/include/fmgr.h                            |   1 +
 src/include/portability/instr_time.h          |  10 +
 src/include/storage/lwlock.h                  |   1 +
 src/include/storage/proc.h                    |  14 +
 src/include/storage/procarray.h               |   7 +
 src/include/storage/sync.h                    |   1 +
 src/include/utils/snapmgr.h                   |   7 +-
 src/include/utils/snapshot.h                  |  11 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/csnsnapshot/Makefile         |  22 +
 .../csnsnapshot/expected/csnsnapshot.out      |   1 +
 src/test/modules/csnsnapshot/t/001_base.pl    | 100 +++
 src/test/modules/csnsnapshot/t/002_standby.pl |  68 ++
 .../csnsnapshot/t/003_parallel_safe.pl        |  67 ++
 src/test/modules/snapshot_too_old/sto.conf    |   1 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  28 +
 src/test/regress/expected/sysviews.out        |   4 +-
 53 files changed, 2660 insertions(+), 11 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/csnlogdesc.c
 create mode 100644 src/backend/access/transam/csn_log.c
 create mode 100644 src/backend/access/transam/csn_snapshot.c
 create mode 100644 src/include/access/csn_log.h
 create mode 100644 src/include/access/csn_snapshot.h
 create mode 100644 src/test/modules/csnsnapshot/Makefile
 create mode 100644 src/test/modules/csnsnapshot/expected/csnsnapshot.out
 create mode 100644 src/test/modules/csnsnapshot/t/001_base.pl
 create mode 100644 src/test/modules/csnsnapshot/t/002_standby.pl
 create mode 100644 src/test/modules/csnsnapshot/t/003_parallel_safe.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3f806740d5..f4f6c83fd0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -9682,8 +9682,56 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir'
      </varlistentry>
 
      </variablelist>
-   </sect1>
 
+    <sect2 id="runtime-config-CSN-based-snapshot">
+     <title>CSN Based Snapshot</title>
+      <para>
+       By default, snapshots in <productname>PostgreSQL</productname> contain
+       XIDs (transaction IDs) that are used to identify the status of a
+       transaction and to make arbitrary visibility calculations.
+      </para>
+
+      <para>
+       <productname>PostgreSQL</productname> also provides CSN (Commit
+        Sequence Number) based machinery as an additional tool for visibility
+        calculations. It may be used by distributed transactions, where the xid
+        of a local transaction cannot correctly identify the global order.
+      </para>
+
+     <variablelist>
+      <varlistentry id="guc-enable-csn-snapshot" xreflabel="enable_csn_snapshot">
+       <term><varname>enable_csn_snapshot</varname> (<type>boolean</type>)
+        <indexterm>
+         <primary><varname>enable_csn_snapshot</varname> configuration parameter</primary>
+        </indexterm>
+       </term>
+       <listitem>
+
+        <para>
+         Enables or disables CSN tracking for snapshots.
+        </para>
+
+        <para>
+        <productname>PostgreSQL</productname> uses a physical clock timestamp as
+        a CSN, so enabling CSN-based snapshots can be useful for implementing
+        cross-instance snapshots and visibility of distributed transactions.
+        </para>
+
+        <para>
+         When enabled, <productname>PostgreSQL</productname> creates the
+         <filename>pg_csn</filename> directory under <envar>PGDATA</envar> to keep
+         track of the mapping between XIDs and CSNs.
+        </para>
+
+        <para>
+         The default value is on.
+        </para>
+       </listitem>
+      </varlistentry>
+
+     </variablelist>
+    </sect2>
+   </sect1>
    <sect1 id="runtime-config-compatible">
     <title>Version and Platform Compatibility</title>
 
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd86..15fc36f7b4 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	brindesc.o \
 	clogdesc.o \
+	csnlogdesc.o \
 	committsdesc.o \
 	dbasedesc.o \
 	genericdesc.o \
diff --git a/src/backend/access/rmgrdesc/csnlogdesc.c b/src/backend/access/rmgrdesc/csnlogdesc.c
new file mode 100644
index 0000000000..f8c644e906
--- /dev/null
+++ b/src/backend/access/rmgrdesc/csnlogdesc.c
@@ -0,0 +1,95 @@
+/*-------------------------------------------------------------------------
+ *
+ * csnlogdesc.c
+ *	  rmgr descriptor routines for access/transam/csn_log.c
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/csnlogdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/csn_log.h"
+
+
+/*
+ * csnlog_desc
+ *		Append a human-readable description of a csnlog WAL record to buf.
+ */
+void
+csnlog_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *data = XLogRecGetData(record);
+	uint8		opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (opcode)
+	{
+		case XLOG_CSN_ZEROPAGE:
+		case XLOG_CSN_TRUNCATE:
+			{
+				/* Both record types carry only an SLRU page number. */
+				int			page;
+
+				memcpy(&page, data, sizeof(int));
+				appendStringInfo(buf, "pageno %d", page);
+				break;
+			}
+		case XLOG_CSN_ASSIGNMENT:
+			{
+				CSN			assigned;
+
+				memcpy(&assigned, data, sizeof(CSN));
+				appendStringInfo(buf, "assign " INT64_FORMAT, assigned);
+				break;
+			}
+		case XLOG_CSN_SETCSN:
+			{
+				xl_csn_set *setrec = (xl_csn_set *) data;
+				int			nsub;
+				int			k;
+
+				appendStringInfo(buf, "set " INT64_FORMAT " for: %u",
+								 setrec->csn,
+								 setrec->xtop);
+
+				/* The subxid array follows the fixed-size header. */
+				nsub = ((XLogRecGetDataLen(record) - MinSizeOfCSNSet) /
+						sizeof(TransactionId));
+				for (k = 0; k < nsub; k++)
+				{
+					TransactionId sub;
+
+					/* Copy element-wise; don't assume payload alignment. */
+					memcpy(&sub,
+						   data + MinSizeOfCSNSet + k * sizeof(TransactionId),
+						   sizeof(TransactionId));
+					appendStringInfo(buf, ", %u", sub);
+				}
+				break;
+			}
+	}
+}
+
+/*
+ * csnlog_identify
+ *		Return the name of a csnlog record type, or NULL if it is unknown.
+ */
+const char *
+csnlog_identify(uint8 info)
+{
+	uint8		opcode = info & ~XLR_INFO_MASK;
+
+	if (opcode == XLOG_CSN_ASSIGNMENT)
+		return "ASSIGNMENT";
+	if (opcode == XLOG_CSN_SETCSN)
+		return "SETCSN";
+	if (opcode == XLOG_CSN_ZEROPAGE)
+		return "ZEROPAGE";
+	if (opcode == XLOG_CSN_TRUNCATE)
+		return "TRUNCATE";
+
+	/* Unrecognized opcode. */
+	return NULL;
+}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 5bf2346dd9..ea433046cf 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -113,7 +113,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "max_connections=%d max_worker_processes=%d "
 						 "max_wal_senders=%d max_prepared_xacts=%d "
 						 "max_locks_per_xact=%d wal_level=%s "
-						 "wal_log_hints=%s track_commit_timestamp=%s",
+						 "wal_log_hints=%s track_commit_timestamp=%s "
+						 "enable_csn_snapshot=%s",
 						 xlrec.MaxConnections,
 						 xlrec.max_worker_processes,
 						 xlrec.max_wal_senders,
@@ -121,7 +122,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec.max_locks_per_xact,
 						 wal_level_str,
 						 xlrec.wal_log_hints ? "on" : "off",
-						 xlrec.track_commit_timestamp ? "on" : "off");
+						 xlrec.track_commit_timestamp ? "on" : "off",
+						 xlrec.enable_csn_snapshot ? "on" : "off");
 	}
 	else if (info == XLOG_FPW_CHANGE)
 	{
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..fc0321ee6b 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,6 +15,8 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	clog.o \
 	commit_ts.o \
+	csn_log.o \
+	csn_snapshot.o \
 	generic_xlog.o \
 	multixact.o \
 	parallel.o \
diff --git a/src/backend/access/transam/csn_log.c b/src/backend/access/transam/csn_log.c
new file mode 100644
index 0000000000..33517271ed
--- /dev/null
+++ b/src/backend/access/transam/csn_log.c
@@ -0,0 +1,748 @@
+/*-----------------------------------------------------------------------------
+ *
+ * csn_log.c
+ *		Track commit sequence numbers of finished transactions
+ *
+ * This module provides an SLRU that stores the CSN of each transaction.
+ * This mapping needs to be kept only for xids greater than oldestXid, but
+ * that can require arbitrarily large amounts of memory in the case of
+ * long-lived transactions.  Because of the same lifetime and persistence
+ * requirements, this module is quite similar to subtrans.c.
+ *
+ * Switching a database from CSN-based to xid-based snapshots is harmless.
+ * But when switching from xid-based to CSN-based snapshots, we must decide
+ * on a new xid from which CSN-based checks begin.  It cannot be
+ * oldestActiveXID because of prepared transactions.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/csn_log.c
+ *
+ *-----------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/csn_log.h"
+#include "access/slru.h"
+#include "access/csn_snapshot.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/xlogutils.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "portability/instr_time.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/snapmgr.h"
+#include "access/xlog_internal.h"
+
+/*
+ * Shared-memory state of the CSN log.
+ *
+ * csnSnapshotActive - whether CSN tracking is active.  We use this flag,
+ * rather than the enable_csn_snapshot GUC, to judge whether CSN snapshots are
+ * enabled; this design is similar to 'track_commit_timestamp'.  During
+ * replication the primary may change 'enable_csn_snapshot' across a restart,
+ * and the standby then applies the WAL record for the GUC change; notifying
+ * every backend about that would be difficult, so backends read
+ * 'csnSnapshotActive' from shared memory instead.  It is read without taking
+ * a lock, so it adds no locking overhead.
+ * oldestXmin - first sensible xmin on the first existing page of the CSN log.
+ * last_max_csn - the largest CSN generated (or replayed) so far.
+ * last_csn_log_wal - the CSN bound most recently published by GenerateCSN()
+ * for WAL-logging; a new XLOG_CSN_ASSIGNMENT is emitted once a generated CSN
+ * passes it.
+ * lock - spinlock taken around reads/updates of last_max_csn and
+ * last_csn_log_wal.
+ */
+typedef struct CSNShared
+{
+	bool				csnSnapshotActive;
+	pg_atomic_uint32	oldestXmin;
+	CSN					last_max_csn;
+	CSN					last_csn_log_wal;
+	volatile slock_t	lock;
+} CSNShared;
+
+/* Points into shared memory; attached by CSNLogShmemInit(). */
+CSNShared *csnShared;
+
+/*
+ * Defines for CSNLog page sizes.  A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * CSNLog page numbering also wraps around at
+ * 0xFFFFFFFF/CSN_LOG_XACTS_PER_PAGE, and CSNLog segment numbering at
+ * 0xFFFFFFFF/CSN_LOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateCSNLog (see CSNLogPagePrecedes).
+ */
+
+/* We store the commit CSN for each xid */
+#define CSN_LOG_XACTS_PER_PAGE (BLCKSZ / sizeof(CSN))
+
+/* SLRU page holding xid's entry, and the entry's index within that page. */
+#define TransactionIdToPage(xid)	((xid) / (TransactionId) CSN_LOG_XACTS_PER_PAGE)
+#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CSN_LOG_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for CSNLog control
+ */
+static SlruCtlData CSNLogCtlData;
+#define CsnlogCtl (&CSNLogCtlData)
+
+/* SLRU page maintenance helpers */
+static int	ZeroCSNLogPage(int pageno, bool write_xlog);
+static void ZeroTruncateCSNLogPage(int pageno, bool write_xlog);
+static bool CSNLogPagePrecedes(int page1, int page2);
+static void CSNLogSetPageStatus(TransactionId xid, int nsubxids,
+									  TransactionId *subxids,
+									  CSN csn, int pageno);
+static void CSNLogSetCSNInSlot(TransactionId xid, CSN csn, int slotno);
+
+/* WAL-logging helpers (each is a no-op unless enable_csn_wal is set) */
+static void WriteCSNXlogRec(TransactionId xid, int nsubxids,
+							TransactionId *subxids, CSN csn);
+static void WriteZeroCSNPageXlogRec(int pageno);
+static void WriteTruncateCSNXlogRec(int pageno);
+static void set_oldest_xmin(TransactionId xid);
+
+
+/*
+ * CSNLogShmemBuffers
+ *		Number of SLRU buffers for the CSN log: NBuffers / 512, clamped to
+ *		the range [4, 32].
+ */
+static Size
+CSNLogShmemBuffers(void)
+{
+	Size		nbuffers = Max(4, NBuffers / 512);
+
+	return Min(nbuffers, 32);
+}
+
+/*
+ * CSNLogShmemSize
+ *		Shared memory needed by the CSN log SLRU (with nlsns = 0).
+ */
+Size
+CSNLogShmemSize(void)
+{
+	Size		nslots = CSNLogShmemBuffers();
+
+	return SimpleLruShmemSize(nslots, 0);
+}
+
+/*
+ * CSNLogShmemInit
+ *		Attach the CSN log SLRU and the CSNShared struct in shared memory.
+ *		CSNShared's fields are initialized only by the process that creates
+ *		the struct.
+ */
+void
+CSNLogShmemInit(void)
+{
+	bool		already_exists;
+
+	CsnlogCtl->PagePrecedes = CSNLogPagePrecedes;
+	SimpleLruInit(CsnlogCtl, "CSNLog Ctl", CSNLogShmemBuffers(), 0,
+				  CSNLogSLRULock, "pg_csn", LWTRANCHE_CSN_LOG_BUFFERS,
+				  SYNC_HANDLER_CSN);
+
+	csnShared = ShmemInitStruct("CSNlog shared", sizeof(CSNShared),
+								&already_exists);
+	if (already_exists)
+		return;
+
+	csnShared->csnSnapshotActive = false;
+	csnShared->last_max_csn = InvalidCSN;
+	csnShared->last_csn_log_wal = InvalidCSN;
+	pg_atomic_init_u32(&csnShared->oldestXmin, InvalidTransactionId);
+	SpinLockInit(&csnShared->lock);
+}
+
+/*
+ * CSNLogSetCSN
+ *
+ * Record the CSN of a transaction and of its subtransaction tree.
+ *
+ * xid is the single xid to set the CSN for.  Typically this is the top-level
+ * xid of a committing or aborting transaction, but it can also be a
+ * subtransaction when recording a subtransaction abort.
+ *
+ * subxids is an array of nsubxids subtransaction xids of xid (nsubxids may be
+ * zero).  Consecutive entries that live on the same SLRU page are written
+ * together under one lock acquisition; xid is written with the first group.
+ *
+ * csn is the commit sequence number; it should be AbortedCSN for aborts.
+ * If write_xlog is true, the assignment is WAL-logged before any page is
+ * touched.
+ */
+void
+CSNLogSetCSN(TransactionId xid, int nsubxids, TransactionId *subxids, CSN csn,
+			 bool write_xlog)
+{
+	TransactionId top = xid;
+	int			pageno = TransactionIdToPage(xid);	/* page of the parent */
+	int			start = 0;
+
+	Assert(TransactionIdIsValid(xid));
+
+	if (write_xlog)
+		WriteCSNXlogRec(xid, nsubxids, subxids, csn);
+
+	for (;;)
+	{
+		int			end = start;
+
+		/* Extend the run while subxids stay on the current page. */
+		while (end < nsubxids && TransactionIdToPage(subxids[end]) == pageno)
+			end++;
+
+		CSNLogSetPageStatus(top, end - start, subxids + start, csn, pageno);
+
+		if (end >= nsubxids)
+			break;
+
+		/* The next run starts on the page of the first remaining subxid. */
+		start = end;
+		pageno = TransactionIdToPage(subxids[start]);
+		top = InvalidTransactionId;	/* the parent has been written already */
+	}
+}
+
+/*
+ * CSNLogSetPageStatus
+ *		Store csn for every subxid in subxids[0..nsubxids-1] and then for xid
+ *		(if valid).  All of them must reside on SLRU page pageno.  The update
+ *		is atomic only with respect to this page.
+ *
+ * Otherwise the API mirrors TransactionIdSetTreeStatus().
+ */
+static void
+CSNLogSetPageStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
+					CSN csn, int pageno)
+{
+	int			slot;
+	int			k = 0;
+
+	LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+
+	slot = SimpleLruReadPage(CsnlogCtl, pageno, true, xid);
+
+	/* Children are stamped before the parent ... */
+	while (k < nsubxids)
+	{
+		TransactionId sub = subxids[k++];
+
+		Assert(CsnlogCtl->shared->page_number[slot] == TransactionIdToPage(sub));
+		CSNLogSetCSNInSlot(sub, csn, slot);
+	}
+
+	/* ... and the parent last, if this page carries it. */
+	if (TransactionIdIsValid(xid))
+		CSNLogSetCSNInSlot(xid, csn, slot);
+
+	CsnlogCtl->shared->page_dirty[slot] = true;
+	LWLockRelease(CSNLogSLRULock);
+}
+
+/*
+ * CSNLogSetCSNInSlot
+ *		Store csn into xid's entry of the page currently loaded in slotno.
+ *		The caller must hold CSNLogSLRULock and is responsible for marking
+ *		the page dirty.
+ */
+static void
+CSNLogSetCSNInSlot(TransactionId xid, CSN csn, int slotno)
+{
+	CSN		   *entries;
+
+	Assert(LWLockHeldByMe(CSNLogSLRULock));
+
+	entries = (CSN *) CsnlogCtl->shared->page_buffer[slotno];
+	entries[TransactionIdToPgIndex(xid)] = csn;
+}
+
+/*
+ * CSNLogGetCSNByXid
+ *		Read the CSN stored for xid in the CSN log.
+ *
+ * NB: this is a low-level routine and is NOT the preferred entry point
+ * for most uses; TransactionIdGetCSN() in csn_snapshot.c is the
+ * intended caller.
+ */
+CSN
+CSNLogGetCSNByXid(TransactionId xid)
+{
+	int			slot;
+	CSN			result;
+
+	/* SimpleLruReadPage_ReadOnly returns with CSNLogSLRULock held. */
+	slot = SimpleLruReadPage_ReadOnly(CsnlogCtl, TransactionIdToPage(xid), xid);
+	result = ((CSN *) CsnlogCtl->shared->page_buffer[slot])[TransactionIdToPgIndex(xid)];
+	LWLockRelease(CSNLogSLRULock);
+
+	return result;
+}
+
+/*
+ * ZeroCSNLogPage
+ *		Initialize (or reinitialize) CSN log page pageno to zeroes in the SLRU
+ *		buffers and return its slot number.  The page is not written to disk
+ *		here.  If write_xlog is true, WriteZeroCSNPageXlogRec() is called
+ *		first.
+ *
+ * The caller must hold CSNLogSLRULock, and still holds it on return.
+ */
+static int
+ZeroCSNLogPage(int pageno, bool write_xlog)
+{
+	Assert(LWLockHeldByMe(CSNLogSLRULock));
+
+	if (!write_xlog)
+		return SimpleLruZeroPage(CsnlogCtl, pageno);
+
+	WriteZeroCSNPageXlogRec(pageno);
+	return SimpleLruZeroPage(CsnlogCtl, pageno);
+}
+
+/*
+ * ZeroTruncateCSNLogPage
+ *		Truncate the CSN log at page pageno via SimpleLruTruncate(), WAL-logging
+ *		the truncation first when write_xlog is true.
+ */
+static void
+ZeroTruncateCSNLogPage(int pageno, bool write_xlog)
+{
+	if (write_xlog)
+	{
+		/* Log before removing anything, so that replay can repeat it. */
+		WriteTruncateCSNXlogRec(pageno);
+	}
+
+	SimpleLruTruncate(CsnlogCtl, pageno);
+}
+
+/*
+ * ActivateCSNlog
+ *		Start CSN tracking and establish the oldest xid whose CSN is known.
+ *
+ * If the page holding nextXid does not exist yet, it is created.  Every slot
+ * on it that precedes nextXid is filled with UnclearCSN, and nextXid becomes
+ * the oldest tracked xid.  Otherwise, we walk back over the physically
+ * existing pages and take the first xid (below nextXid) whose CSN is not
+ * UnclearCSN.
+ *
+ * Does nothing if the CSN log is already active.
+ */
+void
+ActivateCSNlog(void)
+{
+	int				pageno;
+	TransactionId	nextXid = InvalidTransactionId;
+	TransactionId	oldest_xid = InvalidTransactionId;
+
+	if (csnShared->csnSnapshotActive)
+		return;
+
+	nextXid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
+	pageno = TransactionIdToPage(nextXid);
+
+	LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+
+	/*
+	 * Re-initialize our idea of the latest page number.  Otherwise the
+	 * wraparound sanity check in SimpleLruTruncate() could compare against a
+	 * stale value until ExtendCSNLog() zeroes a new page.
+	 */
+	CsnlogCtl->shared->latest_page_number = pageno;
+
+	/* Create the current page (and segment file), if necessary. */
+	if (!SimpleLruDoesPhysicalPageExist(CsnlogCtl, pageno))
+	{
+		int slotno;
+		TransactionId curxid = nextXid;
+
+		slotno = ZeroCSNLogPage(pageno, false);
+
+		elog(LOG, "Create SLRU page=%d, slotno=%d for xid %u on a CSN log activation",
+			 pageno, slotno, nextXid);
+
+		/*
+		 * nextXid isn't the first xid on the page.  This is the first page in
+		 * the CSN log, so set UnclearCSN into all previous slots on this page.
+		 * nextXid can then be used as the oldest xid in the CSN log.
+		 */
+		if (TransactionIdToPgIndex(nextXid) > 0)
+		{
+			/* Cleaning procedure. Can be optimized. */
+			do
+			{
+				curxid--;
+				CSNLogSetCSNInSlot(curxid, UnclearCSN, slotno);
+			} while (TransactionIdToPgIndex(curxid) > 0);
+
+			elog(LOG,
+				 "Set UnclearCSN values for %d xids in the range [%u,%u]",
+				 nextXid - curxid, curxid, nextXid-1);
+
+			/* Oldest XID found on this page */
+			oldest_xid = nextXid;
+		}
+
+		/*
+		 * Write the page only after the UnclearCSN markers are in place.
+		 * Writing it first would leave the buffer clean.  The markers could
+		 * then be lost if the page were evicted and re-read from disk.
+		 */
+		SimpleLruWritePage(CsnlogCtl, slotno);
+	}
+	LWLockRelease(CSNLogSLRULock);
+
+	if (!TransactionIdIsValid(oldest_xid))
+	{
+		TransactionId curxid;
+
+		elog(LOG, "Search for the oldest xid across previous pages");
+
+		/* Need to scan previous pages for an oldest xid. */
+		while (pageno > 0 && SimpleLruDoesPhysicalPageExist(CsnlogCtl, pageno - 1))
+			pageno--;
+
+		/*
+		 * Look for the first xid with a known CSN.  Never scan past nextXid:
+		 * it has not been assigned yet, and the pages beyond it may not exist.
+		 */
+		curxid = pageno * (TransactionId) CSN_LOG_XACTS_PER_PAGE;
+		while (TransactionIdPrecedes(curxid, nextXid) &&
+			   CSNLogGetCSNByXid(curxid) == UnclearCSN)
+			curxid++;
+		oldest_xid = curxid;
+	}
+
+	set_oldest_xmin(oldest_xid);
+	csnShared->csnSnapshotActive = true;
+}
+
+/*
+ * get_csnlog_status
+ *		Report whether CSN tracking is currently active (read without a lock).
+ */
+bool
+get_csnlog_status(void)
+{
+	bool		active = csnShared->csnSnapshotActive;
+
+	return active;
+}
+
+/*
+ * DeactivateCSNlog
+ *		Turn CSN tracking off, reset the oldest tracked xid and delete every
+ *		CSN log segment file.
+ */
+void
+DeactivateCSNlog(void)
+{
+	csnShared->csnSnapshotActive = false;
+	set_oldest_xmin(InvalidTransactionId);
+
+	/* Remove all files; a later ActivateCSNlog() then finds no pages. */
+	LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+	(void) SlruScanDirectory(CsnlogCtl, SlruScanDirCbDeleteAll, NULL);
+	LWLockRelease(CSNLogSLRULock);
+
+	elog(LOG, "CSN log has deactivated");
+}
+
+/*
+ * StartupCSN
+ *		Activate the CSN log (a no-op if it is already active).
+ */
+void
+StartupCSN(void)
+{
+	ActivateCSNlog();
+}
+
+/*
+ * CompleteCSNInitialization
+ *		Make the CSN log state match the enable_csn_snapshot setting.
+ *
+ * Activation is necessary for primary and standby alike, because it depends
+ * on the control file contents at the beginning of recovery or on a replayed
+ * XLOG_PARAMETER_CHANGE.  If the feature is disabled, it is turned off for
+ * good, which also removes any leftover data.
+ */
+void
+CompleteCSNInitialization(void)
+{
+	if (enable_csn_snapshot)
+		ActivateCSNlog();
+	else
+		DeactivateCSNlog();
+}
+
+/*
+ * CSNlogParameterChange
+ *		Activate or deactivate the CSN log so that its state matches newvalue.
+ *		oldvalue is not used.
+ */
+void
+CSNlogParameterChange(bool newvalue, bool oldvalue)
+{
+	bool		active = csnShared->csnSnapshotActive;
+
+	if (newvalue && !active)
+		ActivateCSNlog();
+	else if (!newvalue && active)
+		DeactivateCSNlog();
+}
+
+/*
+ * CheckPointCSNLog
+ *		Flush dirty CSN log pages during a checkpoint (shutdown or online).
+ *		Does nothing while CSN tracking is inactive.
+ */
+void
+CheckPointCSNLog(void)
+{
+	if (get_csnlog_status())
+	{
+		/*
+		 * Not required for correctness; flushing here just improves the odds
+		 * that dirty pages are written by the checkpointer rather than by
+		 * backends.
+		 */
+		TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_START(true);
+		SimpleLruWriteAll(CsnlogCtl, true);
+		TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_DONE(true);
+	}
+}
+
+/*
+ * ExtendCSNLog
+ *		Make sure the CSN log has room for the newly allocated newestXact.
+ *
+ * NB: this is called while holding XidGenLock.  It is cheap except when
+ * newestXact starts a new page.  Even then, no I/O is needed unless a dirty
+ * page must be written out to make room in shared memory.
+ */
+void
+ExtendCSNLog(TransactionId newestXact)
+{
+	bool		starts_page;
+
+	if (!get_csnlog_status())
+		return;
+
+	/*
+	 * Only the first XID of a page needs work.  Right after wraparound, the
+	 * first XID handed out on page zero is FirstNormalTransactionId.
+	 */
+	starts_page = TransactionIdToPgIndex(newestXact) == 0 ||
+		TransactionIdEquals(newestXact, FirstNormalTransactionId);
+	if (!starts_page)
+		return;
+
+	LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+	/* Zero the page, WAL-logging it unless we are replaying WAL. */
+	ZeroCSNLogPage(TransactionIdToPage(newestXact), !InRecovery);
+	LWLockRelease(CSNLogSLRULock);
+}
+
+/*
+ * Remove all CSNLog segments before the one holding the passed
+ * transaction ID.
+ *
+ * This is normally called during checkpoint, with oldestXact being the
+ * oldest TransactionXmin of any running transaction.
+ *
+ * csnShared->oldestXmin is advanced to the first xid of the cutoff page
+ * before the older segments are removed.  This ensures it never points into a
+ * page that is about to disappear.
+ */
+void
+TruncateCSNLog(TransactionId oldestXact)
+{
+	int				cutoffPage;
+	TransactionId	oldestXmin;
+	TransactionId	newOldestXmin;
+
+	/* Truncation must be WAL-logged, which isn't allowed during recovery. */
+	if (RecoveryInProgress() || !get_csnlog_status())
+		return;
+
+	/*
+	 * The cutoff point is the start of the segment containing oldestXact. We
+	 * pass the *page* containing oldestXact to SimpleLruTruncate. We step
+	 * back one transaction to avoid passing a cutoff page that hasn't been
+	 * created yet in the rare case that oldestXact would be the first item on
+	 * a page and oldestXact == next XID.  In that case, if we didn't subtract
+	 * one, we'd trigger SimpleLruTruncate's wraparound detection.
+	 */
+	TransactionIdRetreat(oldestXact);
+	cutoffPage = TransactionIdToPage(oldestXact);
+
+	/*
+	 * Nothing to do unless the page holding the current oldestXmin precedes
+	 * the cutoff page (i.e. there are pages to remove).  CSNLogPagePrecedes()
+	 * keeps the comparison correct across XID wraparound.
+	 */
+	oldestXmin = pg_atomic_read_u32(&csnShared->oldestXmin);
+	if (!CSNLogPagePrecedes(TransactionIdToPage(oldestXmin), cutoffPage))
+		return;
+
+	/*
+	 * Shift oldestXmin to the start of the new first page.  Use the first
+	 * position on the page because all transactions on this page were created
+	 * with the CSN snapshot machinery enabled.  Page 0 starts at permanent
+	 * XIDs, so clamp to the first normal one to keep oldestXmin valid.
+	 */
+	newOldestXmin = oldestXact - TransactionIdToPgIndex(oldestXact);
+	if (!TransactionIdIsNormal(newOldestXmin))
+		newOldestXmin = FirstNormalTransactionId;
+	pg_atomic_write_u32(&csnShared->oldestXmin, newOldestXmin);
+
+	/* oldestXmin is an atomic; no spinlock is held or needed here. */
+	ZeroTruncateCSNLogPage(cutoffPage, true);
+}
+
+/*
+ * Decide whether CSNLog page number page1 is "older" than page2 for
+ * truncation purposes.  Analogous to SubTransPagePrecedes().
+ *
+ * We need to use comparison of TransactionIds here in order to do the right
+ * thing with wraparound XID arithmetic.  However, TransactionIdPrecedes()
+ * gets weird about permanent xact IDs.  So, offset both xids such that xid1,
+ * xid2, and xid2 + CSN_LOG_XACTS_PER_PAGE - 1 are all normal XIDs.  This
+ * offset matters for page 0 and for the page preceding page 0.
+ *
+ * page1 counts as older only if its first XID precedes both the first and the
+ * last XID of page2.  Comparing only first XIDs misjudges the page lying
+ * about half the XID space away: part of that page may still be needed, yet
+ * it would be reported as older, letting SimpleLruTruncate() remove too much.
+ * The core SLRU users (clog, subtrans, commit_ts) use the same two-sided test.
+ */
+static bool
+CSNLogPagePrecedes(int page1, int page2)
+{
+	TransactionId xid1;
+	TransactionId xid2;
+
+	xid1 = ((TransactionId) page1) * CSN_LOG_XACTS_PER_PAGE;
+	xid1 += FirstNormalTransactionId + 1;
+	xid2 = ((TransactionId) page2) * CSN_LOG_XACTS_PER_PAGE;
+	xid2 += FirstNormalTransactionId + 1;
+
+	return (TransactionIdPrecedes(xid1, xid2) &&
+			TransactionIdPrecedes(xid1,
+								  xid2 + (TransactionId) CSN_LOG_XACTS_PER_PAGE - 1));
+}
+
+/*
+ * WriteAssignCSNXlogRec
+ *		Emit an XLOG_CSN_ASSIGNMENT record carrying csn.
+ *
+ * Only valid with enable_csn_wal.  csn must not exceed the bound already
+ * published in csnShared->last_csn_log_wal.
+ */
+void
+WriteAssignCSNXlogRec(CSN csn)
+{
+	Assert(enable_csn_wal);
+	Assert(csn <= csnShared->last_csn_log_wal);
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &csn, sizeof(csn));
+	(void) XLogInsert(RM_CSNLOG_ID, XLOG_CSN_ASSIGNMENT);
+}
+
+/*
+ * WriteCSNXlogRec
+ *		WAL-log the assignment of csn to xid and to its nsubxids subxids
+ *		(XLOG_CSN_SETCSN).  Does nothing unless enable_csn_wal is set.
+ *
+ * The record is the fixed-size xl_csn_set header (MinSizeOfCSNSet bytes)
+ * followed by the subxid array.  csnlog_desc() and csnlog_redo() decode it
+ * in that layout.
+ */
+static void
+WriteCSNXlogRec(TransactionId xid, int nsubxids,
+				TransactionId *subxids, CSN csn)
+{
+	xl_csn_set xlrec;
+
+	if (!enable_csn_wal)
+		return;
+
+	xlrec.xtop = xid;
+	xlrec.nsubxacts = nsubxids;
+	xlrec.csn = csn;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, MinSizeOfCSNSet);
+
+	/*
+	 * Skip the subxid chunk when there are none.  In that case subxids may
+	 * be NULL, and a zero-length chunk would only use up an rdata slot.
+	 */
+	if (nsubxids > 0)
+		XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
+
+	(void) XLogInsert(RM_CSNLOG_ID, XLOG_CSN_SETCSN);
+}
+
+/*
+ * WriteZeroCSNPageXlogRec
+ *		WAL-log the zeroing of CSN log page pageno (XLOG_CSN_ZEROPAGE).
+ *		Does nothing unless enable_csn_wal is set.
+ */
+static void
+WriteZeroCSNPageXlogRec(int pageno)
+{
+	if (enable_csn_wal)
+	{
+		XLogBeginInsert();
+		XLogRegisterData((char *) &pageno, sizeof(pageno));
+		(void) XLogInsert(RM_CSNLOG_ID, XLOG_CSN_ZEROPAGE);
+	}
+}
+
+/*
+ * WriteTruncateCSNXlogRec
+ *		WAL-log a CSN log truncation at page pageno (XLOG_CSN_TRUNCATE).
+ *		Does nothing unless enable_csn_wal is set.
+ */
+static void
+WriteTruncateCSNXlogRec(int pageno)
+{
+	if (enable_csn_wal)
+	{
+		XLogBeginInsert();
+		XLogRegisterData((char *) &pageno, sizeof(pageno));
+		(void) XLogInsert(RM_CSNLOG_ID, XLOG_CSN_TRUNCATE);
+	}
+}
+
+
+/*
+ * csnlog_redo
+ *		Replay a csnlog WAL record (ASSIGNMENT, SETCSN, ZEROPAGE or TRUNCATE).
+ */
+void
+csnlog_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	/* Backup blocks are not used in csnlog records */
+	Assert(!XLogRecHasAnyBlockRefs(record));
+
+	if (info == XLOG_CSN_ASSIGNMENT)
+	{
+		CSN csn;
+
+		memcpy(&csn, XLogRecGetData(record), sizeof(CSN));
+
+		/*
+		 * GenerateCSN() and GetLastGeneratedCSN() access last_max_csn under
+		 * csnShared->lock, possibly from other processes, so take the lock
+		 * here too.  Never move the value backwards: GenerateCSN() relies on
+		 * it to keep generated CSNs strictly increasing.
+		 */
+		SpinLockAcquire(&csnShared->lock);
+		if (csn > csnShared->last_max_csn)
+			csnShared->last_max_csn = csn;
+		SpinLockRelease(&csnShared->lock);
+	}
+	else if (info == XLOG_CSN_SETCSN)
+	{
+		xl_csn_set *xlrec = (xl_csn_set *) XLogRecGetData(record);
+		CSNLogSetCSN(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub, xlrec->csn, false);
+	}
+	else if (info == XLOG_CSN_ZEROPAGE)
+	{
+		int			pageno;
+		int			slotno;
+
+		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+		LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE);
+		slotno = ZeroCSNLogPage(pageno, false);
+		SimpleLruWritePage(CsnlogCtl, slotno);
+		/* Check while the lock still pins the slot's contents. */
+		Assert(!CsnlogCtl->shared->page_dirty[slotno]);
+		LWLockRelease(CSNLogSLRULock);
+	}
+	else if (info == XLOG_CSN_TRUNCATE)
+	{
+		int			pageno;
+
+		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+
+		/*
+		 * During replay latest_page_number may not be set up yet.  Use the
+		 * truncation point so that the wraparound sanity check in
+		 * SimpleLruTruncate() passes (clog_redo() does the same).
+		 */
+		CsnlogCtl->shared->latest_page_number = pageno;
+
+		/*
+		 * Mirror TruncateCSNLog() on the primary.  Before removing the older
+		 * segments, advance oldestXmin to the first xid of the new first
+		 * page, so that it never points into a removed page.  Never move it
+		 * backwards.
+		 */
+		if (get_csnlog_status())
+		{
+			TransactionId oldestXmin = pg_atomic_read_u32(&csnShared->oldestXmin);
+
+			if (CSNLogPagePrecedes(TransactionIdToPage(oldestXmin), pageno))
+			{
+				TransactionId newOldestXmin =
+					(TransactionId) pageno * (TransactionId) CSN_LOG_XACTS_PER_PAGE;
+
+				if (!TransactionIdIsNormal(newOldestXmin))
+					newOldestXmin = FirstNormalTransactionId;
+				pg_atomic_write_u32(&csnShared->oldestXmin, newOldestXmin);
+			}
+		}
+
+		ZeroTruncateCSNLogPage(pageno, false);
+	}
+	else
+		elog(PANIC, "csnlog_redo: unknown op code %u", info);
+}
+
+/*
+ * csnsyncfiletag
+ *		sync.c callback: fsync the CSN log segment identified by ftag,
+ *		reporting its path in path.
+ */
+int
+csnsyncfiletag(const FileTag *ftag, char *path)
+{
+	SlruCtl		ctl = CsnlogCtl;
+
+	return SlruSyncFileTag(ctl, ftag, path);
+}
+
+/*
+ * GenerateCSN
+ *
+ * Generate a CSN, which is actually the local time in nanoseconds, shifted by
+ * csn_time_shift seconds.  CSNs are forced to be strictly increasing.  Since
+ * it is not uncommon to have millions of read transactions per second, we use
+ * nanosecond resolution where available and bump the value past the last one
+ * generated if necessary.
+ *
+ * locked: true if the caller already holds csnShared->lock.
+ * assign: if not InvalidCSN, a lower bound for the generated CSN.
+ *
+ * With enable_csn_wal, once the new CSN passes last_csn_log_wal, a new bound
+ * CSN_ASSIGN_TIME_INTERVAL ahead of it is published and WAL-logged
+ * (XLOG_CSN_ASSIGNMENT).  Replay of that record sets last_max_csn on a
+ * standby.  That keeps a promoted standby from producing a CSN at or below
+ * one the primary may already have handed out.  The bound, not the generated
+ * CSN, is therefore what must be logged.
+ *
+ * NOTE(review): when locked is true, the WAL record is inserted while the
+ * caller still holds the spinlock, which spinlock usage rules forbid.
+ * Confirm that callers passing locked = true never reach that path.
+ */
+CSN
+GenerateCSN(bool locked, CSN assign)
+{
+	instr_time	current_time;
+	CSN	csn;
+	CSN log_csn = InvalidCSN;
+
+	Assert(get_csnlog_status() || csn_snapshot_defer_time > 0);
+
+	/* TODO: create some macro that add small random shift to current time. */
+	INSTR_TIME_SET_CURRENT(current_time);
+	csn = (CSN) INSTR_TIME_GET_NANOSEC(current_time) + (int64) (csn_time_shift * 1E9);
+
+	if (assign != InvalidCSN && csn < assign)
+		csn = assign;
+
+	/* TODO: change to atomics? */
+	if (!locked)
+		SpinLockAcquire(&csnShared->lock);
+
+	if (csn <= csnShared->last_max_csn)
+		csn = csnShared->last_max_csn + 1;
+	csnShared->last_max_csn = csn;
+
+	if (enable_csn_wal && csn > csnShared->last_csn_log_wal)
+	{
+		/* Publish a bound CSN_ASSIGN_TIME_INTERVAL ahead of this CSN. */
+		log_csn = CSNAddByNanosec(csn, CSN_ASSIGN_TIME_INTERVAL);
+		csnShared->last_csn_log_wal = log_csn;
+	}
+
+	if (!locked)
+		SpinLockRelease(&csnShared->lock);
+
+	/* WAL-log the published bound (not csn itself); see header comment. */
+	if (log_csn != InvalidCSN)
+		WriteAssignCSNXlogRec(log_csn);
+
+	return csn;
+}
+
+/*
+ * GetLastGeneratedCSN
+ *		Return csnShared->last_max_csn, the largest CSN generated (or set by
+ *		WAL replay) so far, read under csnShared->lock.
+ */
+CSN
+GetLastGeneratedCSN(void)
+{
+	CSN			last;
+
+	SpinLockAcquire(&csnShared->lock);
+	last = csnShared->last_max_csn;
+	SpinLockRelease(&csnShared->lock);
+
+	return last;
+}
+
+/*
+ * set_oldest_xmin
+ *		Store xid as the CSN log's oldest tracked xmin, logging the old and
+ *		new values (mostly for debugging purposes).
+ */
+static void
+set_oldest_xmin(TransactionId xid)
+{
+	TransactionId prev = pg_atomic_read_u32(&csnShared->oldestXmin);
+
+	elog(LOG, "Oldest Xmin for CSN will be changed from %u to %u", prev, xid);
+	pg_atomic_write_u32(&csnShared->oldestXmin, xid);
+}
+
+/*
+ * GetOldestXmin
+ *		Return the CSN log's oldest tracked xmin.  Only valid while CSN
+ *		tracking is active.
+ */
+TransactionId
+GetOldestXmin(void)
+{
+	TransactionId xmin;
+
+	Assert(get_csnlog_status());
+	xmin = pg_atomic_read_u32(&csnShared->oldestXmin);
+
+	return xmin;
+}
diff --git a/src/backend/access/transam/csn_snapshot.c b/src/backend/access/transam/csn_snapshot.c
new file mode 100644
index 0000000000..a381d219ea
--- /dev/null
+++ b/src/backend/access/transam/csn_snapshot.c
@@ -0,0 +1,687 @@
+/*-------------------------------------------------------------------------
+ *
+ * csn_snapshot.c
+ *		Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/csn_snapshot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/csn_snapshot.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "portability/instr_time.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "miscadmin.h"
+
+/* Warn if a remote snapshot_csn is ahead of our clock by more than this (ns). */
+#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */
+
+static TransactionId xmin_for_csn = InvalidTransactionId;	/* see TransactionIdGetCSN() */
+
+
+/*
+ * GUC: delay the advance of oldestXid by this many seconds.  It also sets
+ * the size of the CSNSnapshotXidMap circular buffer (one slot per second).
+ */
+int csn_snapshot_defer_time;
+
+int csn_time_shift;	/* seconds added to every generated CSN */
+
+/*
+ * CSNSnapshotXidMap
+ *
+ * To be able to install a csn snapshot that points to the past we need to
+ * keep old versions of tuples and therefore delay the advance of oldestXid.
+ * Here we keep track of the correspondence between a snapshot's snapshot_csn
+ * and the oldestXid that was set at the time the snapshot was taken.  This is
+ * much like the OldSnapshotControlData of "snapshot too old", but with a
+ * granularity of one second.
+ *
+ * Different strategies can be employed to hold oldestXid (e.g. we can track
+ * the oldest csn-based snapshot among cluster nodes and map it to oldestXid
+ * on each node).
+ *
+ * On each snapshot acquisition CSNSnapshotMapXmin() is called and stores the
+ * correspondence between current snapshot_csn and oldestXmin in a sparse way:
+ * snapshot_csn is rounded to seconds (here we use the fact that snapshot_csn
+ * is just a timestamp) and oldestXmin is stored in the circular buffer where
+ * the rounded snapshot_csn acts as an offset from the circular buffer head.
+ * Size of the circular buffer is controlled by csn_snapshot_defer_time GUC.
+ *
+ * When a csn snapshot arrives we check that its snapshot_csn is still in our
+ * map, otherwise we'll error out with a "snapshot too old" message.  If
+ * snapshot_csn is successfully mapped to oldestXid we move the backend's
+ * proc->xmin to proc->originalXmin and set proc->xmin to the mapped
+ * oldestXid.  That way the oldest-xmin horizon takes into account backends
+ * with imported csn snapshots and old tuple versions will be preserved.
+ *
+ * Also, while calculating oldestXmin for our map in the presence of imported
+ * csn snapshots we should use proc->originalXmin instead of the proc->xmin
+ * that was set during import.  Otherwise we could create a feedback loop:
+ * xmins of imported csn snapshots were calculated using our map and new
+ * entries in the map would be calculated based on those xmins, with a risk
+ * of getting stuck forever with one non-increasing oldestXmin.  All other
+ * horizon computations use proc->xmin, so the old tuple versions are
+ * preserved.
+ */
+typedef struct CSNSnapshotXidMap
+{
+	int				 head;				/* offset of current freshest value */
+	int				 size;				/* total size of circular buffer */
+	CSN_atomic		 last_csn_seconds;	/* last rounded csn that changed
+										 * xmin_by_second[] */
+	TransactionId   *xmin_by_second;	/* circular buffer of oldestXmin's */
+}
+CSNSnapshotXidMap;
+
+static CSNSnapshotXidMap *csnXidMap;	/* NULL unless csn_snapshot_defer_time > 0 */
+
+
+/*
+ * Estimate shared memory needed by csnXidMap (zero if it is disabled).
+ */
+Size
+CSNSnapshotShmemSize(void)
+{
+	if (csn_snapshot_defer_time <= 0)
+		return 0;
+
+	/*
+	 * The struct plus one TransactionId slot per second of defer time.
+	 */
+	return MAXALIGN(sizeof(CSNSnapshotXidMap) +
+					csn_snapshot_defer_time * sizeof(TransactionId));
+}
+
+/*
+ * Allocate csnXidMap in shared memory and initialize it on first creation.
+ */
+void
+CSNSnapshotShmemInit(void)
+{
+	bool		found;
+	int			i;
+
+	if (csn_snapshot_defer_time <= 0)
+		return;					/* map disabled; csnXidMap stays NULL */
+
+	csnXidMap = ShmemInitStruct("csnXidMap",
+								sizeof(CSNSnapshotXidMap),
+								&found);
+	if (found)
+		return;					/* already initialized */
+
+	pg_atomic_init_u64(&csnXidMap->last_csn_seconds, 0);	/* nothing mapped yet */
+	csnXidMap->head = 0;
+	csnXidMap->size = csn_snapshot_defer_time;
+	csnXidMap->xmin_by_second =
+		ShmemAlloc(sizeof(TransactionId) * csnXidMap->size);
+	for (i = 0; i < csnXidMap->size; i++)
+		csnXidMap->xmin_by_second[i] = InvalidTransactionId;
+}
+
+/*
+ * CSNSnapshotStartup
+ *
+ * During startup, seed every csnXidMap slot and the procarray's CSN snapshot
+ * xmin with oldestActiveXID.  Does nothing unless we are in normal processing
+ * mode (shared memory is initialized), enable_csn_snapshot is on and the map
+ * is enabled (csn_snapshot_defer_time > 0).
+ */
+void
+CSNSnapshotStartup(TransactionId oldestActiveXID)
+{
+	int			slot;
+
+	if (!IsNormalProcessingMode() || !enable_csn_snapshot ||
+		csn_snapshot_defer_time <= 0)
+		return;
+
+	Assert(TransactionIdIsValid(oldestActiveXID));
+
+	for (slot = 0; slot < csnXidMap->size; slot++)
+		csnXidMap->xmin_by_second[slot] = oldestActiveXID;
+	ProcArraySetCSNSnapshotXmin(oldestActiveXID);
+
+	elog(LOG, "CSN map initialized with oldest active xid %u", oldestActiveXID);
+}
+
+/*
+ * CSNSnapshotMapXmin
+ *
+ * Maintain a circular buffer of oldestXmins for several seconds in the past.
+ * This buffer allows shifting oldestXmin into the past when a backend imports
+ * a CSN snapshot.  Otherwise old versions of tuples needed by that
+ * transaction could be recycled by other processes (vacuum, HOT, etc).
+ *
+ * Locking here is not trivial.  This is called upon each snapshot creation
+ * after ProcArrayLock is released, which creates several race conditions: a
+ * backend that got its csn may call CSNSnapshotMapXmin() only after other
+ * backends managed to take a snapshot and complete their own
+ * CSNSnapshotMapXmin() call, or even commit.  This is safe because:
+ *
+ *		* We already hold our xmin in MyProc, so our snapshot will not be
+ *		  harmed even though ProcArrayLock is released.
+ *
+ *		* snapshot_csn is always pessimistically rounded up to the next
+ *		  second.
+ *
+ *		* For performance reasons, the xmin value for a particular second is
+ *		  filled only once.  Because of that, instead of writing just our own
+ *		  xmin (which is enough for our snapshot), we store oldestXmin there --
+ *		  this mitigates the risk of damaging someone else's snapshot by
+ *		  writing a too advanced value when another backend that generated
+ *		  its csn earlier was slow and didn't manage to insert it before
+ *		  us.
+ *
+ *		* If CSNSnapshotMapXmin() finds a gap of several seconds between the
+ *		  current call and the latest completed call, it fills that gap with
+ *		  the latest known value instead of the new one.  Otherwise it is
+ *		  possible (however highly unlikely) that this gap also happened
+ *		  between taking a snapshot and calling CSNSnapshotMapXmin() for some
+ *		  backend, and we would risk filling the circular buffer with
+ *		  oldestXmins that are bigger than they actually were.
+ */
+void
+CSNSnapshotMapXmin(SnapshotCSN snapshot_csn)
+{
+	int offset, gap, i;
+	SnapshotCSN csn_seconds;
+	SnapshotCSN last_csn_seconds;
+	volatile TransactionId oldest_deferred_xmin;
+	TransactionId current_oldest_xmin, previous_oldest_xmin;
+	TransactionId ImportedXmin;
+
+	/* Callers should check config values */
+	Assert(csn_snapshot_defer_time > 0);
+	Assert(csnXidMap != NULL);
+	/*
+	 * Round up snapshot_csn to the next second -- pessimistically and safely.
+	 */
+	csn_seconds = (snapshot_csn / NSECS_PER_SEC + 1);
+
+	/*
+	 * Fast-path check. Avoid taking the exclusive CSNSnapshotXidMapLock
+	 * if oldestXid was already written to xmin_by_second[] for this rounded
+	 * snapshot_csn.
+	 */
+	if (pg_atomic_read_u64(&csnXidMap->last_csn_seconds) >= csn_seconds)
+		return;
+
+	/* OK, we have a new entry (or entries) */
+	LWLockAcquire(CSNSnapshotXidMapLock, LW_EXCLUSIVE);
+
+	/* Re-check last_csn_seconds under lock */
+	last_csn_seconds = pg_atomic_read_u64(&csnXidMap->last_csn_seconds);
+	if (last_csn_seconds >= csn_seconds)
+	{
+		LWLockRelease(CSNSnapshotXidMapLock);
+		return;
+	}
+	pg_atomic_write_u64(&csnXidMap->last_csn_seconds, csn_seconds);
+
+	/*
+	 * Compute oldest_xmin.
+	 *
+	 * It could have been computed while creating the corresponding snapshot,
+	 * but we need originalXmin (see comment on CSNSnapshotXidMap), so just
+	 * compute it again here; this happens at most once per new rounded second.
+	 *
+	 * Our proc->xmin is temporarily replaced with originalXmin so that an
+	 * imported xmin does not feed back into the map.  This relies on
+	 * csn_snapshot_xmin holding back the vacuum horizon meanwhile.
+	 * NOTE(review): MyProc->xmin is modified without ProcArrayLock, so a
+	 * concurrent horizon computation may observe originalXmin instead of the
+	 * imported xmin; if originalXmin can be invalid or newer than it here,
+	 * this window deserves a closer look - confirm.
+	 */
+	ImportedXmin = MyProc->xmin;
+	MyProc->xmin = MyProc->originalXmin;
+	current_oldest_xmin = GetOldestNonRemovableTransactionId(NULL);
+	MyProc->xmin = ImportedXmin;
+	Assert(TransactionIdIsNormal(current_oldest_xmin));
+
+	previous_oldest_xmin = csnXidMap->xmin_by_second[csnXidMap->head];
+	Assert(TransactionIdIsNormal(previous_oldest_xmin) || !enable_csn_snapshot);
+
+	gap = csn_seconds - last_csn_seconds;
+	offset = csn_seconds % csnXidMap->size;
+
+	/* Sanity check before we update head and gap */
+	Assert( gap >= 1 );
+	Assert( (csnXidMap->head + gap) % csnXidMap->size == offset );
+
+	gap = gap > csnXidMap->size ? csnXidMap->size : gap;
+	csnXidMap->head = offset;
+
+	/* Fill new entry with current_oldest_xmin */
+	csnXidMap->xmin_by_second[offset] = current_oldest_xmin;
+
+	/*
+	 * If there is a gap, fill it with previous_oldest_xmin for the reasons
+	 * outlined in the comment above this function.
+	 */
+	for (i = 1; i < gap; i++)
+	{
+		offset = (offset + csnXidMap->size - 1) % csnXidMap->size;
+		csnXidMap->xmin_by_second[offset] = previous_oldest_xmin;
+	}
+
+	oldest_deferred_xmin =		/* oldest slot: the one right after head */
+		csnXidMap->xmin_by_second[ (csnXidMap->head + 1) % csnXidMap->size ];
+
+	LWLockRelease(CSNSnapshotXidMapLock);
+
+	elog(DEBUG5, "Advance xmin for CSN. Oldest deferred xmin = %u",
+		 oldest_deferred_xmin);
+
+	/*
+	 * Advance procArray->csn_snapshot_xmin after releasing the lock.  Since
+	 * we store oldestXmin rather than xmin it should not go backwards, but
+	 * NOTE(review): concurrent callers may get here out of order - confirm.
+	 */
+	/*Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin,
+										ProcArrayGetCSNSnapshotXmin()));*/
+	ProcArraySetCSNSnapshotXmin(oldest_deferred_xmin);
+}
+
+
+/*
+ * CSNSnapshotToXmin
+ *
+ * Look up the oldestXmin recorded for the (rounded-down) second of
+ * snapshot_csn.  If the map has no entry that recent yet, the freshest known
+ * value is returned; if the second has already dropped out of the circular
+ * buffer, InvalidTransactionId is returned so the caller can complain.
+ */
+TransactionId
+CSNSnapshotToXmin(SnapshotCSN snapshot_csn)
+{
+	SnapshotCSN wanted_sec = snapshot_csn / NSECS_PER_SEC;	/* round down */
+	volatile SnapshotCSN newest_sec;
+	TransactionId result = InvalidTransactionId;	/* "too old" by default */
+
+	/* Callers should check config values */
+	Assert(csn_snapshot_defer_time > 0);
+	Assert(csnXidMap != NULL);
+
+	LWLockAcquire(CSNSnapshotXidMapLock, LW_SHARED);
+	newest_sec = pg_atomic_read_u64(&csnXidMap->last_csn_seconds);
+
+	if (wanted_sec > newest_sec)
+	{
+		/* No entry for this second yet: use the latest known value. */
+		result = csnXidMap->xmin_by_second[csnXidMap->head];
+	}
+	else if (newest_sec - wanted_sec < csnXidMap->size)
+	{
+		/* Still inside the buffer's window: read the slot directly. */
+		Assert(newest_sec % csnXidMap->size == csnXidMap->head);
+		result = csnXidMap->xmin_by_second[wanted_sec % csnXidMap->size];
+	}
+	/*
+	 * Otherwise the requested second has already been overwritten in the
+	 * circular buffer; result stays InvalidTransactionId ("too old").
+	 */
+	LWLockRelease(CSNSnapshotXidMapLock);
+
+	return result;
+}
+
+/*
+ * CSNSnapshotPrepareCurrent
+ *
+ * Mark the current transaction and its committed children InDoubt in the CSN
+ * log (if it has an xid); return a new CSN for the commit's global snapshot.
+ */
+SnapshotCSN
+CSNSnapshotPrepareCurrent(void)
+{
+	TransactionId top_xid;
+	TransactionId *children;
+	int			nchildren;
+
+	if (!enable_csn_snapshot)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not prepare transaction for global commit"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "enable_csn_snapshot")));
+
+	/* Without an xid there is nothing to record in the CSN log. */
+	top_xid = GetCurrentTransactionIdIfAny();
+	if (TransactionIdIsValid(top_xid))
+	{
+		nchildren = xactGetCommittedChildren(&children);
+		CSNLogSetCSN(top_xid, nchildren, children, InDoubtCSN, true);
+	}
+	return GenerateCSN(false, InvalidCSN);
+}
+
+
+/*
+ * CSNSnapshotAssignCurrent
+ *
+ * Assign snapshot_csn as the commit CSN of the current transaction.  It is
+ * supposed to be the maximum of the values returned by
+ * CSNSnapshotPrepareCurrent and pg_csn_snapshot_prepare.
+ */
+void
+CSNSnapshotAssignCurrent(SnapshotCSN snapshot_csn)
+{
+	if (!enable_csn_snapshot)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not prepare transaction for global commit"),
+				 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						 "enable_csn_snapshot")));
+
+	if (!CSNIsNormal(snapshot_csn))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pg_csn_snapshot_assign expects normal snapshot_csn")));
+
+	Assert(snapshot_csn != InvalidCSN);
+	/*
+	 * Only raises csnShared->last_max_csn to >= snapshot_csn; result unused.
+	 */
+	(void) GenerateCSN(false, snapshot_csn);
+
+	/* Publish the CSN so that ProcArrayEndTransaction won't assign one. */
+	pg_atomic_write_u64(&MyProc->assignedCSN, snapshot_csn);
+}
+
+/*
+ * CSNSnapshotSync
+ *
+ * Due to clock desynchronization between nodes we can receive a snapshot_csn
+ * that is greater than the current snapshot_csn on this node.  To preserve
+ * proper isolation this node has to wait until its local clock reaches it.
+ *
+ * This should happen rarely if the nodes run NTP/PTP/etc.  A WARNING is
+ * emitted if the wait is longer than SNAP_DESYNC_COMPLAIN.
+ */
+void
+CSNSnapshotSync(SnapshotCSN remote_csn)
+{
+	SnapshotCSN	local_csn;
+	SnapshotCSN	delta;
+
+	Assert(enable_csn_snapshot);
+
+	for (;;)
+	{
+		if (GetLastGeneratedCSN() > remote_csn)
+			return;
+
+		local_csn = GenerateCSN(false, InvalidCSN);	/* csnShared->lock not held */
+
+		if (local_csn >= remote_csn)
+			/*
+			 * Everything is fine too, but last_max_csn wasn't updated for
+			 * some time.
+			 */
+			return;
+
+		/* Okay, we need to sleep now */
+		delta = remote_csn - local_csn;
+		if (delta > SNAP_DESYNC_COMPLAIN)
+			ereport(WARNING,
+				(errmsg("remote global snapshot exceeds ours by more than a second"),
+				 errhint("Consider running NTPd on servers participating in global transaction")));
+
+		/* TODO: report this sleeptime somewhere? */
+		pg_usleep((long) (delta / NSECS_PER_USEC));
+
+		/*
+		 * Loop again: the checks above verify that the local clock really
+		 * advanced far enough while we slept.
+		 */
+	}
+
+	Assert(false);				/* Should not happen */
+	return;
+}
+
+/*
+ * TransactionIdGetCSN
+ *
+ * Get the CSN of the given xid, handling special xids, xids older than
+ * TransactionXmin or without a reliable CSN, and the InDoubt state.
+ */
+CSN
+TransactionIdGetCSN(TransactionId xid)
+{
+	CSN csn;
+
+	/* Handle permanent TransactionId's for which we don't have mapping */
+	if (!TransactionIdIsNormal(xid))
+	{
+		if (xid == InvalidTransactionId)
+			return AbortedCSN;
+		if (xid == FrozenTransactionId || xid == BootstrapTransactionId)
+			return FrozenCSN;
+		Assert(false); /* Should not happen */
+	}
+
+	/*
+	 * After switching from xid-based to csn-based snapshots some xids (e.g.
+	 * prepared transactions holding back TransactionXmin) have no CSN, so
+	 * fetch the xid from which CSN-based checks are valid.
+	 */
+	xmin_for_csn = GetOldestXmin();
+
+	/*
+	 * xids in [TransactionXmin, xmin_for_csn) get UnclearCSN so that callers
+	 * follow the xid-snapshot result.  NOTE(review): logged at LOG each time.
+	 */
+	if(!TransactionIdPrecedes(xid, TransactionXmin) &&
+							TransactionIdPrecedes(xid, xmin_for_csn))
+	{
+		elog(LOG, "UnclearCSN was returned. xid=%u, TransactionXmin=%u, xmin_for_csn=%u",
+			xid, TransactionXmin, xmin_for_csn);
+		return UnclearCSN;
+	}
+	/*
+	 * For xids older than TransactionXmin the CSN log may already be
+	 * truncated, but such transactions are definitely not running
+	 * concurrently according to any snapshot, including time-travel ones.
+	 * Callers should check TransactionIdDidCommit afterwards.
+	 */
+	if (TransactionIdPrecedes(xid, TransactionXmin))
+		return FrozenCSN;
+
+	/* Read CSN from SLRU */
+	csn = CSNLogGetCSNByXid(xid);
+
+	/*
+	 * InDoubt means the transaction is being committed: wait on the lock of
+	 * its top-level xid until the CSN is assigned, so the visibility check
+	 * can decide whether the tuple is in the snapshot.  See also the comments
+	 * in CSNSnapshotPrecommit().
+	 */
+	if (CSNIsInDoubt(csn))
+	{
+		XactLockTableWait(SubTransGetTopmostTransaction(xid), NULL, NULL, XLTW_None);
+		csn = CSNLogGetCSNByXid(xid);
+		Assert(CSNIsNormal(csn) || CSNIsAborted(csn));
+	}
+
+	Assert(CSNIsNormal(csn) || CSNIsInProgress(csn) || CSNIsAborted(csn));
+	return csn;
+}
+
+/*
+ * XidInCSNSnapshot
+ *
+ * CSN-based counterpart of XidInMVCCSnapshot: returns true when the xid's
+ * effects are not visible in the given snapshot.  For non-imported csn
+ * snapshots this should agree with XidInLocalMVCCSnapshot (except that
+ * aborts are reported without consulting clog); to ensure that,
+ * XidInMVCCSnapshot cross-checks both functions with asserts for ordinary
+ * snapshots.
+ */
+bool
+XidInCSNSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	CSN			csn = TransactionIdGetCSN(xid);
+
+	/*
+	 * Committed with a regular CSN: the xid counts as running for this
+	 * snapshot iff it committed at or after the snapshot's CSN.
+	 */
+	if (CSNIsNormal(csn))
+		return csn >= snapshot->snapshot_csn;
+
+	/* Bootstrap or frozen transaction: always visible. */
+	if (CSNIsFrozen(csn))
+		return false;
+
+	/*
+	 * UnclearCSN: the csn could not be determined because of a snapshot
+	 * switch, so the xid-based result is followed instead.
+	 */
+	if (CSNIsUnclear(csn))
+		return true;
+
+	/*
+	 * Whatever is left must be an aborted or an in-progress transaction;
+	 * neither is visible.
+	 */
+	Assert(CSNIsAborted(csn) || CSNIsInProgress(csn));
+	Assert(!CSNIsAborted(csn) || TransactionIdDidAbort(xid));
+	return true;
+}
+
+
+/*****************************************************************************
+ * Functions to handle transactions commit.
+ *
+ * For local transactions CSNSnapshotPrecommit sets InDoubt state before
+ * ProcArrayEndTransaction is called and transaction data potentially becomes
+ * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in
+ * twophase case) then acquires csn under ProcArray lock and stores it
+ * in proc->assignedCSN. It's important that csn for commit is
+ * generated under ProcArray lock, otherwise snapshots won't
+ * be equivalent. Consequent call to CSNSnapshotCommit will write
+ * proc->assignedCSN to CSNLog.
+ *
+ *
+ * CSNSnapshotAbort is slightly different comparing to commit because abort
+ * can skip InDoubt phase and can be called for transaction subtree.
+ *****************************************************************************/
+
+
+/*
+ * CSNSnapshotAbort
+ *
+ * Record xid and its subxids as aborted in the CSN log.  Aborts may skip the
+ * InDoubt state because no concurrent transaction may see aborted data.
+ */
+void
+CSNSnapshotAbort(PGPROC *proc, TransactionId xid,
+				 int nsubxids, TransactionId *subxids)
+{
+	if (get_csnlog_status())
+	{
+		CSNLogSetCSN(xid, nsubxids, subxids, AbortedCSN, true);
+
+		/*
+		 * Reset assignedCSN unconditionally: it may have been set by
+		 * CSNSnapshotAssignCurrent() or CSNSnapshotAssignTwoPhase().
+		 */
+		pg_atomic_write_u64(&proc->assignedCSN, InProgressCSN);
+	}
+}
+
+/*
+ * CSNSnapshotPrecommit
+ *
+ * Put the local transaction we are about to commit into the InDoubt state.
+ * This keeps local snapshots and csn-based snapshots consistent.  We don't
+ * hold ProcArrayLock while writing the transaction's csn to the SLRU;
+ * instead InDoubt is set before the transaction is removed from the
+ * ProcArray, so readers that look up the csn in the window between ProcArray
+ * removal and CSN assignment can wait until the CSN is finally assigned.
+ * See also TransactionIdGetCSN().
+ *
+ * Must only be called from the parallel group leader, before the backend is
+ * removed from the ProcArray.
+ */
+void
+CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid,
+					 int nsubxids, TransactionId *subxids)
+{
+	CSN			expected = InProgressCSN;
+
+	if (!get_csnlog_status())
+		return;
+
+	/*
+	 * While assignedCSN is still InProgress no CSN was assigned to us from
+	 * outside, i.e. this is a local transaction: swap in InDoubt and record
+	 * InDoubt in the CSN log as well.
+	 */
+	if (pg_atomic_compare_exchange_u64(&proc->assignedCSN, &expected,
+									   InDoubtCSN))
+	{
+		Assert(CSNIsInProgress(expected));
+		CSNLogSetCSN(xid, nsubxids, subxids, InDoubtCSN, true);
+		return;
+	}
+
+	/* A CSN was assigned earlier; the CSN log must already say InDoubt. */
+	Assert(CSNIsNormal(expected));
+	Assert(CSNIsInDoubt(CSNLogGetCSNByXid(xid)));
+}
+
+/*
+ * CSNSnapshotCommit
+ *
+ * Store the CSN previously placed in proc->assignedCSN into the CSN log for
+ * xid and its subxids, then reset assignedCSN for the next transaction.
+ * CSNSnapshotPrecommit() must have run first so readers can wait until the
+ * SLRU write is finished.
+ *
+ * Call after ProcArrayEndTransaction but before releasing transaction locks,
+ * so that TransactionIdGetCSN can wait on the xid lock for the CSN.
+ */
+void
+CSNSnapshotCommit(PGPROC *proc, TransactionId xid,
+				  int nsubxids, TransactionId *subxids)
+{
+	volatile CSN csn;
+
+	if (!get_csnlog_status())
+		return;
+
+	csn = pg_atomic_read_u64(&proc->assignedCSN);
+
+	if (TransactionIdIsValid(xid))
+	{
+		/* Finally write resulting CSN in SLRU */
+		Assert(CSNIsNormal(csn));
+		CSNLogSetCSN(xid, nsubxids, subxids, csn, true);
+
+		/* Reset for next transaction */
+		pg_atomic_write_u64(&proc->assignedCSN, InProgressCSN);
+		return;
+	}
+
+	Assert(CSNIsInProgress(csn));	/* no xid: assignedCSN must be unset */
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b52..b86c172e46 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -10,6 +10,7 @@
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/csn_log.h"
 #include "access/generic_xlog.h"
 #include "access/ginxlog.h"
 #include "access/gistxlog.h"
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 28b153abc3..7bc6aae9a4 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/csn_snapshot.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
@@ -1536,8 +1537,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 									   hdr->nabortrels, abortrels,
 									   gid);
 
+	/*
+	 * CSNSnapshot callbacks that should be called right before we are
+	 * going to become visible. Details in comments to this functions.
+	 */
+	if (isCommit)
+		CSNSnapshotPrecommit(proc, xid, hdr->nsubxacts, children);
+	else
+		CSNSnapshotAbort(proc, xid, hdr->nsubxacts, children);
+
+
 	ProcArrayRemove(proc, latestXid);
 
+	/*
+	 * Stamp our transaction with CSN in CSNLog.
+	 * Should be called after ProcArrayEndTransaction, but before releasing
+	 * transaction locks, since TransactionIdGetCSN relies on
+	 * XactLockTableWait to await csn.
+	 */
+	if (isCommit)
+	{
+		CSNSnapshotCommit(proc, xid, hdr->nsubxacts, children);
+	}
+	else
+	{
+		Assert(CSNIsInProgress(
+				   pg_atomic_read_u64(&proc->assignedCSN)));
+	}
+
 	/*
 	 * In case we fail while running the callbacks, mark the gxact invalid so
 	 * no one else will try to commit/rollback, and so it will be recycled if
@@ -2583,3 +2610,130 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 	LWLockRelease(TwoPhaseStateLock);
 	return found;
 }
+
+/*
+ * CSNSnapshotPrepareTwophase
+ *
+ * Set InDoubt state in the CSN log for prepared transaction gid (and its
+ * subtransactions) and return a new CSN for the commit's global snapshot.
+ */
+static SnapshotCSN
+CSNSnapshotPrepareTwophase(const char *gid)
+{
+	GlobalTransaction	gxact;
+	PGPROC				*proc;
+	char				*buf;
+	TransactionId		xid;
+	xl_xact_parsed_prepare parsed;
+
+	if (!enable_csn_snapshot)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("could not prepare transaction for global commit"),
+				errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						"enable_csn_snapshot")));
+
+	/*
+	 * Validate the GID, and lock the GXACT to ensure that two backends do not
+	 * try to access the same GID at once.
+	 */
+	gxact = LockGXact(gid, GetUserId());
+	proc = &ProcGlobal->allProcs[gxact->pgprocno];
+	xid = proc->xid;
+
+	/* The state file must exist: a missing one is an ERROR, not a NULL. */
+	if (gxact->ondisk)
+		buf = ReadTwoPhaseFile(xid, false);
+	else
+		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+	ParsePrepareRecord(0, (xl_xact_prepare *) buf, &parsed);
+
+	CSNLogSetCSN(xid, parsed.nsubxacts, parsed.subxacts, InDoubtCSN, true);
+
+	/* Unlock our GXACT and clear MyLockedGxact, which LockGXact() set. */
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	gxact->locking_backend = InvalidBackendId;
+	LWLockRelease(TwoPhaseStateLock);
+	MyLockedGxact = NULL;
+
+	pfree(buf);
+	return GenerateCSN(false, InvalidCSN);
+}
+
+/*
+ * CSNSnapshotAssignTwoPhase
+ *
+ * Assign csn as the commit CSN of the prepared transaction gid.  The value is
+ * supposed to be the maximum of those returned by CSNSnapshotPrepareCurrent
+ * and pg_csn_snapshot_prepare.
+ *
+ * This function is a counterpart of CSNSnapshotAssignCurrent() for
+ * twophase transactions.
+ */
+static void
+CSNSnapshotAssignTwoPhase(const char *gid, SnapshotCSN csn)
+{
+	GlobalTransaction gxact;
+	PGPROC	   *proc;
+
+	if (!enable_csn_snapshot)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("could not prepare transaction for global commit"),
+				errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						"enable_csn_snapshot")));
+
+	if (!CSNIsNormal(csn))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pg_csn_snapshot_assign expects normal snapshot_csn")));
+
+	/*
+	 * Validate the GID, and lock the GXACT to ensure that two backends do not
+	 * try to access the same GID at once.
+	 */
+	gxact = LockGXact(gid, GetUserId());
+	proc = &ProcGlobal->allProcs[gxact->pgprocno];
+
+	Assert(csn != InvalidCSN);
+	/* Only raise csnShared->last_max_csn to >= csn; the result is unused. */
+	(void) GenerateCSN(false, csn);
+
+	/* Set snapshot_csn and defuse ProcArrayRemove from assigning one. */
+	pg_atomic_write_u64(&proc->assignedCSN, csn);
+
+	/* Unlock our GXACT and clear MyLockedGxact, which LockGXact() set. */
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	gxact->locking_backend = InvalidBackendId;
+	LWLockRelease(TwoPhaseStateLock);
+	MyLockedGxact = NULL;
+}
+
+/*
+ * SQL-callable wrapper around CSNSnapshotPrepareTwophase(): takes the gid
+ * as text and returns the generated CSN as an int64 Datum.
+ *
+ * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT
+ */
+Datum
+pg_csn_snapshot_prepare(PG_FUNCTION_ARGS)
+{
+	char	   *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+	PG_RETURN_INT64(CSNSnapshotPrepareTwophase(gid));
+}
+
+/*
+ * SQL-callable wrapper around CSNSnapshotAssignTwoPhase(): arguments are the
+ * gid (text) and the CSN (int64).
+ *
+ * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'csn'
+ */
+Datum
+pg_csn_snapshot_assign(PG_FUNCTION_ARGS)
+{
+	char	   *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+	CSNSnapshotAssignTwoPhase(gid, (SnapshotCSN) PG_GETARG_INT64(1));
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index a6e98e71bd..8e1d074806 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/csn_log.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -175,6 +176,7 @@ GetNewTransactionId(bool isSubXact)
 	 * Extend pg_subtrans and pg_commit_ts too.
 	 */
 	ExtendCLOG(xid);
+	ExtendCSNLog(xid);
 	ExtendCommitTs(xid);
 	ExtendSUBTRANS(xid);
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8e35c432f5..e6baf880d9 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/csn_snapshot.h"
 #include "access/multixact.h"
 #include "access/parallel.h"
 #include "access/subtrans.h"
@@ -1418,6 +1419,12 @@ RecordTransactionCommit(void)
 		TransactionTreeSetCommitTsData(xid, nchildren, children,
 									   replorigin_session_origin_timestamp,
 									   replorigin_session_origin);
+
+		/*
+		 * Mark our transaction as InDoubt in CsnLog and get ready for
+		 * commit.
+		 */
+		CSNSnapshotPrecommit(MyProc, xid, nchildren, children);
 	}
 
 	/*
@@ -1772,6 +1779,9 @@ RecordTransactionAbort(bool isSubXact)
 	 */
 	TransactionIdAbortTree(xid, nchildren, children);
 
+	/* Mark our transaction as Aborted in CSN Log. */
+	CSNSnapshotAbort(MyProc, xid, nchildren, children);
+
 	END_CRIT_SECTION();
 
 	/* Compute latestXid while we have the child XIDs handy */
@@ -2114,6 +2124,13 @@ StartTransaction(void)
 	ShowTransactionState("StartTransaction");
 }
 
+/* SQL-callable: generate and return a fresh CSN (advances last_max_csn). */
+Datum
+pg_current_csn(PG_FUNCTION_ARGS)
+{
+	/* locked = false: GenerateCSN takes csnShared->lock itself. */
+	PG_RETURN_INT64(GenerateCSN(false, InvalidCSN));
+}
 
 /*
  *	CommitTransaction
@@ -2262,6 +2279,21 @@ CommitTransaction(void)
 	 */
 	ProcArrayEndTransaction(MyProc, latestXid);
 
+	/*
+	 * Stamp our transaction with CSN in CsnLog.
+	 * Should be called after ProcArrayEndTransaction, but before releasing
+	 * transaction locks.
+	 */
+	if (!is_parallel_worker)
+	{
+		TransactionId xid = GetTopTransactionIdIfAny();
+		TransactionId *subxids;
+		int nsubxids;
+
+		nsubxids = xactGetCommittedChildren(&subxids);
+		CSNSnapshotCommit(MyProc, xid, nsubxids, subxids);
+	}
+
 	/*
 	 * This is all post-commit cleanup.  Note that if an error is raised here,
 	 * it's too late to abort the transaction.  This should be just
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1616448368..2a8de10038 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -24,6 +24,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/csn_log.h"
 #include "access/heaptoast.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
@@ -4747,6 +4748,7 @@ InitControlFile(uint64 sysidentifier)
 	ControlFile->wal_level = wal_level;
 	ControlFile->wal_log_hints = wal_log_hints;
 	ControlFile->track_commit_timestamp = track_commit_timestamp;
+	ControlFile->enable_csn_snapshot = enable_csn_snapshot;
 	ControlFile->data_checksum_version = bootstrap_data_checksum_version;
 }
 
@@ -7181,6 +7183,9 @@ StartupXLOG(void)
 	if (ControlFile->track_commit_timestamp)
 		StartupCommitTs();
 
+	if (ControlFile->enable_csn_snapshot)
+		StartupCSN();
+
 	/*
 	 * Recover knowledge about replay progress of known replication partners.
 	 */
@@ -7448,6 +7453,8 @@ StartupXLOG(void)
 			 */
 			StartupSUBTRANS(oldestActiveXID);
 
+			CSNSnapshotStartup(oldestActiveXID);
+
 			/*
 			 * If we're beginning at a shutdown checkpoint, we know that
 			 * nothing was running on the primary at this point. So fake-up an
@@ -8117,7 +8124,10 @@ StartupXLOG(void)
 	 * timestamps are started below, if necessary.)
 	 */
 	if (standbyState == STANDBY_DISABLED)
+	{
 		StartupSUBTRANS(oldestActiveXID);
+		CSNSnapshotStartup(oldestActiveXID);
+	}
 
 	/*
 	 * Perform end of recovery actions for any SLRUs that need it.
@@ -8183,6 +8193,7 @@ StartupXLOG(void)
 	 * commit timestamp.
 	 */
 	CompleteCommitTsInitialization();
+	CompleteCSNInitialization();
 
 	/*
 	 * All done with end-of-recovery actions.
@@ -9616,6 +9627,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
 	CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
 	CheckPointCLOG();
+	CheckPointCSNLog();
 	CheckPointCommitTs();
 	CheckPointSUBTRANS();
 	CheckPointMultiXact();
@@ -9894,7 +9906,10 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
+	{
 		TruncateSUBTRANS(GetOldestTransactionIdConsideredRunning());
+		TruncateCSNLog(GetOldestTransactionIdConsideredRunning());
+	}
 
 	/* Real work is done; log and update stats. */
 	LogCheckpointEnd(true);
@@ -10172,7 +10187,8 @@ XLogReportParameters(void)
 		max_wal_senders != ControlFile->max_wal_senders ||
 		max_prepared_xacts != ControlFile->max_prepared_xacts ||
 		max_locks_per_xact != ControlFile->max_locks_per_xact ||
-		track_commit_timestamp != ControlFile->track_commit_timestamp)
+		track_commit_timestamp != ControlFile->track_commit_timestamp ||
+		enable_csn_snapshot != ControlFile->enable_csn_snapshot)
 	{
 		/*
 		 * The change in number of backend slots doesn't need to be WAL-logged
@@ -10194,6 +10210,7 @@ XLogReportParameters(void)
 			xlrec.wal_level = wal_level;
 			xlrec.wal_log_hints = wal_log_hints;
 			xlrec.track_commit_timestamp = track_commit_timestamp;
+			xlrec.enable_csn_snapshot = enable_csn_snapshot;
 
 			XLogBeginInsert();
 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
@@ -10212,6 +10229,7 @@ XLogReportParameters(void)
 		ControlFile->wal_level = wal_level;
 		ControlFile->wal_log_hints = wal_log_hints;
 		ControlFile->track_commit_timestamp = track_commit_timestamp;
+		ControlFile->enable_csn_snapshot = enable_csn_snapshot;
 		UpdateControlFile();
 
 		LWLockRelease(ControlFileLock);
@@ -10665,6 +10683,9 @@ xlog_redo(XLogReaderState *record)
 		CommitTsParameterChange(xlrec.track_commit_timestamp,
 								ControlFile->track_commit_timestamp);
 		ControlFile->track_commit_timestamp = xlrec.track_commit_timestamp;
+		CSNlogParameterChange(xlrec.enable_csn_snapshot,
+							  ControlFile->enable_csn_snapshot);
+		ControlFile->enable_csn_snapshot = xlrec.enable_csn_snapshot;
 
 		UpdateControlFile();
 		LWLockRelease(ControlFileLock);
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 689384a411..e6585a94ba 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -73,6 +73,8 @@ typedef struct
 	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
+bool enable_csn_wal = true;	/* backs the "enable_csn_wal" GUC */
+
 static registered_buffer *registered_buffers;
 static int	max_registered_buffers; /* allocated size */
 static int	max_registered_block_id = 0;	/* highest block_id + 1 currently
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 5c4bc15b44..e64ada86c7 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -53,7 +53,7 @@
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
-
+#include "access/csn_log.h"
 
 /*
  * GUC parameters
@@ -1760,6 +1760,7 @@ vac_truncate_clog(TransactionId frozenXID,
 	 */
 	TruncateCLOG(frozenXID, oldestxid_datoid);
 	TruncateCommitTs(frozenXID);
+	TruncateCSNLog(frozenXID);	/* same horizon as CLOG and CommitTs */
 	TruncateMultiXact(minMulti, minmulti_datoid);
 
 	/*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index dbdc172a2b..214921dc9f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -120,6 +120,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/csn_log.h"
 #include "access/heapam_xlog.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -489,6 +490,9 @@ SnapBuildBuildSnapshot(SnapBuild *builder)
 	snapshot->xmin = builder->xmin;
 	snapshot->xmax = builder->xmax;
 
+	snapshot->snapshot_csn = FrozenCSN;	/* no CSN is generated here */
+	snapshot->imported_csn = false;
+
 	/* store all transactions to be treated as committed by this snapshot */
 	snapshot->xip =
 		(TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 9fa3e0631e..2a7e184da9 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,6 +16,8 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/csn_log.h"
+#include "access/csn_snapshot.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
@@ -120,6 +122,8 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, ProcGlobalShmemSize());
 	size = add_size(size, XLOGShmemSize());
 	size = add_size(size, CLOGShmemSize());
+	size = add_size(size, CSNLogShmemSize());
+	size = add_size(size, CSNSnapshotShmemSize());
 	size = add_size(size, CommitTsShmemSize());
 	size = add_size(size, SUBTRANSShmemSize());
 	size = add_size(size, TwoPhaseShmemSize());
@@ -242,6 +246,8 @@ CreateSharedMemoryAndSemaphores(void)
 	 */
 	XLOGShmemInit();
 	CLOGShmemInit();
+	CSNLogShmemInit();
+	CSNSnapshotShmemInit();
 	CommitTsShmemInit();
 	SUBTRANSShmemInit();
 	MultiXactShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 892f0f6799..5bc7370c73 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -48,6 +48,7 @@
 #include <signal.h>
 
 #include "access/clog.h"
+#include "access/csn_snapshot.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -96,6 +97,8 @@ typedef struct ProcArrayStruct
 	TransactionId replication_slot_xmin;
 	/* oldest catalog xmin of any replication slot */
 	TransactionId replication_slot_catalog_xmin;
+	/* xmin of the oldest active CSN snapshot; accessed without the lock */
+	TransactionId csn_snapshot_xmin;
 
 	/* indexes into allProcs[], has PROCARRAY_MAXPROCS entries */
 	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
@@ -429,6 +432,7 @@ CreateSharedProcArray(void)
 		procArray->lastOverflowedXid = InvalidTransactionId;
 		procArray->replication_slot_xmin = InvalidTransactionId;
 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
+		procArray->csn_snapshot_xmin = InvalidTransactionId;
 		ShmemVariableCache->xactCompletionCount = 1;
 	}
 
@@ -577,6 +581,14 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 		/* Advance global latestCompletedXid while holding the lock */
 		MaintainLatestCompletedXid(latestXid);
 
+		/*
+		 * Assign the xid's CSN while holding ProcArrayLock (non-distributed
+		 * COMMIT PREPARED). After the lock is released, the subsequent
+		 * CSNSnapshotCommit() writes this value to the CSN log.
+		 */
+		if (CSNIsInDoubt(pg_atomic_read_u64(&proc->assignedCSN)))
+			pg_atomic_write_u64(&proc->assignedCSN, GenerateCSN(false, InvalidCSN));
+
 		/* Same with xactCompletionCount  */
 		ShmemVariableCache->xactCompletionCount++;
 
@@ -691,6 +703,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 		proc->xmin = InvalidTransactionId;
 		proc->delayChkpt = false;	/* be sure this is cleared in abort */
 		proc->recoveryConflictPending = false;
+		proc->originalXmin = InvalidTransactionId;
 
 		/* must be cleared with xid/xmin: */
 		/* avoid unnecessarily dirtying shared cachelines */
@@ -730,6 +743,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
 	proc->xmin = InvalidTransactionId;
 	proc->delayChkpt = false;	/* be sure this is cleared in abort */
 	proc->recoveryConflictPending = false;
+	proc->originalXmin = InvalidTransactionId;
 
 	/* must be cleared with xid/xmin: */
 	/* avoid unnecessarily dirtying shared cachelines */
@@ -753,6 +767,16 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
 	/* Also advance global latestCompletedXid while holding the lock */
 	MaintainLatestCompletedXid(latestXid);
 
+	/*
+	 * Assign the xid's CSN while holding ProcArrayLock, for a
+	 * regular COMMIT.
+	 *
+	 * TODO: in case of group commit we could generate one CSN for the
+	 * whole group to save time on timestamp acquisition.
+	 */
+	if (CSNIsInDoubt(pg_atomic_read_u64(&proc->assignedCSN)))
+		pg_atomic_write_u64(&proc->assignedCSN, GenerateCSN(false, InvalidCSN));
+
 	/* Same with xactCompletionCount  */
 	ShmemVariableCache->xactCompletionCount++;
 }
@@ -912,6 +936,7 @@ ProcArrayClearTransaction(PGPROC *proc)
 	proc->lxid = InvalidLocalTransactionId;
 	proc->xmin = InvalidTransactionId;
 	proc->recoveryConflictPending = false;
+	proc->originalXmin = InvalidTransactionId;	/* saved by ImportCSNSnapshot() */
 
 	Assert(!(proc->statusFlags & PROC_VACUUM_STATE_MASK));
 	Assert(!proc->delayChkpt);
@@ -1204,6 +1229,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
 	{
 		ExtendSUBTRANS(latestObservedXid);
+		ExtendCSNLog(latestObservedXid);
 		TransactionIdAdvance(latestObservedXid);
 	}
 	TransactionIdRetreat(latestObservedXid);	/* = running->nextXid - 1 */
@@ -1704,6 +1730,7 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 	TransactionId kaxmin;
 	bool		in_recovery = RecoveryInProgress();
 	TransactionId *other_xids = ProcGlobal->xids;
+	TransactionId csn_snapshot_xmin = InvalidTransactionId;
 
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
@@ -1843,6 +1870,10 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 	if (in_recovery)
 		kaxmin = KnownAssignedXidsGetOldestXmin();
 
+	/* Fetch the xmin held back by the CSN snapshot settings, if enabled. */
+	if (get_csnlog_status() && csn_snapshot_defer_time > 0 && IsUnderPostmaster)
+		csn_snapshot_xmin = ProcArrayGetCSNSnapshotXmin();
+
 	/*
 	 * No other information from shared state is needed, release the lock
 	 * immediately. The rest of the computations can be done without a lock.
@@ -1899,6 +1930,15 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 	h->data_oldest_nonremovable =
 		TransactionIdOlder(h->data_oldest_nonremovable, h->slot_xmin);
 
+	/*
+	 * Hold back the non-removable horizons, because distributed
+	 * transactions may still need to see old data.
+	 */
+	h->shared_oldest_nonremovable =
+		TransactionIdOlder(h->shared_oldest_nonremovable, csn_snapshot_xmin);
+	h->data_oldest_nonremovable =
+		TransactionIdOlder(h->data_oldest_nonremovable, csn_snapshot_xmin);
+
 	/*
 	 * The only difference between catalog / data horizons is that the slot's
 	 * catalog xmin is applied to the catalog one (so catalogs can be accessed
@@ -2133,6 +2173,9 @@ GetSnapshotDataReuse(Snapshot snapshot)
 	if (curXactCompletionCount != snapshot->snapXactCompletionCount)
 		return false;
 
+	if (get_csnlog_status())	/* GetSnapshotData must take a fresh CSN */
+		return false;
+
 	/*
 	 * If the current xactCompletionCount is still the same as it was at the
 	 * time the snapshot was built, we can be sure that rebuilding the
@@ -2212,6 +2255,8 @@ GetSnapshotData(Snapshot snapshot)
 	int			count = 0;
 	int			subcount = 0;
 	bool		suboverflowed = false;
+	CSN			csn = FrozenCSN;
+	TransactionId csn_snapshot_xmin = InvalidTransactionId;
 	FullTransactionId latest_completed;
 	TransactionId oldestxid;
 	int			mypgxactoff;
@@ -2444,6 +2489,20 @@ GetSnapshotData(Snapshot snapshot)
 	if (!TransactionIdIsValid(MyProc->xmin))
 		MyProc->xmin = TransactionXmin = xmin;
 
+	/* Take CSN under ProcArrayLock so the snapshot stays synchronized. */
+	if (!snapshot->takenDuringRecovery && get_csnlog_status())
+		csn = GenerateCSN(false, InvalidCSN);
+
+	if (get_csnlog_status() && csn_snapshot_defer_time > 0 && IsUnderPostmaster)
+	{
+		CSNSnapshotMapXmin(csn);	/* the CSN just taken, not a stale one */
+
+		/* Fetch the xmin held back by the CSN snapshot settings. */
+		csn_snapshot_xmin = ProcArrayGetCSNSnapshotXmin();
+		/* Let that CSN-delayed xmin hold back oldestxid as well. */
+		oldestxid = TransactionIdOlder(oldestxid, csn_snapshot_xmin);
+	}
+
 	LWLockRelease(ProcArrayLock);
 
 	/* maintain state for GlobalVis* */
@@ -2469,6 +2528,10 @@ GetSnapshotData(Snapshot snapshot)
 		def_vis_xid_data =
 			TransactionIdOlder(def_vis_xid_data, replication_slot_xmin);
 
+		/* The CSN-related settings can require an older xmin. */
+		def_vis_xid_data =
+			TransactionIdOlder(def_vis_xid_data, csn_snapshot_xmin);
+
 		/*
 		 * Rows in non-shared, non-catalog tables possibly could be vacuumed
 		 * if older than this xid.
@@ -2549,6 +2612,8 @@ GetSnapshotData(Snapshot snapshot)
 	snapshot->active_count = 0;
 	snapshot->regd_count = 0;
 	snapshot->copied = false;
+	snapshot->imported_csn = false;
+	snapshot->snapshot_csn = csn;
 
 	GetSnapshotDataInitOldSnapshot(snapshot);
 
@@ -3901,6 +3966,25 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 	LWLockRelease(ProcArrayLock);
 }
 
+/*
+ * ProcArraySetCSNSnapshotXmin -- set xmin of the oldest active CSN snapshot
+ */
+void
+ProcArraySetCSNSnapshotXmin(TransactionId xmin)
+{
+	/* No lock: relies on TransactionId stores and loads being atomic */
+	procArray->csn_snapshot_xmin = xmin;
+}
+
+/*
+ * ProcArrayGetCSNSnapshotXmin -- read it back; takes no lock itself
+ */
+TransactionId
+ProcArrayGetCSNSnapshotXmin(void)
+{
+	return procArray->csn_snapshot_xmin;
+}
+
 /*
  * XidCacheRemoveRunningXids
  *
 /*
  * XidCacheRemoveRunningXids
  *
@@ -4383,6 +4467,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
 		while (TransactionIdPrecedes(next_expected_xid, xid))
 		{
 			TransactionIdAdvance(next_expected_xid);
+			ExtendCSNLog(next_expected_xid);
 			ExtendSUBTRANS(next_expected_xid);
 		}
 		Assert(next_expected_xid == xid);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 862097352b..1f78161d9a 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -135,6 +135,8 @@ static const char *const BuiltinTrancheNames[] = {
 	"CommitTSBuffer",
 	/* LWTRANCHE_SUBTRANS_BUFFER: */
 	"SubtransBuffer",
+	/* LWTRANCHE_CSN_LOG_BUFFERS: */
+	"CSNLogBuffer",
 	/* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
 	"MultiXactOffsetBuffer",
 	/* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 6c7cf6c295..e8ca393611 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -53,3 +53,5 @@ XactTruncationLock					44
 # 45 was XactTruncationLock until removal of BackendRandomLock
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
+CSNLogSLRULock						48
+CSNSnapshotXidMapLock				49
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b7d9da0aa9..88f4f42456 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -35,9 +35,11 @@
 #include <unistd.h>
 #include <sys/time.h>
 
+#include "access/csn_snapshot.h"
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "access/xact.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -440,6 +442,9 @@ InitProcess(void)
 	MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
 	Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO);
 
+	MyProc->originalXmin = InvalidTransactionId;
+	pg_atomic_init_u64(&MyProc->assignedCSN, InProgressCSN);
+
 	/*
 	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
 	 * on it.  That allows us to repoint the process latch, which so far
@@ -585,6 +590,7 @@ InitAuxiliaryProcess(void)
 	MyProc->lwWaitMode = 0;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
+	MyProc->originalXmin = InvalidTransactionId;
 	pg_atomic_write_u64(&MyProc->waitStart, 0);
 #ifdef USE_ASSERT_CHECKING
 	{
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index d4083e8a56..383f1e4566 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -20,6 +20,7 @@
 
 #include "access/commit_ts.h"
 #include "access/clog.h"
+#include "access/csn_log.h"
 #include "access/multixact.h"
 #include "access/xlog.h"
 #include "access/xlogutils.h"
@@ -119,6 +120,10 @@ static const SyncOps syncsw[] = {
 	/* pg_multixact/members */
 	[SYNC_HANDLER_MULTIXACT_MEMBER] = {
 		.sync_syncfiletag = multixactmemberssyncfiletag
+	},
+	/* pg_csn */
+	[SYNC_HANDLER_CSN] = {
+		.sync_syncfiletag = csnsyncfiletag
 	}
 };
 
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e91d5a3cfd..4d9833fb5f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -34,6 +34,7 @@
 
 #include "access/commit_ts.h"
 #include "access/gin.h"
+#include "access/csn_snapshot.h"
 #include "access/rmgr.h"
 #include "access/tableam.h"
 #include "access/toast_compression.h"
@@ -1212,6 +1213,24 @@ static struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_csn_snapshot", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Enables CSN-based snapshots."),
+			gettext_noop("Used to achieve REPEATABLE READ isolation level for postgres_fdw transactions.")
+		},
+		&enable_csn_snapshot,
+		true,
+		NULL, NULL, NULL
+	},
+	{
+		{"enable_csn_wal", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Enables CSN WAL records."),
+			gettext_noop("Used to enable CSN WAL records.")
+		},
+		&enable_csn_wal,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"ssl", PGC_SIGHUP, CONN_AUTH_SSL,
 			gettext_noop("Enables SSL connections."),
@@ -3195,6 +3214,24 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"csn_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_PRIMARY,
+			gettext_noop("Minimal age of records that are allowed to be vacuumed, in seconds."),
+			NULL
+		},
+		&csn_snapshot_defer_time,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+	{
+		{"csn_time_shift", PGC_USERSET, RESOURCES_MEM,
+			gettext_noop("Applies a time shift in the CSN generator."),
+			gettext_noop("Used for debug purposes.")
+		},
+		&csn_time_shift,
+		0, INT_MIN, INT_MAX,
+		NULL, NULL, NULL
+	},
 	{
 		{"block_size", PGC_INTERNAL, PRESET_OPTIONS,
 			gettext_noop("Shows the size of a disk block."),
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index b0c50a3c7f..3fcd0f4ccf 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -77,6 +77,8 @@ provider postgresql {
 	probe clog__checkpoint__done(bool);
 	probe subtrans__checkpoint__start(bool);
 	probe subtrans__checkpoint__done(bool);
+	probe csnlog__checkpoint__start(bool);
+	probe csnlog__checkpoint__done(bool);
 	probe multixact__checkpoint__start(bool);
 	probe multixact__checkpoint__done(bool);
 	probe twophase__checkpoint__start();
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 5001efdf7a..5a52fce2ed 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -48,6 +48,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/csn_log.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -77,6 +78,8 @@
  */
 int			old_snapshot_threshold; /* number of minutes, -1 disables */
 
+bool		enable_csn_snapshot;	/* GUC; boot value comes from guc.c */
+
 volatile OldSnapshotControlData *oldSnapshotControl;
 
 
@@ -173,6 +176,7 @@ static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
 static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
+static bool XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot);
 
 /*
  * Snapshot fields to be serialized.
@@ -191,6 +195,8 @@ typedef struct SerializedSnapshotData
 	CommandId	curcid;
 	TimestampTz whenTaken;
 	XLogRecPtr	lsn;
+	CSN			csn;			/* copy of snapshot->snapshot_csn */
+	bool		imported_csn;	/* copy of snapshot->imported_csn */
 } SerializedSnapshotData;
 
 Size
@@ -544,6 +550,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
 		   sourcesnap->subxcnt * sizeof(TransactionId));
 	CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
 	CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
+	CurrentSnapshot->snapshot_csn = sourcesnap->snapshot_csn;
+	CurrentSnapshot->imported_csn = sourcesnap->imported_csn;
 	/* NB: curcid should NOT be copied, it's a local matter */
 
 	CurrentSnapshot->snapXactCompletionCount = 0;
@@ -1209,6 +1217,10 @@ ExportSnapshot(Snapshot snapshot)
 	appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
 	appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
 
+	appendStringInfo(&buf, "snapshot_csn:"UINT64_FORMAT"\n",
+					 snapshot->snapshot_csn);	/* parsed by ImportSnapshot() */
+	appendStringInfo(&buf, "imported_csn:%u\n", snapshot->imported_csn);
+
 	/*
 	 * We must include our own top transaction ID in the top-xid data, since
 	 * by definition we will still be running when the importing transaction
@@ -1333,6 +1345,31 @@ parseIntFromText(const char *prefix, char **s, const char *filename)
 	return val;
 }
 
+static CSN
+parseCSNFromText(const char *prefix, char **s, const char *filename)
+{
+	char	   *ptr = *s;
+	int			prefixlen = strlen(prefix);
+	uint64		val;
+
+	if (strncmp(ptr, prefix, prefixlen) != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+				 errmsg("invalid snapshot data in file \"%s\"", filename)));
+	ptr += prefixlen;
+	if (sscanf(ptr, UINT64_FORMAT, &val) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+				 errmsg("invalid snapshot data in file \"%s\"", filename)));
+	ptr = strchr(ptr, '\n');
+	if (!ptr)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+				 errmsg("invalid snapshot data in file \"%s\"", filename)));
+	*s = ptr + 1;
+	return val;
+}
+
 static TransactionId
 parseXidFromText(const char *prefix, char **s, const char *filename)
 {
@@ -1474,6 +1511,9 @@ ImportSnapshot(const char *idstr)
 	snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
 	snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
 
+	snapshot.snapshot_csn = parseCSNFromText("snapshot_csn:", &filebuf, path);
+	snapshot.imported_csn = parseIntFromText("imported_csn:", &filebuf, path);
+
 	snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
 
 	/* sanity-check the xid count before palloc */
@@ -2130,6 +2170,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
 	serialized_snapshot.curcid = snapshot->curcid;
 	serialized_snapshot.whenTaken = snapshot->whenTaken;
 	serialized_snapshot.lsn = snapshot->lsn;
+	serialized_snapshot.csn = snapshot->snapshot_csn;
+	serialized_snapshot.imported_csn = snapshot->imported_csn;
 
 	/*
 	 * Ignore the SubXID array if it has overflowed, unless the snapshot was
@@ -2204,6 +2246,8 @@ RestoreSnapshot(char *start_address)
 	snapshot->curcid = serialized_snapshot.curcid;
 	snapshot->whenTaken = serialized_snapshot.whenTaken;
 	snapshot->lsn = serialized_snapshot.lsn;
+	snapshot->snapshot_csn = serialized_snapshot.csn;
+	snapshot->imported_csn = serialized_snapshot.imported_csn;
 	snapshot->snapXactCompletionCount = 0;
 
 	/* Copy XIDs, if present. */
@@ -2245,6 +2289,44 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
 
 /*
  * XidInMVCCSnapshot
+ *
+ * Is the xid in progress per this snapshot? Imported-CSN snapshots are
+ * checked by CSN only; otherwise a local hit is rechecked when CSN log is on.
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	bool in_snapshot;
+
+	if (snapshot->imported_csn)
+	{
+		Assert(enable_csn_snapshot);
+		/* Only the imported CSN is meaningful for this snapshot */
+		return XidInCSNSnapshot(xid, snapshot);
+	}
+
+	in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot);
+
+	if (!get_csnlog_status())
+	{
+		Assert(CSNIsFrozen(snapshot->snapshot_csn));
+		return in_snapshot;
+	}
+
+	if (in_snapshot)
+	{
+		/*
+		 * This xid may already be in an unknown state, in which case
+		 * we must wait and recheck.
+		 */
+		return XidInCSNSnapshot(xid, snapshot);
+	}
+	else
+		return false;
+}
+
+/*
+ * XidInLocalMVCCSnapshot
  *		Is the given XID still-in-progress according to the snapshot?
  *
  * Note: GetSnapshotData never stores either top xid or subxids of our own
@@ -2253,8 +2335,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
  * TransactionIdIsCurrentTransactionId first, except when it's known the
  * XID could not be ours anyway.
  */
-bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+static bool
+XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 {
 	uint32		i;
 
@@ -2364,3 +2446,100 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 
 	return false;
 }
+
+
+/*
+ * ExportCSNSnapshot
+ *
+ * Export snapshot_csn so that the caller can extend this transaction to
+ * other nodes.
+ *
+ * TODO: it would be better to do this through EXPORT/IMPORT SNAPSHOT syntax
+ * and add checks that the transaction has not yet acquired an xid, but for
+ * the current iteration of this patch I don't want to hack on the parser.
+ */
+SnapshotCSN
+ExportCSNSnapshot(void)
+{
+	if (!get_csnlog_status())
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not export csn snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+					 "enable_csn_snapshot")));
+
+	elog(DEBUG5, "Export CSN Snapshot: csn = " UINT64_FORMAT,
+		 CurrentSnapshot->snapshot_csn);
+	return CurrentSnapshot->snapshot_csn;
+}
+
+/* SQL accessor to ExportCSNSnapshot() */
+Datum
+pg_csn_snapshot_export(PG_FUNCTION_ARGS)
+{
+	SnapshotCSN csn = ExportCSNSnapshot();
+
+	PG_RETURN_UINT64(csn);
+}
+
+/*
+ * ImportCSNSnapshot
+ *
+ * Import a CSN and move this backend's xmin back to the value that was
+ * current when that CSN was taken.
+ *
+ * TODO: it would be better to do this through EXPORT/IMPORT SNAPSHOT syntax
+ * and add checks that the transaction has not yet acquired an xid, but for
+ * the current iteration of this patch I don't want to hack on the parser.
+ */
+void
+ImportCSNSnapshot(SnapshotCSN snapshot_csn)
+{
+	volatile TransactionId xmin;
+
+	if (!get_csnlog_status())
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not import csn snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+					 "enable_csn_snapshot")));
+
+	if (csn_snapshot_defer_time <= 0)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not import csn snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is positive.",
+					 "csn_snapshot_defer_time")));
+
+	/*
+	 * Call CSNSnapshotToXmin under ProcArrayLock so that the resulting xmin
+	 * cannot be evicted from the map before we install it as this
+	 * backend's xmin.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	xmin = CSNSnapshotToXmin(snapshot_csn);
+	if (!TransactionIdIsValid(xmin))
+	{
+		LWLockRelease(ProcArrayLock);
+		elog(ERROR, "CSNSnapshotToXmin: csn snapshot too old");
+	}
+
+	MyProc->originalXmin = MyProc->xmin;
+	MyProc->xmin = TransactionXmin = xmin;
+	LWLockRelease(ProcArrayLock);
+
+	CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */
+	CurrentSnapshot->snapshot_csn = snapshot_csn;
+	CurrentSnapshot->imported_csn = true;
+	CSNSnapshotSync(snapshot_csn);
+}
+
+/* SQL accessor to ImportCSNSnapshot() */
+Datum
+pg_csn_snapshot_import(PG_FUNCTION_ARGS)
+{
+	SnapshotCSN csn = PG_GETARG_UINT64(0);
+
+	ImportCSNSnapshot(csn);
+	PG_RETURN_VOID();
+}
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 31839c1a19..1864952bd2 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -227,7 +227,8 @@ static const char *const subdirs[] = {
 	"pg_xact",
 	"pg_logical",
 	"pg_logical/snapshots",
-	"pg_logical/mappings"
+	"pg_logical/mappings",
+	"pg_csn"
 };
 
 
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index f911f98d94..325e6a0e2b 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -300,6 +300,8 @@ main(int argc, char *argv[])
 		   ControlFile->max_locks_per_xact);
 	printf(_("track_commit_timestamp setting:       %s\n"),
 		   ControlFile->track_commit_timestamp ? _("on") : _("off"));
+	printf(_("enable_csn_snapshot setting:          %s\n"),
+		   ControlFile->enable_csn_snapshot ? _("on") : _("off"));
 	printf(_("Maximum data alignment:               %u\n"),
 		   ControlFile->maxAlign);
 	/* we don't print floatFormat since can't say much useful about it */
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 3628bd74a7..18cf9197cc 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -548,6 +548,14 @@ copy_xact_xlog_xid(void)
 		check_ok();
 	}
 
+	/*
+	 * Copy pg_csn from old clusters at or after CSN_BASE_SNAPSHOT_ADD_VER.
+	 * NOTE(review): that value (202002010) is below stock PG 13/14
+	 * catversions, whose clusters have no pg_csn; confirm the threshold.
+	 */
+	if (old_cluster.controldata.cat_ver >= CSN_BASE_SNAPSHOT_ADD_VER)
+		copy_subdir_files("pg_csn", "pg_csn");
+
 	/* now reset the wal archives in the new cluster */
 	prep_status("Resetting WAL archives");
 	exec_prog(UTILITY_LOG_FILE, NULL, true, true,
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index ca0795f68f..54f2984387 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -124,6 +124,8 @@ extern char *output_files[];
  */
 #define JSONB_FORMAT_CHANGE_CAT_VER 201409291
 
+#define	CSN_BASE_SNAPSHOT_ADD_VER	202002010	/* gates the pg_csn copy */
+
 
 /*
  * Each relation is represented by a relinfo structure.
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1..2d280ce940 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -11,6 +11,7 @@
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/csn_log.h"
 #include "access/generic_xlog.h"
 #include "access/ginxlog.h"
 #include "access/gistxlog.h"
diff --git a/src/include/access/csn_log.h b/src/include/access/csn_log.h
new file mode 100644
index 0000000000..12df028bf4
--- /dev/null
+++ b/src/include/access/csn_log.h
@@ -0,0 +1,98 @@
+/*
+ * csn_log.h
+ *
+ * Commit-Sequence-Number log.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/csn_log.h
+ */
+#ifndef CSNLOG_H
+#define CSNLOG_H
+
+#include "access/xlog.h"
+#include "utils/snapshot.h"
+#include "storage/sync.h"
+
+
+#define InProgressCSN		UINT64CONST(0x0)
+#define AbortedCSN			UINT64CONST(0x1)
+#define FrozenCSN			UINT64CONST(0x2)
+#define InDoubtCSN			UINT64CONST(0x3)
+#define UnclearCSN			UINT64CONST(0x4)
+#define FirstNormalCSN		UINT64CONST(0x5)
+
+#define CSNIsInProgress(csn)	((csn) == InProgressCSN)
+#define CSNIsAborted(csn)		((csn) == AbortedCSN)
+#define CSNIsFrozen(csn)		((csn) == FrozenCSN)
+#define CSNIsInDoubt(csn)		((csn) == InDoubtCSN)
+#define CSNIsUnclear(csn)		((csn) == UnclearCSN)
+#define CSNIsNormal(csn)		((csn) >= FirstNormalCSN)
+
+/* XLOG stuff */
+#define XLOG_CSN_ASSIGNMENT			0x00
+#define XLOG_CSN_SETCSN				0x10
+#define XLOG_CSN_ZEROPAGE			0x20
+#define XLOG_CSN_TRUNCATE			0x30
+
+/*
+ * We log the maximum generated CSN to WAL so that the database will not
+ * generate a historical CSN after a restart; this could happen if the system
+ * clock is turned back.
+ *
+ * Logging the maximum CSN every time one is generated would produce too much
+ * WAL, so instead we log a value 5 seconds ahead in the future.
+ *
+ * As a trade-off, right after a restart there will be about 5 seconds of bad
+ * performance for time synchronization among sharded nodes.
+ *
+ * This could be turned into a configuration parameter so that users can
+ * choose which behavior they prefer.
+ *
+ */
+#define	CSN_ASSIGN_TIME_INTERVAL	5
+
+typedef struct xl_csn_set
+{
+	CSN				csn;
+	TransactionId	xtop;			/* XID's top-level XID */
+	int				nsubxacts;		/* number of subtransaction XIDs */
+	TransactionId	xsub[FLEXIBLE_ARRAY_MEMBER];	/* assigned subxids */
+} xl_csn_set;
+
+#define MinSizeOfCSNSet offsetof(xl_csn_set, xsub)
+#define	CSNAddByNanosec(csn,second) ((csn) + (second) * INT64CONST(1000000000))
+
+/* Main functions */
+extern void CSNLogSetCSN(TransactionId xid, int nsubxids,
+							   TransactionId *subxids, CSN csn, bool write_xlog);
+extern CSN CSNLogGetCSNByXid(TransactionId xid);
+
+/* Infrastructure functions */
+extern Size CSNLogShmemSize(void);
+extern void CSNLogShmemInit(void);
+extern void ActivateCSNlog(void);
+extern void ExtendCSNLog(TransactionId newestXact);
+extern void DeactivateCSNlog(void);
+
+extern void CheckPointCSNLog(void);
+extern void TruncateCSNLog(TransactionId oldestXact);
+
+extern void csnlog_redo(XLogReaderState *record);
+extern void csnlog_desc(StringInfo buf, XLogReaderState *record);
+extern const char *csnlog_identify(uint8 info);
+extern void WriteAssignCSNXlogRec(CSN csn);
+extern void CatchCSNLog(void);
+extern void StartupCSN(void);
+extern void CompleteCSNInitialization(void);
+extern void CSNlogParameterChange(bool newvalue, bool oldvalue);
+extern bool get_csnlog_status(void);
+extern int csnsyncfiletag(const FileTag *ftag, char *path);
+
+extern CSN GenerateCSN(bool locked, CSN assign);
+extern CSN GetLastGeneratedCSN(void);
+
+extern TransactionId GetOldestXmin(void);
+
+#endif   /* CSNLOG_H */
\ No newline at end of file
diff --git a/src/include/access/csn_snapshot.h b/src/include/access/csn_snapshot.h
new file mode 100644
index 0000000000..916603af0c
--- /dev/null
+++ b/src/include/access/csn_snapshot.h
@@ -0,0 +1,54 @@
+/*-------------------------------------------------------------------------
+ *
+ * csn_snapshot.h
+ *	  Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/csn_snapshot.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CSN_SNAPSHOT_H
+#define CSN_SNAPSHOT_H
+
+#include "access/csn_log.h"
+#include "port/atomics.h"
+#include "storage/lock.h"
+#include "utils/snapshot.h"
+#include "utils/guc.h"
+
+/*
+ * snapshot.h is also used by frontend code, so the atomic variant of the
+ * SnapshotCSN type is defined here instead.
+ */
+typedef pg_atomic_uint64 CSN_atomic;
+
+
+extern int csn_snapshot_defer_time;
+extern int csn_time_shift;
+
+
+extern Size CSNSnapshotShmemSize(void);
+extern void CSNSnapshotShmemInit(void);
+extern void CSNSnapshotStartup(TransactionId oldestActiveXID);
+
+extern void CSNSnapshotMapXmin(SnapshotCSN snapshot_csn);
+extern TransactionId CSNSnapshotToXmin(SnapshotCSN snapshot_csn);
+
+extern bool XidInCSNSnapshot(TransactionId xid, Snapshot snapshot);
+
+extern CSN TransactionIdGetCSN(TransactionId xid);
+
+extern void CSNSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids,
+								TransactionId *subxids);
+extern void CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+extern void CSNSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+extern void CSNSnapshotAssignCurrent(SnapshotCSN snapshot_csn);
+extern SnapshotCSN CSNSnapshotPrepareCurrent(void);
+extern void CSNSnapshotSync(SnapshotCSN remote_csn);
+
+#endif							/* CSN_SNAPSHOT_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index f582cf535f..3cf0775176 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_CSNLOG_ID, "CSN", csnlog_redo, csnlog_desc, csnlog_identify, NULL, NULL, NULL)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index c0da76cab4..2ee489dcad 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -242,6 +242,7 @@ typedef struct xl_parameter_change
 	int			wal_level;
 	bool		wal_log_hints;
 	bool		track_commit_timestamp;
+	bool		enable_csn_snapshot;
 } xl_parameter_change;
 
 /* logs restore point */
@@ -332,5 +333,6 @@ extern bool ArchiveRecoveryRequested;
 extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
+extern bool enable_csn_wal;
 
 #endif							/* XLOG_INTERNAL_H */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 749bce0cc6..a7da532f3a 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -183,6 +183,7 @@ typedef struct ControlFileData
 	int			max_prepared_xacts;
 	int			max_locks_per_xact;
 	bool		track_commit_timestamp;
+	bool		enable_csn_snapshot;
 
 	/*
 	 * This data is used to check for hardware-architecture compatibility of
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d068d6532e..d578aceb40 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11689,4 +11689,21 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+# CSN snapshot handling
+{ oid => '10001', descr => 'export csn snapshot',
+  proname => 'pg_csn_snapshot_export', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => '', prosrc => 'pg_csn_snapshot_export' },
+{ oid => '10002', descr => 'import csn snapshot',
+  proname => 'pg_csn_snapshot_import', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_csn_snapshot_import' },
+{ oid => '10003', descr => 'prepare distributed transaction for commit, get csn',
+  proname => 'pg_csn_snapshot_prepare', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_csn_snapshot_prepare' },
+{ oid => '10004', descr => 'assign csn to distributed transaction',
+  proname => 'pg_csn_snapshot_assign', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_csn_snapshot_assign' },
+{ oid => '10005', descr => 'get current CSN',
+  proname => 'pg_current_csn', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => '', prosrc => 'pg_current_csn' },
+
 ]
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index 99873497a6..8d1ced7430 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -93,6 +93,9 @@ typedef struct
 #define USECS_PER_MINUTE INT64CONST(60000000)
 #define USECS_PER_SEC	INT64CONST(1000000)
 
+#define NSECS_PER_SEC	INT64CONST(1000000000)
+#define NSECS_PER_USEC	INT64CONST(1000)
+
 /*
  * We allow numeric timezone offsets up to 15:59:59 either way from Greenwich.
  * Currently, the record holders for wackiest offsets in actual use are zones
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index ab7b85c86e..f08999740b 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -281,6 +281,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
 #define PG_GETARG_FLOAT4(n)  DatumGetFloat4(PG_GETARG_DATUM(n))
 #define PG_GETARG_FLOAT8(n)  DatumGetFloat8(PG_GETARG_DATUM(n))
 #define PG_GETARG_INT64(n)	 DatumGetInt64(PG_GETARG_DATUM(n))
+#define PG_GETARG_UINT64(n)	 DatumGetUInt64(PG_GETARG_DATUM(n))
 /* use this if you want the raw, possibly-toasted input datum: */
 #define PG_GETARG_RAW_VARLENA_P(n)	((struct varlena *) PG_GETARG_POINTER(n))
 /* use this if you want the input datum de-toasted: */
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index 39a4f0600e..a78f0d284b 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -141,6 +141,9 @@ typedef struct timespec instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	(((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec))
+
 #else							/* !HAVE_CLOCK_GETTIME */
 
 /* Use gettimeofday() */
@@ -205,6 +208,10 @@ typedef struct timeval instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec)
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	(((uint64) (t).tv_sec * (uint64) 1000000000) + \
+		(uint64) (t).tv_usec * (uint64) 1000)
+
 #endif							/* HAVE_CLOCK_GETTIME */
 
 #else							/* WIN32 */
@@ -237,6 +244,9 @@ typedef LARGE_INTEGER instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency()))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency()))
+
 static inline double
 GetTimerFrequency(void)
 {
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index a8f052e484..65d1e49fb2 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -168,6 +168,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS,
 	LWTRANCHE_COMMITTS_BUFFER,
 	LWTRANCHE_SUBTRANS_BUFFER,
+	LWTRANCHE_CSN_LOG_BUFFERS,
 	LWTRANCHE_MULTIXACTOFFSET_BUFFER,
 	LWTRANCHE_MULTIXACTMEMBER_BUFFER,
 	LWTRANCHE_NOTIFY_BUFFER,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index be67d8a861..ade5d8e169 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,12 +15,14 @@
 #define _PROC_H_
 
 #include "access/clog.h"
+#include "access/csn_snapshot.h"
 #include "access/xlogdefs.h"
 #include "lib/ilist.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
 #include "storage/proclist_types.h"
+#include "utils/snapshot.h"
 
 /*
  * Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds
@@ -251,6 +253,18 @@ struct PGPROC
 	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a member */
 	dlist_head	lockGroupMembers;	/* list of members, if I'm a leader */
 	dlist_node	lockGroupLink;	/* my member link, if I'm a member */
+
+	/*
+	 * assignedCSN holds the CSN of this transaction.  It is generated
+	 * under the ProcArray lock and later written to the CSN log.  The
+	 * variable is atomic only because of group commit; in all other
+	 * scenarios only the backend that owns this proc entry accesses
+	 * it.
+	 */
+	CSN_atomic assignedCSN;
+
+	/* Original xmin of this backend before csn snapshot was imported */
+	TransactionId originalXmin;
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index b01fa52139..ba580435f9 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -20,6 +20,10 @@
 #include "utils/snapshot.h"
 
 
+#define		PROCARRAY_NON_IMPORTED_XMIN		0x80	/* use originalXmin instead
+													 * of xmin to properly
+													 * maintain csnXidMap */
+
 extern Size ProcArrayShmemSize(void);
 extern void CreateSharedProcArray(void);
 extern void ProcArrayAdd(PGPROC *proc);
@@ -94,4 +98,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 											TransactionId *catalog_xmin);
 
+extern void ProcArraySetCSNSnapshotXmin(TransactionId xmin);
+
+extern TransactionId ProcArrayGetCSNSnapshotXmin(void);
 #endif							/* PROCARRAY_H */
diff --git a/src/include/storage/sync.h b/src/include/storage/sync.h
index 6fd50cfa7b..eb1d52673a 100644
--- a/src/include/storage/sync.h
+++ b/src/include/storage/sync.h
@@ -39,6 +39,7 @@ typedef enum SyncRequestHandler
 	SYNC_HANDLER_COMMIT_TS,
 	SYNC_HANDLER_MULTIXACT_OFFSET,
 	SYNC_HANDLER_MULTIXACT_MEMBER,
+	SYNC_HANDLER_CSN,
 	SYNC_HANDLER_NONE
 } SyncRequestHandler;
 
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index c6a176cc95..122eea20ba 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -41,10 +41,11 @@
   && !RelationIsAccessibleInLogicalDecoding(rel) \
 )
 
-#define EarlyPruningEnabled(rel) (old_snapshot_threshold >= 0 && RelationAllowsEarlyPruning(rel))
+#define EarlyPruningEnabled(rel) (old_snapshot_threshold >= 0 && !enable_csn_snapshot && RelationAllowsEarlyPruning(rel))
 
 /* GUC variables */
 extern PGDLLIMPORT int old_snapshot_threshold;
+extern PGDLLIMPORT bool enable_csn_snapshot;
 
 
 extern Size SnapMgrShmemSize(void);
@@ -100,7 +101,7 @@ extern PGDLLIMPORT SnapshotData CatalogSnapshotData;
 static inline bool
 OldSnapshotThresholdActive(void)
 {
-	return old_snapshot_threshold >= 0;
+	return (old_snapshot_threshold >= 0) && (!enable_csn_snapshot);
 }
 
 extern Snapshot GetTransactionSnapshot(void);
@@ -130,6 +131,8 @@ extern void AtSubCommit_Snapshot(int level);
 extern void AtSubAbort_Snapshot(int level);
 extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
 
+extern SnapshotCSN ExportCSNSnapshot(void);
+extern void ImportCSNSnapshot(SnapshotCSN snapshot_csn);
 extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 6b60755c53..3580a94c43 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -121,6 +121,9 @@ typedef enum SnapshotType
 typedef struct SnapshotData *Snapshot;
 
 #define InvalidSnapshot		((Snapshot) NULL)
+#define InvalidCSN			((CSN) 0)
+typedef uint64 CSN;
+typedef uint64 SnapshotCSN;
 
 /*
  * Struct representing all kind of possible snapshots.
@@ -214,6 +217,14 @@ typedef struct SnapshotData
 	 * transactions completed since the last GetSnapshotData().
 	 */
 	uint64		snapXactCompletionCount;
+
+	/*
+	 * SnapshotCSN for snapshot isolation support.
+	 * Will be used only if enable_csn_snapshot is enabled.
+	 */
+	SnapshotCSN	snapshot_csn;
+	/* true if snapshot_csn was imported from another node, false if ours */
+	bool		imported_csn;
 } SnapshotData;
 
 #endif							/* SNAPSHOT_H */
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index dffc79b2d9..16bb65e7e1 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -7,6 +7,7 @@ include $(top_builddir)/src/Makefile.global
 SUBDIRS = \
 		  brin \
 		  commit_ts \
+		  csnsnapshot \
 		  delay_execution \
 		  dummy_index_am \
 		  dummy_seclabel \
diff --git a/src/test/modules/csnsnapshot/Makefile b/src/test/modules/csnsnapshot/Makefile
new file mode 100644
index 0000000000..15a07f8846
--- /dev/null
+++ b/src/test/modules/csnsnapshot/Makefile
@@ -0,0 +1,22 @@
+# src/test/modules/csnsnapshot/Makefile
+
+NO_INSTALLCHECK = 1
+
+TAP_TESTS = 1
+
+# Doesn't support full consistency of distributed commit in READ COMMITTED
+# transactions.
+PROVE_TESTS =	t/001_base.pl \
+				t/002_standby.pl \
+				t/003_parallel_safe.pl
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/csnsnapshot
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/csnsnapshot/expected/csnsnapshot.out b/src/test/modules/csnsnapshot/expected/csnsnapshot.out
new file mode 100644
index 0000000000..ac28e417b6
--- /dev/null
+++ b/src/test/modules/csnsnapshot/expected/csnsnapshot.out
@@ -0,0 +1 @@
+create table t1(i int, j int, k varchar);
diff --git a/src/test/modules/csnsnapshot/t/001_base.pl b/src/test/modules/csnsnapshot/t/001_base.pl
new file mode 100644
index 0000000000..b81419512e
--- /dev/null
+++ b/src/test/modules/csnsnapshot/t/001_base.pl
@@ -0,0 +1,100 @@
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 5;
+
+my ($node, $test_snapshot, $count1, $count2);
+$node = PostgreSQL::Test::Cluster->new('csntest');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+					enable_csn_snapshot = on
+					csn_snapshot_defer_time = 10
+					max_prepared_transactions = 10
+					});
+$node->start;
+
+# Create a table
+$node->safe_psql('postgres', 'create table t1(i int, j int)');
+
+# Insert a row that must be visible in the exported snapshot
+$node->safe_psql('postgres', 'insert into t1 values(1,1)');
+# Export a CSN snapshot
+$test_snapshot = $node->safe_psql('postgres', 'select pg_csn_snapshot_export()');
+# Insert a row that must NOT be visible in the exported snapshot
+$node->safe_psql('postgres', 'insert into t1 values(2,1)');
+
+$count1 = $node->safe_psql('postgres', "select count(*) from t1");
+is($count1, '2', 'Get right number in normal query');
+$count2 = $node->safe_psql('postgres', "
+			begin transaction isolation level repeatable read;
+			select pg_csn_snapshot_import($test_snapshot);
+			select count(*) from t1;
+			commit;"
+			);
+
+# The void result of pg_csn_snapshot_import() shows up as an empty first line.
+is($count2, "\n1", 'Get right number in csn import query');
+
+# Prepare four transactions; pt4 is committed now, the others after restarts.
+$node->safe_psql('postgres', "
+						begin;
+						insert into t1 values(3,1);
+						insert into t1 values(3,2);
+						prepare	transaction 'pt3';
+						");
+$node->safe_psql('postgres', "
+						begin;
+						insert into t1 values(4,1);
+						insert into t1 values(4,2);
+						prepare	transaction 'pt4';
+						");
+$node->safe_psql('postgres', "
+						begin;
+						insert into t1 values(5,1);
+						insert into t1 values(5,2);
+						prepare	transaction 'pt5';
+						");
+$node->safe_psql('postgres', "
+						begin;
+						insert into t1 values(6,1);
+						insert into t1 values(6,2);
+						prepare	transaction 'pt6';
+						");
+$node->safe_psql('postgres', "commit prepared 'pt4';");
+
+# restart with enable_csn_snapshot off
+$node->append_conf('postgresql.conf', "enable_csn_snapshot = off");
+$node->restart;
+$node->safe_psql('postgres', "
+						insert into t1 values(7,1);
+						insert into t1 values(7,2);
+						");
+$node->safe_psql('postgres', "commit prepared 'pt3';");
+$count1 = $node->safe_psql('postgres', "select count(*) from t1");
+is($count1, '8', 'Get right number in normal query');
+
+
+# restart with enable_csn_snapshot on
+$node->append_conf('postgresql.conf', "enable_csn_snapshot = on");
+$node->restart;
+$node->safe_psql('postgres', "
+						insert into t1 values(8,1);
+						insert into t1 values(8,2);
+						");
+$node->safe_psql('postgres', "commit prepared 'pt5';");
+$count1 = $node->safe_psql('postgres', "select count(*) from t1");
+is($count1, '12', 'Get right number in normal query');
+
+# restart with enable_csn_snapshot off
+$node->append_conf('postgresql.conf', "enable_csn_snapshot = off");
+$node->restart;
+$node->safe_psql('postgres', "
+						insert into t1 values(9,1);
+						insert into t1 values(9,2);
+						");
+$node->safe_psql('postgres', "commit prepared 'pt6';");
+
+$count1 = $node->safe_psql('postgres', "select count(*) from t1");
+is($count1, '16', 'Get right number in normal query');
diff --git a/src/test/modules/csnsnapshot/t/002_standby.pl b/src/test/modules/csnsnapshot/t/002_standby.pl
new file mode 100644
index 0000000000..27fcbb8f8a
--- /dev/null
+++ b/src/test/modules/csnsnapshot/t/002_standby.pl
@@ -0,0 +1,68 @@
+# Test simple scenario involving a standby
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 6;
+
+my ($master, $bkplabel, $standby, $guc_on_master, $guc_on_standby);
+
+$bkplabel = 'backup';
+$master = PostgreSQL::Test::Cluster->new('master');
+$master->init(allows_streaming => 1);
+
+$master->append_conf(
+	'postgresql.conf', qq{
+	enable_csn_snapshot = on
+	max_wal_senders = 5
+	});
+$master->start;
+$master->backup($bkplabel);
+
+$standby = PostgreSQL::Test::Cluster->new('standby');
+$standby->init_from_backup($master, $bkplabel, has_streaming => 1);
+$standby->start;
+
+$master->safe_psql('postgres', "create table t1(i int, j int)");
+
+$guc_on_master = $master->safe_psql('postgres', 'show enable_csn_snapshot');
+is($guc_on_master, 'on', "GUC on master");
+
+$guc_on_standby = $standby->safe_psql('postgres', 'show enable_csn_snapshot');
+is($guc_on_standby, 'on', "GUC on standby");
+
+$master->append_conf('postgresql.conf', 'enable_csn_snapshot = off');
+$master->restart;
+
+$guc_on_master = $master->safe_psql('postgres', 'show enable_csn_snapshot');
+is($guc_on_master, 'off', "GUC off master");
+
+$guc_on_standby = $standby->safe_psql('postgres', 'show enable_csn_snapshot');
+is($guc_on_standby, 'on', "GUC on standby");
+
+# Consume many transactions (one per insert), presumably to cross log pages.
+for my $i (1 .. 4096)
+{
+	$master->safe_psql('postgres', "insert into t1 values(1,$i)");
+}
+$master->append_conf('postgresql.conf', 'enable_csn_snapshot = on');
+$master->restart;
+# Wait for the standby to replay everything instead of sleeping a fixed time.
+$master->wait_for_catchup($standby, 'replay', $master->lsn('insert'));
+my $count_standby = $standby->safe_psql('postgres', 'select count(*) from t1');
+is($count_standby, '4096', "Ok for switch xid-base > csn-base");
+
+# Consume many transactions (one per insert), presumably to cross log pages.
+for my $i (1 .. 4096)
+{
+	$master->safe_psql('postgres', "insert into t1 values(1,$i)");
+}
+
+$master->append_conf('postgresql.conf', 'enable_csn_snapshot = off');
+$master->restart;
+$master->wait_for_catchup($standby, 'replay', $master->lsn('insert'));
+
+$count_standby = $standby->safe_psql('postgres', 'select count(*) from t1');
+is($count_standby, '8192', "Ok for switch csn-base > xid-base");
\ No newline at end of file
diff --git a/src/test/modules/csnsnapshot/t/003_parallel_safe.pl b/src/test/modules/csnsnapshot/t/003_parallel_safe.pl
new file mode 100644
index 0000000000..e303e3f1a6
--- /dev/null
+++ b/src/test/modules/csnsnapshot/t/003_parallel_safe.pl
@@ -0,0 +1,67 @@
+# Check safety of CSN machinery for parallel mode.
+
+use strict;
+use warnings;
+
+use File::Temp;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 2;
+
+my ($node, $updScr, $selScr, $started, $pgb_handle1, $result, $errors);
+
+$node = PostgreSQL::Test::Cluster->new('csntest');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+					enable_csn_snapshot = on
+					csn_snapshot_defer_time = 10
+					default_transaction_isolation = 'REPEATABLE READ'
+
+					# force parallel mode.
+					max_worker_processes = 64
+					max_parallel_workers_per_gather = 16
+					max_parallel_workers = 32
+					parallel_setup_cost = 1
+					parallel_tuple_cost = 0.05
+					min_parallel_table_scan_size = 0
+});
+$node->start;
+
+$node->command_ok([ 'pgbench', '-i', '-s', '1' ], "pgbench initialization ok");
+$node->safe_psql('postgres', qq{
+	CREATE OR REPLACE FUNCTION cnt() RETURNS integer AS '
+		SELECT sum(abalance) FROM pgbench_accounts;
+	' LANGUAGE SQL PARALLEL SAFE COST 100000.;
+});
+
+$updScr = File::Temp->new();
+append_to_file($updScr, q{
+	UPDATE pgbench_accounts SET abalance = abalance + 1 WHERE aid = 1;
+});
+
+$selScr = '
+	SELECT count(*) AS res FROM (
+		SELECT cnt() AS y FROM pgbench_accounts WHERE aid < 20
+		GROUP BY (y)
+	) AS q;
+';
+
+# Launch concurrent updates of aid = 1 in the background
+$pgb_handle1 = $node->pgbench_async('-n', -T => 10, -f => $updScr, 'postgres' );
+
+$errors = 0;
+$started = time();
+while (time() - $started < 10)
+{
+	# All cnt() calls must see the same sum, i.e. exactly one group.
+	$result = $node->safe_psql('postgres', $selScr);
+	if ($result ne '1')
+	{
+		$errors++;
+		diag("Workers returned different sums: $result");
+	}
+}
+is($errors, 0, 'isolation between UPDATE and concurrent SELECT workers.');
+
+$node->pgbench_await($pgb_handle1);
+$node->stop();
\ No newline at end of file
diff --git a/src/test/modules/snapshot_too_old/sto.conf b/src/test/modules/snapshot_too_old/sto.conf
index 7eeaeeb0dc..3177cc0e15 100644
--- a/src/test/modules/snapshot_too_old/sto.conf
+++ b/src/test/modules/snapshot_too_old/sto.conf
@@ -1,2 +1,3 @@
 autovacuum = off
 old_snapshot_threshold = 0
+enable_csn_snapshot = false
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 9467a199c8..9d38a4922d 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2068,6 +2068,34 @@ sub pgbench
 	$self->command_checks_all(\@cmd, $stat, $out, $err, $name);
 }
 
+sub pgbench_async
+{
+	my ($self, @args) = @_;
+
+	# Start pgbench against this node without waiting for it to finish and
+	# return the IPC::Run harness, to be passed later to pgbench_await().
+	# Redirections must be scalar references: plain scalars would be parsed
+	# by IPC::Run as redirection operators or file names, not as buffers.
+	my ($in, $out, $err) = ('', '', '');
+	my @pgbench_command = (
+		'pgbench',
+		-h => $self->host,
+		-p => $self->port,
+		@args);
+	return IPC::Run::start(\@pgbench_command,
+		'<', \$in, '>', \$out, '2>', \$err);
+}
+
+sub pgbench_await
+{
+	my ($self, $pgbench_handle) = @_;
+
+	# pgbench may legitimately exit with a non-zero status (for example
+	# when a client hits a serialization error), so don't fail here.
+	# Return IPC::Run::finish()'s result and let the caller decide.
+	return IPC::Run::finish($pgbench_handle);
+}
+
 =pod
 
 =item $node->connect_ok($connstr, $test_name, %params)
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 2088857615..010b3b3144 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -104,6 +104,8 @@ select name, setting from pg_settings where name like 'enable%';
 --------------------------------+---------
  enable_async_append            | on
  enable_bitmapscan              | on
+ enable_csn_snapshot            | on
+ enable_csn_wal                 | on
  enable_gathermerge             | on
  enable_hashagg                 | on
  enable_hashjoin                | on
@@ -122,7 +124,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(20 rows)
+(22 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
-- 
2.25.1

Reply via email to