On 2015-02-16 01:21:55 +0100, Andres Freund wrote:
> Here's my next attempt at producing something we can agree
> upon.
> 
> The major change that might achieve that is that I've now provided a
> separate method to store the origin_id of a node. I've made it
> conditional on !REPLICATION_IDENTIFIER_REUSE_PADDING, to show both
> paths. That new method uses Heikki's xlog rework to dynamically add the
> origin to the record if an origin is set up. That turned out to be
> surprisingly simple.
>
> Other changes:
> 
> * Locking that prevents several backends from replaying changes at the
>   same time. This is actually overly restrictive in some cases, but I
>   think it's good enough for now.
> * Logical decoding grew a filter_by_origin callback that allows ignoring
>   changes that were replayed from a remote system. Such filters are
>   executed before much is done with records, potentially saving a fair
>   bit of cost.
> * Rebased. That took a bit due to the xlog and other changes.
> * A couple more SQL interface functions (like dropping a replication
>   identifier).

And here's the actual patch.

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
>From 268d52cac6bf7fe1c019fd68248853c7c7ae18b1 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 16 Feb 2015 01:22:08 +0100
Subject: [PATCH] Introduce replication identifiers to keep track of
 replication progress: v0.6

---
 contrib/test_decoding/Makefile                     |    3 +-
 contrib/test_decoding/expected/replident.out       |   84 ++
 contrib/test_decoding/sql/replident.sql            |   40 +
 contrib/test_decoding/test_decoding.c              |   28 +
 src/backend/access/rmgrdesc/xactdesc.c             |   17 +-
 src/backend/access/transam/xact.c                  |   64 +-
 src/backend/access/transam/xlog.c                  |   34 +-
 src/backend/access/transam/xloginsert.c            |   22 +-
 src/backend/access/transam/xlogreader.c            |   10 +
 src/backend/bootstrap/bootstrap.c                  |    5 +-
 src/backend/catalog/Makefile                       |    2 +-
 src/backend/catalog/catalog.c                      |    8 +-
 src/backend/catalog/system_views.sql               |    7 +
 src/backend/replication/logical/Makefile           |    3 +-
 src/backend/replication/logical/decode.c           |   63 +-
 src/backend/replication/logical/logical.c          |    5 +
 src/backend/replication/logical/reorderbuffer.c    |    5 +-
 .../replication/logical/replication_identifier.c   | 1190 ++++++++++++++++++++
 src/backend/storage/ipc/ipci.c                     |    3 +
 src/backend/utils/cache/syscache.c                 |   23 +
 src/backend/utils/misc/guc.c                       |    1 +
 src/bin/initdb/initdb.c                            |    1 +
 src/bin/pg_resetxlog/pg_resetxlog.c                |    3 +
 src/include/access/xact.h                          |   10 +-
 src/include/access/xlog.h                          |    1 +
 src/include/access/xlogdefs.h                      |    6 +
 src/include/access/xlogreader.h                    |    9 +
 src/include/access/xlogrecord.h                    |    5 +-
 src/include/catalog/indexing.h                     |    6 +
 src/include/catalog/pg_proc.h                      |   28 +
 src/include/catalog/pg_replication_identifier.h    |   75 ++
 src/include/pg_config_manual.h                     |    6 +
 src/include/replication/output_plugin.h            |    8 +
 src/include/replication/reorderbuffer.h            |    8 +-
 src/include/replication/replication_identifier.h   |   58 +
 src/include/utils/syscache.h                       |    2 +
 src/test/regress/expected/rules.out                |    5 +
 src/test/regress/expected/sanity_check.out         |    1 +
 38 files changed, 1816 insertions(+), 33 deletions(-)
 create mode 100644 contrib/test_decoding/expected/replident.out
 create mode 100644 contrib/test_decoding/sql/replident.sql
 create mode 100644 src/backend/replication/logical/replication_identifier.c
 create mode 100644 src/include/catalog/pg_replication_identifier.h
 create mode 100644 src/include/replication/replication_identifier.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 438be44..f8334cc 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -37,7 +37,8 @@ submake-isolation:
 submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
+REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
+	binary prepared replident
 
 regresscheck: all | submake-regress submake-test_decoding
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/replident.out b/contrib/test_decoding/expected/replident.out
new file mode 100644
index 0000000..1c508a5
--- /dev/null
+++ b/contrib/test_decoding/expected/replident.out
@@ -0,0 +1,84 @@
+-- predictability
+SET synchronous_commit = on;
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+SELECT pg_replication_identifier_create('test_decoding: regression_slot');
+ pg_replication_identifier_create 
+----------------------------------
+                                1
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                                                                    data                                                                                    
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
+ table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
+ table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
+ COMMIT
+(5 rows)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+-- mark session as replaying
+SELECT pg_replication_identifier_setup_replaying_from('test_decoding: regression_slot');
+ pg_replication_identifier_setup_replaying_from 
+------------------------------------------------
+ 
+(1 row)
+
+BEGIN;
+-- setup transaction origins
+SELECT pg_replication_identifier_setup_tx_origin('0/ffffffff', '2013-01-01 00:00');
+ pg_replication_identifier_setup_tx_origin 
+-------------------------------------------
+ 
+(1 row)
+
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+SELECT pg_replication_identifier_reset_replaying_from();
+ pg_replication_identifier_reset_replaying_from 
+------------------------------------------------
+ 
+(1 row)
+
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+ data 
+------
+(0 rows)
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
+                                      data                                      
+--------------------------------------------------------------------------------
+ BEGIN
+ table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
+ COMMIT
+(3 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_replication_identifier_drop('test_decoding: regression_slot');
+ pg_replication_identifier_drop 
+--------------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/replident.sql b/contrib/test_decoding/sql/replident.sql
new file mode 100644
index 0000000..f01836f
--- /dev/null
+++ b/contrib/test_decoding/sql/replident.sql
@@ -0,0 +1,40 @@
+-- predictability
+SET synchronous_commit = on;
+
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+
+SELECT pg_replication_identifier_create('test_decoding: regression_slot');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+
+-- mark session as replaying
+SELECT pg_replication_identifier_setup_replaying_from('test_decoding: regression_slot');
+
+BEGIN;
+-- setup transaction origins
+SELECT pg_replication_identifier_setup_tx_origin('0/ffffffff', '2013-01-01 00:00');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+
+SELECT pg_replication_identifier_reset_replaying_from();
+
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_identifier_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 963d5df..2ec3001 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
 
 #include "replication/output_plugin.h"
 #include "replication/logical.h"
+#include "replication/replication_identifier.h"
 
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -43,6 +44,7 @@ typedef struct
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
+	bool		only_local;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 static void pg_decode_change(LogicalDecodingContext *ctx,
 				 ReorderBufferTXN *txn, Relation rel,
 				 ReorderBufferChange *change);
+static bool pg_decode_filter(LogicalDecodingContext *ctx,
+							 RepNodeId origin_id);
 
 void
 _PG_init(void)
@@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pg_decode_begin_txn;
 	cb->change_cb = pg_decode_change;
 	cb->commit_cb = pg_decode_commit_txn;
+	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 }
 
@@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_xids = true;
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
+	data->only_local = false;
 
 	ctx->output_plugin_private = data;
 
@@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "only-local") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->only_local = true;
+			else if (!parse_bool(strVal(elem->arg), &data->only_local))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+/* filter_by_origin_cb: returning true makes the decoder skip the change */
+static bool
+pg_decode_filter(LogicalDecodingContext *ctx,
+				 RepNodeId origin_id)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* in only-local mode, skip anything that carries a replication origin */
+	return data->only_local && origin_id != InvalidRepNodeId;
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 3e87978..0ec6b0f 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -25,9 +25,12 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 {
 	int			i;
 	TransactionId *subxacts;
+	SharedInvalidationMessage *msgs;
 
 	subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
 
+	msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
+
 	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
 
 	if (xlrec->nrels > 0)
@@ -49,9 +52,6 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 	}
 	if (xlrec->nmsgs > 0)
 	{
-		SharedInvalidationMessage *msgs;
-
-		msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
 
 		if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
 			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
@@ -80,6 +80,17 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 				appendStringInfo(buf, " unknown id %d", msg->id);
 		}
 	}
+	if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+	{
+		xl_xact_origin *origin = (xl_xact_origin *) &(msgs[xlrec->nmsgs]);
+
+		appendStringInfo(buf, " origin %u, lsn %X/%X, at %s",
+						 origin->origin_node_id,
+						 (uint32)(origin->origin_lsn >> 32),
+						 (uint32)origin->origin_lsn,
+						 timestamptz_to_str(origin->origin_timestamp));
+	}
+
 }
 
 static void
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 97000ef..579f9cc 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,8 +40,10 @@
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/logical.h"
 #include "replication/walsender.h"
 #include "replication/syncrep.h"
+#include "replication/replication_identifier.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -1080,9 +1082,10 @@ RecordTransactionCommit(void)
 		 * gracefully. Till then, it's just 20 bytes of overhead.
 		 */
 		if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
-			XLogLogicalInfoActive())
+			XLogLogicalInfoActive() || replication_origin_id != InvalidRepNodeId)
 		{
 			xl_xact_commit xlrec;
+			xl_xact_origin origin;
 
 			/*
 			 * Set flags required for recovery processing of commits.
@@ -1115,6 +1118,19 @@ RecordTransactionCommit(void)
 			if (nmsgs > 0)
 				XLogRegisterData((char *) invalMessages,
 								 nmsgs * sizeof(SharedInvalidationMessage));
+			/* dump transaction origin information */
+			if (replication_origin_id != InvalidRepNodeId)
+			{
+				xlrec.xinfo |= XACT_CONTAINS_ORIGIN;
+
+				origin.origin_node_id = replication_origin_id;
+				origin.origin_lsn = replication_origin_lsn;
+				origin.origin_timestamp = replication_origin_timestamp;
+
+				XLogRegisterData((char *) &origin,
+								 sizeof(xl_xact_origin));
+
+			}
 			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
 		}
 		else
@@ -1135,6 +1151,13 @@ RecordTransactionCommit(void)
 		}
 	}
 
+	/* record plain commit ts if not replaying remote actions */
+	if (replication_origin_id == InvalidRepNodeId ||
+		replication_origin_id == DoNotReplicateRepNodeId)
+		replication_origin_timestamp = xactStopTimestamp;
+	else
+		AdvanceCachedReplicationIdentifier(replication_origin_lsn, XactLastRecEnd);
+
 	/*
 	 * We only need to log the commit timestamp separately if the node
 	 * identifier is a valid value; the commit record above already contains
@@ -1146,7 +1169,7 @@ RecordTransactionCommit(void)
 
 		node_id = CommitTsGetDefaultNodeId();
 		TransactionTreeSetCommitTsData(xid, nchildren, children,
-									   xactStopTimestamp,
+									   replication_origin_timestamp,
 									   node_id, node_id != InvalidCommitTsNodeId);
 	}
 
@@ -1230,9 +1253,11 @@ RecordTransactionCommit(void)
 	if (wrote_xlog)
 		SyncRepWaitForLSN(XactLastRecEnd);
 
+	/* remember end of last commit record */
+	XactLastCommitEnd = XactLastRecEnd;
+
 	/* Reset XactLastRecEnd until the next transaction writes something */
 	XactLastRecEnd = 0;
-
 cleanup:
 	/* Clean up local data */
 	if (rels)
@@ -4665,10 +4690,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 						  SharedInvalidationMessage *inval_msgs, int nmsgs,
 						  RelFileNode *xnodes, int nrels,
 						  Oid dbId, Oid tsId,
-						  uint32 xinfo)
+						  uint32 xinfo,
+						  xl_xact_origin *origin)
 {
 	TransactionId max_xid;
 	int			i;
+	RepNodeId	origin_node_id = InvalidRepNodeId;
 
 	max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
 
@@ -4688,9 +4715,18 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 		LWLockRelease(XidGenLock);
 	}
 
+	Assert(!!(xinfo & XACT_CONTAINS_ORIGIN) == (origin != NULL));
+
+	if (xinfo & XACT_CONTAINS_ORIGIN)
+	{
+		origin_node_id = origin->origin_node_id;
+		commit_time = origin->origin_timestamp;
+	}
+
 	/* Set the transaction commit timestamp and metadata */
 	TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
-								   commit_time, InvalidCommitTsNodeId, false);
+								   commit_time, origin_node_id, false);
+
 
 	if (standbyState == STANDBY_DISABLED)
 	{
@@ -4747,6 +4783,14 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 		StandbyReleaseLockTree(xid, 0, NULL);
 	}
 
+	if (xinfo & XACT_CONTAINS_ORIGIN)
+	{
+		/* recover apply progress */
+		AdvanceReplicationIdentifier(origin_node_id,
+									 origin->origin_lsn,
+									 lsn);
+	}
+
 	/* Make sure files supposed to be dropped are dropped */
 	if (nrels > 0)
 	{
@@ -4805,19 +4849,24 @@ xact_redo_commit(xl_xact_commit *xlrec,
 {
 	TransactionId *subxacts;
 	SharedInvalidationMessage *inval_msgs;
+	xl_xact_origin *origin = NULL;
 
 	/* subxid array follows relfilenodes */
 	subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
 	/* invalidation messages array follows subxids */
 	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
+	if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+		origin = (xl_xact_origin *) &(inval_msgs[xlrec->nmsgs]);
+
 	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
 							  subxacts, xlrec->nsubxacts,
 							  inval_msgs, xlrec->nmsgs,
 							  xlrec->xnodes, xlrec->nrels,
 							  xlrec->dbId,
 							  xlrec->tsId,
-							  xlrec->xinfo);
+							  xlrec->xinfo,
+							  origin);
 }
 
 /*
@@ -4833,7 +4882,8 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
 							  NULL, 0,	/* relfilenodes */
 							  InvalidOid,		/* dbId */
 							  InvalidOid,		/* tsId */
-							  0);		/* xinfo */
+							  0,		/* xinfo */
+							  NULL		/* origin */);
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 629a457..5eb0ef5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -44,6 +44,7 @@
 #include "postmaster/startup.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
+#include "replication/replication_identifier.h"
 #include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -297,6 +298,8 @@ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
 
 XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
 
+XLogRecPtr	XactLastCommitEnd = InvalidXLogRecPtr;
+
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
  * (which is almost but not quite the same as a pointer to the most recent
@@ -6014,6 +6017,11 @@ StartupXLOG(void)
 	StartupMultiXact();
 
 	/*
+	 * Recover knowledge about replay progress of known replication partners.
+	 */
+	StartupReplicationIdentifier(checkPoint.redo);
+
+	/*
 	 * Initialize unlogged LSN. On a clean shutdown, it's restored from the
 	 * control file. On recovery, all unlogged relations are blown away, so
 	 * the unlogged LSN counter can be reset too.
@@ -7645,6 +7653,7 @@ CreateCheckPoint(int flags)
 	XLogRecPtr	recptr;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	uint32		freespace;
+	XLogRecPtr	oldRedoPtr;
 	XLogSegNo	_logSegNo;
 	XLogRecPtr	curInsert;
 	VirtualTransactionId *vxids;
@@ -7960,10 +7969,10 @@ CreateCheckPoint(int flags)
 				(errmsg("concurrent transaction log activity while database system is shutting down")));
 
 	/*
-	 * Select point at which we can truncate the log, which we base on the
-	 * prior checkpoint's earliest info.
+	 * Select point at which we can truncate the log (and other resources
+	 * related to it), which we base on the prior checkpoint's earliest info.
 	 */
-	XLByteToSeg(ControlFile->checkPointCopy.redo, _logSegNo);
+	oldRedoPtr = ControlFile->checkPointCopy.redo;
 
 	/*
 	 * Update the control file.
@@ -8018,6 +8027,7 @@ CreateCheckPoint(int flags)
 	 * Delete old log files (those no longer needed even for previous
 	 * checkpoint or the standbys in XLOG streaming).
 	 */
+	XLByteToSeg(oldRedoPtr, _logSegNo);
 	if (_logSegNo)
 	{
 		KeepLogSeg(recptr, &_logSegNo);
@@ -8047,6 +8057,13 @@ CreateCheckPoint(int flags)
 	 */
 	TruncateMultiXact();
 
+	/*
+	 * Remove old replication identifier checkpoints. We're using the previous
+	 * checkpoint's redo ptr as a cutoff - even if we were to use that
+	 * checkpoint to startup we're not going to need anything older.
+	 */
+	TruncateReplicationIdentifier(oldRedoPtr);
+
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
 
@@ -8130,6 +8147,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointBuffers(flags);	/* performs all required fsyncs */
+	CheckPointReplicationIdentifier(checkPointRedo);
 	/* We deliberately delay 2PC checkpointing as long as possible */
 	CheckPointTwoPhase(checkPointRedo);
 }
@@ -8190,6 +8208,7 @@ CreateRestartPoint(int flags)
 {
 	XLogRecPtr	lastCheckPointRecPtr;
 	CheckPoint	lastCheckPoint;
+	XLogRecPtr	oldRedoPtr;
 	XLogSegNo	_logSegNo;
 	TimestampTz xtime;
 
@@ -8289,7 +8308,7 @@ CreateRestartPoint(int flags)
 	 * Select point at which we can truncate the xlog, which we base on the
 	 * prior checkpoint's earliest info.
 	 */
-	XLByteToSeg(ControlFile->checkPointCopy.redo, _logSegNo);
+	oldRedoPtr = ControlFile->checkPointCopy.redo;
 
 	/*
 	 * Update pg_control, using current time.  Check that it still shows
@@ -8316,6 +8335,7 @@ CreateRestartPoint(int flags)
 	 * checkpoint/restartpoint) to prevent the disk holding the xlog from
 	 * growing full.
 	 */
+	XLByteToSeg(oldRedoPtr, _logSegNo);
 	if (_logSegNo)
 	{
 		XLogRecPtr	receivePtr;
@@ -8385,6 +8405,12 @@ CreateRestartPoint(int flags)
 	TruncateMultiXact();
 
 	/*
+	 * Also truncate replication identifiers. c.f. CreateCheckPoint()'s
+	 * comment.
+	 */
+	TruncateReplicationIdentifier(oldRedoPtr);
+
+	/*
 	 * Truncate pg_subtrans if possible.  We can throw away all data before
 	 * the oldest XMIN of any running transaction.  No future transaction will
 	 * attempt to reference any pg_subtrans entry older than that (see Asserts
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index a1e2eb8..a91298b 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -25,6 +25,7 @@
 #include "access/xloginsert.h"
 #include "catalog/pg_control.h"
 #include "miscadmin.h"
+#include "replication/replication_identifier.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
@@ -76,10 +77,16 @@ static uint32 mainrdata_len;	/* total # of bytes in chain */
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+#ifdef REPLICATION_IDENTIFIER_REUSE_PADDING
+#define SizeOfXlogOrigin	0	/* origin goes into XLogRecord.xl_origin_id */
+#else
+#define SizeOfXlogOrigin	(sizeof(uint8) + sizeof(RepNodeId))	/* 1-byte block id + origin id */
+#endif
+
 #define HEADER_SCRATCH_SIZE \
 	(SizeOfXLogRecord + \
 	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-	 SizeOfXLogRecordDataHeaderLong)
+	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -629,6 +636,16 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		scratch += sizeof(BlockNumber);
 	}
 
+#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING
+	/* followed by the record's origin, if any */
+	if (replication_origin_id != InvalidRepNodeId)
+	{
+		*(scratch++) = XLR_BLOCK_ID_ORIGIN;
+		memcpy(scratch, &replication_origin_id, sizeof(replication_origin_id));
+		scratch += sizeof(replication_origin_id);
+	}
+#endif
+
 	/* followed by main data, if any */
 	if (mainrdata_len > 0)
 	{
@@ -674,6 +691,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_tot_len = total_len;
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
+#ifdef REPLICATION_IDENTIFIER_REUSE_PADDING
+	rechdr->xl_origin_id = replication_origin_id;
+#endif
 	rechdr->xl_prev = InvalidXLogRecPtr;
 	rechdr->xl_crc = rdata_crc;
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 60470b5..f8233a0 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -20,6 +20,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
+#include "replication/replication_identifier.h"
 
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 
@@ -956,6 +957,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 	ResetDecoder(state);
 
 	state->decoded_record = record;
+#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING
+	state->record_origin = InvalidRepNodeId;
+#endif
 
 	ptr = (char *) record;
 	ptr += SizeOfXLogRecord;
@@ -990,6 +994,12 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 			break;				/* by convention, the main data fragment is
 								 * always last */
 		}
+#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING
+		else if (block_id == XLR_BLOCK_ID_ORIGIN)
+		{
+			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepNodeId));
+		}
+#endif
 		else if (block_id <= XLR_MAX_BLOCK_ID)
 		{
 			/* XLogRecordBlockHeader */
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index bc66eac..e2de408 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -705,10 +705,13 @@ DefineAttr(char *name, char *type, int attnum)
 	 * oidvector and int2vector are also treated as not-nullable, even though
 	 * they are no longer fixed-width.
 	 */
+	/* FIXME: hack - marks ANY bootstrap column named "riname" NOT NULL; needs a general mechanism */
 #define MARKNOTNULL(att) \
 	((att)->attlen > 0 || \
 	 (att)->atttypid == OIDVECTOROID || \
-	 (att)->atttypid == INT2VECTOROID)
+	 (att)->atttypid == INT2VECTOROID || \
+	 strcmp(NameStr((att)->attname), "riname") == 0 \
+		)
 
 	if (MARKNOTNULL(attrtypes[attnum]))
 	{
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index a403c64..5b04550 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
 	pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
 	pg_ts_parser.h pg_ts_template.h pg_extension.h \
 	pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
-	pg_foreign_table.h pg_policy.h \
+	pg_foreign_table.h pg_policy.h pg_replication_identifier.h \
 	pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
 	toasting.h indexing.h \
     )
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 8e7a9ec..318d65a 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_pltemplate.h"
 #include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_identifier.h"
 #include "catalog/pg_shdepend.h"
 #include "catalog/pg_shdescription.h"
 #include "catalog/pg_shseclabel.h"
@@ -224,7 +225,8 @@ IsSharedRelation(Oid relationId)
 		relationId == SharedDependRelationId ||
 		relationId == SharedSecLabelRelationId ||
 		relationId == TableSpaceRelationId ||
-		relationId == DbRoleSettingRelationId)
+		relationId == DbRoleSettingRelationId ||
+		relationId == ReplicationIdentifierRelationId)
 		return true;
 	/* These are their indexes (see indexing.h) */
 	if (relationId == AuthIdRolnameIndexId ||
@@ -240,7 +242,9 @@ IsSharedRelation(Oid relationId)
 		relationId == SharedSecLabelObjectIndexId ||
 		relationId == TablespaceOidIndexId ||
 		relationId == TablespaceNameIndexId ||
-		relationId == DbRoleSettingDatidRolidIndexId)
+		relationId == DbRoleSettingDatidRolidIndexId ||
+		relationId ==  ReplicationLocalIdentIndex ||
+		relationId ==  ReplicationExternalIdentIndex)
 		return true;
 	/* These are their toast tables and toast indexes (see toasting.h) */
 	if (relationId == PgShdescriptionToastTable ||
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 5e69e2b..4559f99 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -769,6 +769,13 @@ CREATE VIEW pg_user_mappings AS
 
 REVOKE ALL on pg_user_mapping FROM public;
 
+-- Expose pg_get_replication_identifier_progress() as a view; PUBLIC access is revoked below.
+CREATE VIEW pg_replication_identifier_progress AS
+    SELECT *
+    FROM pg_get_replication_identifier_progress();
+
+REVOKE ALL ON pg_replication_identifier_progress FROM public;
+
 --
 -- We have a few function definitions in here, too.
 -- At some point there might be enough to justify breaking them out into
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 310a45c..95bcffb 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \
+	snapbuild.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77c02ba..f8f7016 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -40,6 +40,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/replication_identifier.h"
 #include "replication/snapbuild.h"
 
 #include "storage/standby.h"
@@ -67,7 +68,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 TransactionId xid, Oid dboid,
 			 TimestampTz commit_time,
 			 int nsubxacts, TransactionId *sub_xids,
-			 int ninval_msgs, SharedInvalidationMessage *msg);
+			 int ninval_msgs, SharedInvalidationMessage *msg,
+			 xl_xact_origin *origin);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
 			TransactionId xid, TransactionId *sub_xids, int nsubxacts);
 
@@ -201,16 +203,20 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				xl_xact_commit *xlrec;
 				TransactionId *subxacts = NULL;
 				SharedInvalidationMessage *invals = NULL;
+				xl_xact_origin *origin = NULL;
 
 				xlrec = (xl_xact_commit *) XLogRecGetData(r);
 
 				subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
 				invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
+				if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+					origin = (xl_xact_origin *) &(invals[xlrec->nmsgs]);
+
 				DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
 							 xlrec->xact_time,
 							 xlrec->nsubxacts, subxacts,
-							 xlrec->nmsgs, invals);
+							 xlrec->nmsgs, invals, origin);
 
 				break;
 			}
@@ -220,6 +226,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				xl_xact_commit *xlrec;
 				TransactionId *subxacts;
 				SharedInvalidationMessage *invals = NULL;
+				xl_xact_origin *origin = NULL;
 
 				/* Prepared commits contain a normal commit record... */
 				prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
@@ -228,10 +235,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
 				invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
+				if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+					origin = (xl_xact_origin *) &(invals[xlrec->nmsgs]);
+
 				DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
 							 xlrec->xact_time,
 							 xlrec->nsubxacts, subxacts,
-							 xlrec->nmsgs, invals);
+							 xlrec->nmsgs, invals, origin);
 
 				break;
 			}
@@ -244,7 +254,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
 							 xlrec->xact_time,
 							 xlrec->nsubxacts, xlrec->subxacts,
-							 0, NULL);
+							 0, NULL, NULL);
 				break;
 			}
 		case XLOG_XACT_ABORT:
@@ -480,10 +490,19 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 TransactionId xid, Oid dboid,
 			 TimestampTz commit_time,
 			 int nsubxacts, TransactionId *sub_xids,
-			 int ninval_msgs, SharedInvalidationMessage *msgs)
+			 int ninval_msgs, SharedInvalidationMessage *msgs,
+			 xl_xact_origin *origin)
 {
+	RepNodeId	origin_id = InvalidRepNodeId;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
 	int			i;
 
+	if (origin != NULL)
+	{
+		origin_id = origin->origin_node_id;
+		origin_lsn = origin->origin_lsn;
+	}
+
 	/*
 	 * Process invalidation messages, even if we're not interested in the
 	 * transaction's contents, since the various caches need to always be
@@ -504,12 +523,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * the reorderbuffer to forget the content of the (sub-)transactions
 	 * if not.
 	 *
-	 * There basically two reasons we might not be interested in this
+	 * There can be several reasons we might not be interested in this
 	 * transaction:
 	 * 1) We might not be interested in decoding transactions up to this
 	 *	  LSN. This can happen because we previously decoded it and now just
 	 *	  are restarting or if we haven't assembled a consistent snapshot yet.
 	 * 2) The transaction happened in another database.
+	 * 3) The output plugin is not interested in the origin.
 	 *
 	 * We can't just use ReorderBufferAbort() here, because we need to execute
 	 * the transaction's invalidations.  This currently won't be needed if
@@ -524,7 +544,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * ---
 	 */
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
-		(dboid != InvalidOid && dboid != ctx->slot->data.database))
+		(dboid != InvalidOid && dboid != ctx->slot->data.database) ||
+		(ctx->callbacks.filter_by_origin_cb &&
+		 ctx->callbacks.filter_by_origin_cb(ctx, origin_id)))
 	{
 		for (i = 0; i < nsubxacts; i++)
 		{
@@ -546,7 +568,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	/* replay actions of all transaction + subtransactions in order */
 	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time);
+						commit_time, origin_id, origin_lsn);
 }
 
 /*
@@ -590,8 +612,14 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (ctx->callbacks.filter_by_origin_cb &&
+		ctx->callbacks.filter_by_origin_cb(ctx, XLogRecGetOrigin(r)))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
 	if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -632,8 +660,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (ctx->callbacks.filter_by_origin_cb &&
+		ctx->callbacks.filter_by_origin_cb(ctx, XLogRecGetOrigin(r)))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+	change->origin_id = XLogRecGetOrigin(r);
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
 	if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -681,8 +715,14 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_node.dbNode != ctx->slot->data.database)
 		return;
 
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (ctx->callbacks.filter_by_origin_cb &&
+		ctx->callbacks.filter_by_origin_cb(ctx, XLogRecGetOrigin(r)))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
 
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
@@ -726,6 +766,11 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (rnode.dbNode != ctx->slot->data.database)
 		return;
 
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (ctx->callbacks.filter_by_origin_cb &&
+		ctx->callbacks.filter_by_origin_cb(ctx, XLogRecGetOrigin(r)))
+		return;
+
 	tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
 
 	data = tupledata;
@@ -738,6 +783,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 		change = ReorderBufferGetChange(ctx->reorder);
 		change->action = REORDER_BUFFER_CHANGE_INSERT;
+		change->origin_id = XLogRecGetOrigin(r);
+
 		memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
 
 		/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 30baa45..638a663 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -39,6 +39,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/replication_identifier.h"
 #include "replication/snapbuild.h"
 
 #include "storage/proc.h"
@@ -46,6 +47,10 @@
 
 #include "utils/memutils.h"
 
+/* replication origin this session currently replays for, if any */
+RepNodeId	replication_origin_id = InvalidRepNodeId; /* assumed identity */
+/* origin LSN/timestamp set via pg_replication_identifier_setup_tx_origin() */
+XLogRecPtr	replication_origin_lsn = InvalidXLogRecPtr;
+TimestampTz	replication_origin_timestamp = 0;
+
 /* data for errcontext callback */
 typedef struct LogicalErrorCallbackState
 {
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index bcd5896..30086c9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1255,7 +1255,8 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
 void
 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time)
+					TimestampTz commit_time,
+					RepNodeId origin_id, XLogRecPtr origin_lsn)
 {
 	ReorderBufferTXN *txn;
 	volatile Snapshot snapshot_now;
@@ -1273,6 +1274,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	txn->final_lsn = commit_lsn;
 	txn->end_lsn = end_lsn;
 	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
 
 	/* serialize the last bunch of changes if we need start earlier anyway */
 	if (txn->nentries_mem != txn->nentries)
diff --git a/src/backend/replication/logical/replication_identifier.c b/src/backend/replication/logical/replication_identifier.c
new file mode 100644
index 0000000..1364cea
--- /dev/null
+++ b/src/backend/replication/logical/replication_identifier.c
@@ -0,0 +1,1190 @@
+/*-------------------------------------------------------------------------
+ *
+ * replication_identifier.c
+ *	  Logical Replication Node Identifier and replication progress persistency
+ *	  support.
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/replication_identifier.c
+ *
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/replication_identifier.h"
+#include "replication/logical.h"
+
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/copydir.h"
+
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/pg_lsn.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
+
+/*
+ * Replay progress of a single remote node, kept in the shared memory array
+ * ReplicationStates. An entry whose local_identifier is InvalidRepNodeId is
+ * unused.
+ */
+typedef struct ReplicationState
+{
+	/*
+	 * Local identifier for the remote node, i.e. the riident of its
+	 * pg_replication_identifier row.
+	 */
+	RepNodeId	local_identifier;
+
+	/*
+	 * Location of the latest commit from the remote side.
+	 */
+	XLogRecPtr	remote_lsn;
+
+	/*
+	 * Remember the local lsn of the commit record so we can XLogFlush() to it
+	 * during a checkpoint so we know the commit record actually is safe on
+	 * disk.
+	 */
+	XLogRecPtr	local_lsn;
+
+	/*
+	 * PID of the backend that currently has this state set up for replay, or
+	 * 0 if none; DropReplicationIdentifier() refuses to drop such a state.
+	 */
+	pid_t		acquired_by;
+} ReplicationState;
+
+/*
+ * On disk version of ReplicationState. Checkpoints write this struct
+ * verbatim (sizeof() bytes, including any alignment padding), so its layout
+ * is the checkpoint file format: changing it breaks reading old files.
+ */
+typedef struct ReplicationStateOnDisk
+{
+	RepNodeId	local_identifier;	/* copy of ReplicationState.local_identifier */
+	XLogRecPtr	remote_lsn;		/* copy of ReplicationState.remote_lsn */
+} ReplicationStateOnDisk;
+
+
+/*
+ * Base address into a shared memory array of replication states of size
+ * max_replication_slots. Stays NULL when max_replication_slots is 0, because
+ * ReplicationIdentifierShmemInit() then allocates nothing.
+ *
+ * XXX: Should we use a separate variable to size this rather than
+ * max_replication_slots?
+ */
+static ReplicationState *ReplicationStates;
+
+/*
+ * Backend-local, cached element from ReplicationStates for use in a backend
+ * replaying remote commits, so we don't have to search ReplicationStates for
+ * the backend's current RepNodeId. NULL while no origin is set up.
+ */
+static ReplicationState *local_replication_state = NULL;
+
+/* Magic number at the start of pg_logical/checkpoints/*.ckpt files. */
+#define REPLICATION_STATE_MAGIC (uint32)0x1257DADE
+
+/*
+ * Exclusive upper bound for numeric replication identifiers handed out by
+ * CreateReplicationIdentifier(), which must fit into 16 bits.
+ *
+ * XXX: move to c.h?
+ */
+#ifndef UINT16_MAX
+#define UINT16_MAX ((1<<16) - 1)
+#else
+#if UINT16_MAX != ((1<<16) - 1)
+#error "uh, wrong UINT16_MAX?"
+#endif
+#endif
+
+/*
+ * Look up a persistent replication identifier by its external name.
+ *
+ * Returns the local numeric identifier. If no identifier with that name
+ * exists, returns InvalidOid when missing_ok is true and raises an ERROR
+ * otherwise.
+ */
+RepNodeId
+GetReplicationIdentifier(char *riname, bool missing_ok)
+{
+	Form_pg_replication_identifier ident;
+	Oid			riident = InvalidOid;
+	HeapTuple	tuple;
+	Datum		riname_d;
+
+	/* the syscache is keyed on the text form of the name */
+	riname_d = CStringGetTextDatum(riname);
+
+	tuple = SearchSysCache1(REPLIDREMOTE, riname_d);
+	if (HeapTupleIsValid(tuple))
+	{
+		ident = (Form_pg_replication_identifier) GETSTRUCT(tuple);
+		riident = ident->riident;
+		ReleaseSysCache(tuple);
+	}
+	else if (!missing_ok)
+	{
+		/*
+		 * The name typically comes straight from the SQL-callable functions,
+		 * so report an undefined-object error rather than an internal one.
+		 */
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("cache lookup failed for replication identifier named %s",
+						riname)));
+	}
+
+	/* the key is no longer referenced once the lookup is done */
+	pfree(DatumGetPointer(riname_d));
+
+	return riident;
+}
+
+/*
+ * Create a persistent replication identifier named riname and return the
+ * numeric id assigned to it.
+ *
+ * Needs to be called in a transaction.
+ */
+RepNodeId
+CreateReplicationIdentifier(char *riname)
+{
+	Oid			candidate;
+	bool		have_free_id = false;
+	Relation	rel;
+	Datum		riname_d;
+	SnapshotData SnapshotDirty;
+
+	riname_d = CStringGetTextDatum(riname);
+
+	Assert(IsTransactionState());
+
+	/*
+	 * Numeric replication identifiers have to fit into 16 bits, so the normal
+	 * oid allocation can't be used. Instead, probe pg_replication_identifier
+	 * for the lowest unused id. That's not fast, but creating identifiers
+	 * should be rare; more code can be spent on it if it ever matters.
+	 *
+	 * Concurrency is handled by holding an ExclusiveLock (which still allows
+	 * reads) on the table during the search, and by scanning with a dirty
+	 * snapshot, which also returns rows written by still in-progress
+	 * sessions. Thanks to the lock, no new rows can appear while we look.
+	 */
+	InitDirtySnapshot(SnapshotDirty);
+
+	rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+	for (candidate = InvalidOid + 1; candidate < UINT16_MAX; candidate++)
+	{
+		ScanKeyData key;
+		SysScanDesc scan;
+		bool		in_use;
+
+		CHECK_FOR_INTERRUPTS();
+
+		ScanKeyInit(&key,
+					Anum_pg_replication_riident,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(candidate));
+
+		scan = systable_beginscan(rel, ReplicationLocalIdentIndex,
+								  true /* indexOK */,
+								  &SnapshotDirty,
+								  1, &key);
+		in_use = HeapTupleIsValid(systable_getnext(scan));
+		systable_endscan(scan);
+
+		if (!in_use)
+		{
+			have_free_id = true;
+			break;
+		}
+	}
+
+	if (have_free_id)
+	{
+		Datum		values[Natts_pg_replication_identifier];
+		bool		nulls[Natts_pg_replication_identifier];
+		HeapTuple	newtup;
+
+		memset(nulls, false, sizeof(nulls));
+		values[Anum_pg_replication_riident - 1] = ObjectIdGetDatum(candidate);
+		values[Anum_pg_replication_riname - 1] = riname_d;
+
+		/* insert the row, then CCI so our callers can look it up */
+		newtup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+		simple_heap_insert(rel, newtup);
+		CatalogUpdateIndexes(rel, newtup);
+		CommandCounterIncrement();
+		heap_freetuple(newtup);
+	}
+
+	/* release the lock again */
+	heap_close(rel, ExclusiveLock);
+
+	if (!have_free_id)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("no free replication id could be found")));
+
+	return candidate;
+}
+
+
+/*
+ * Drop a persistent replication identifier.
+ *
+ * Clears the identifier's entry in the shared replay state and deletes its
+ * pg_replication_identifier row. Raises an ERROR if some backend currently
+ * has the identifier's replay state set up.
+ *
+ * Needs to be called in a transaction.
+ */
+void
+DropReplicationIdentifier(RepNodeId riident)
+{
+	HeapTuple	tuple = NULL;
+	Relation	rel;
+	SnapshotData SnapshotDirty;
+	SysScanDesc scan;
+	ScanKeyData key;
+	int			i;
+
+	Assert(IsTransactionState());
+
+	InitDirtySnapshot(SnapshotDirty);
+
+	/* conflicts with the RowExclusiveLock taken by progress/advance callers */
+	rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+	/*
+	 * Clean up the slot state info.
+	 *
+	 * NOTE(review): this runs before the catalog deletion commits and without
+	 * any lock on ReplicationStates, so an abort after this point still loses
+	 * the identifier's in-memory progress.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationState *state = &ReplicationStates[i];
+
+		if (state->local_identifier != riident)
+			continue;
+
+		/* found our slot; refuse if a backend is replaying with it */
+		if (state->acquired_by != 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("cannot drop slot that is setup in backend %d",
+							state->acquired_by)));
+
+		memset(state, 0, sizeof(ReplicationState));
+		break;
+	}
+
+	ScanKeyInit(&key,
+				Anum_pg_replication_riident,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(riident));
+
+	scan = systable_beginscan(rel, ReplicationLocalIdentIndex,
+							  true /* indexOK */,
+							  &SnapshotDirty,
+							  1, &key);
+
+	tuple = systable_getnext(scan);
+
+	/* tolerate the row being absent; there is then nothing to delete */
+	if (HeapTupleIsValid(tuple))
+		simple_heap_delete(rel, &tuple->t_self);
+
+	systable_endscan(scan);
+
+	CommandCounterIncrement();
+
+	/* now release lock again */
+	heap_close(rel, ExclusiveLock);
+}
+
+
+/*
+ * Look up the external name of the replication identifier riident.
+ *
+ * On success *riname is set to a palloc'd copy of the name. If there is no
+ * such identifier, *riname is set to NULL when missing_ok is true; otherwise
+ * an ERROR is raised.
+ */
+void
+GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok, char **riname)
+{
+	HeapTuple	tuple;
+	Form_pg_replication_identifier ric;
+
+	Assert(OidIsValid((Oid) riident));
+	Assert(riident != InvalidRepNodeId);
+	Assert(riident != DoNotReplicateRepNodeId);
+
+	/* callers test the result for NULL, so never leave it uninitialized */
+	*riname = NULL;
+
+	tuple = SearchSysCache1(REPLIDIDENT,
+							ObjectIdGetDatum((Oid) riident));
+
+	if (!HeapTupleIsValid(tuple))
+	{
+		if (!missing_ok)
+			elog(ERROR, "cache lookup failed for replication identifier id: %u",
+				 riident);
+		return;
+	}
+
+	ric = (Form_pg_replication_identifier) GETSTRUCT(tuple);
+	/* text_to_cstring() already returns a palloc'd copy */
+	*riname = text_to_cstring(&ric->riname);
+
+	ReleaseSysCache(tuple);
+}
+
+/*
+ * Raise an ERROR unless the current user is a superuser and, when
+ * check_slots is set, max_replication_slots is nonzero (the shared replay
+ * state is sized by it).
+ */
+static void
+CheckReplicationIdentifierPrerequisites(bool check_slots)
+{
+	if (!superuser())
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("only superusers can query or manipulate replication identifiers")));
+	}
+
+	if (!check_slots)
+		return;
+
+	if (max_replication_slots == 0)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot query or manipulate replication identifiers when max_replication_slots = 0")));
+	}
+}
+
+/*
+ * SQL-callable: return the numeric id of the replication identifier with the
+ * given external name, or NULL if there is none.
+ */
+Datum
+pg_replication_identifier_get(PG_FUNCTION_ARGS)
+{
+	char	   *external_name;
+	RepNodeId	node_id;
+
+	CheckReplicationIdentifierPrerequisites(false);
+
+	external_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	node_id = GetReplicationIdentifier(external_name, true);
+	pfree(external_name);
+
+	if (!OidIsValid(node_id))
+		PG_RETURN_NULL();
+
+	PG_RETURN_OID(node_id);
+}
+
+
+/*
+ * SQL-callable: create a replication identifier with the given external name
+ * and return its numeric id.
+ */
+Datum
+pg_replication_identifier_create(PG_FUNCTION_ARGS)
+{
+	char	   *external_name;
+	RepNodeId	node_id;
+
+	CheckReplicationIdentifierPrerequisites(false);
+
+	external_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	node_id = CreateReplicationIdentifier(external_name);
+	pfree(external_name);
+
+	PG_RETURN_OID(node_id);
+}
+
+/*
+ * SQL-callable: make this session replay on behalf of the replication
+ * identifier with the given external name (which must exist).
+ */
+Datum
+pg_replication_identifier_setup_replaying_from(PG_FUNCTION_ARGS)
+{
+	char	   *external_name;
+	RepNodeId	origin_id;
+
+	CheckReplicationIdentifierPrerequisites(true);
+
+	external_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	origin_id = GetReplicationIdentifier(external_name, false);
+
+	/* set up the cached replay state first, then record the session origin */
+	SetupCachedReplicationIdentifier(origin_id);
+	replication_origin_id = origin_id;
+
+	pfree(external_name);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable: report whether this session has a replication origin set up.
+ */
+Datum
+pg_replication_identifier_is_replaying(PG_FUNCTION_ARGS)
+{
+	bool		replaying;
+
+	CheckReplicationIdentifierPrerequisites(true);
+
+	replaying = (replication_origin_id != InvalidRepNodeId);
+	PG_RETURN_BOOL(replaying);
+}
+
+/*
+ * SQL-callable: stop replaying on behalf of a replication identifier in this
+ * session.
+ */
+Datum
+pg_replication_identifier_reset_replaying_from(PG_FUNCTION_ARGS)
+{
+	CheckReplicationIdentifierPrerequisites(true);
+
+	/* tear down the cached replay state, then forget the session origin */
+	TeardownCachedReplicationIdentifier();
+	replication_origin_id = InvalidRepNodeId;
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * SQL-callable: set this session's replication_origin_lsn and
+ * replication_origin_timestamp from the (lsn, timestamptz) arguments.
+ *
+ * Requires a cached replication state, presumably established by
+ * pg_replication_identifier_setup_replaying_from() earlier in the session.
+ */
+Datum
+pg_replication_identifier_setup_tx_origin(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	location = PG_GETARG_LSN(0);
+
+	CheckReplicationIdentifierPrerequisites(true);
+
+	/* a missing prior setup call is a state error, not an unsupported feature */
+	if (local_replication_state == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("need to setup the origin id first")));
+
+	replication_origin_lsn = location;
+	replication_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable set-returning function: one row per in-use replication state,
+ * containing the numeric id, external name, remote LSN and local LSN.
+ */
+Datum
+pg_get_replication_identifier_progress(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	int			i;
+#define REPLICATION_IDENTIFIER_PROGRESS_COLS 4
+
+	/* we want to return 0 rows if slot is set to zero */
+	CheckReplicationIdentifierPrerequisites(false);
+
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	if (tupdesc->natts != REPLICATION_IDENTIFIER_PROGRESS_COLS)
+		elog(ERROR, "wrong function definition");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/* prevent slots from being concurrently dropped */
+	LockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock);
+
+	/*
+	 * Iterate through all possible ReplicationStates, display if they are
+	 * filled. Note that no lock protects the shared states themselves, so
+	 * slightly corrupted/out of date values are a possibility.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationState *state;
+		Datum		values[REPLICATION_IDENTIFIER_PROGRESS_COLS];
+		bool		nulls[REPLICATION_IDENTIFIER_PROGRESS_COLS];
+		char	   *riname = NULL;	/* stays NULL if the lookup misses */
+
+		state = &ReplicationStates[i];
+
+		/* unused slot, nothing to display */
+		if (state->local_identifier == InvalidRepNodeId)
+			continue;
+
+		GetReplicationInfoByIdentifier(state->local_identifier, true, &riname);
+
+		/*
+		 * The state may refer to an identifier whose catalog row no longer
+		 * exists; silently skip it.
+		 */
+		if (riname == NULL)
+			continue;
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		values[0] = ObjectIdGetDatum(state->local_identifier);
+		values[1] = CStringGetTextDatum(riname);
+		values[2] = LSNGetDatum(state->remote_lsn);
+		values[3] = LSNGetDatum(state->local_lsn);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	tuplestore_donestoring(tupstore);
+
+	UnlockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock);
+
+#undef REPLICATION_IDENTIFIER_PROGRESS_COLS
+
+	return (Datum) 0;
+}
+
+/*
+ * SQL-callable: set the replay progress of the named replication identifier
+ * to the given remote and local commit LSNs.
+ */
+Datum
+pg_replication_identifier_advance(PG_FUNCTION_ARGS)
+{
+	text	   *name = PG_GETARG_TEXT_P(0);
+	XLogRecPtr	remote_commit = PG_GETARG_LSN(1);
+	XLogRecPtr	local_commit = PG_GETARG_LSN(2);
+	char	   *external_name;
+	RepNodeId	node_id;
+
+	CheckReplicationIdentifierPrerequisites(true);
+
+	/*
+	 * This lock conflicts with DropReplicationIdentifier()'s ExclusiveLock,
+	 * so the identifier can't vanish while we advance it.
+	 */
+	LockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock);
+
+	external_name = text_to_cstring(name);
+	node_id = GetReplicationIdentifier(external_name, false);
+	AdvanceReplicationIdentifier(node_id, remote_commit, local_commit);
+
+	UnlockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable: drop the replication identifier with the given external
+ * name; errors out if there is no such identifier.
+ */
+Datum
+pg_replication_identifier_drop(PG_FUNCTION_ARGS)
+{
+	char	   *external_name;
+	RepNodeId	node_id;
+
+	CheckReplicationIdentifierPrerequisites(false);
+
+	external_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+
+	/* with missing_ok = false, a missing name has already raised an ERROR */
+	node_id = GetReplicationIdentifier(external_name, false);
+	Assert(OidIsValid(node_id));
+
+	DropReplicationIdentifier(node_id);
+
+	pfree(external_name);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Shared memory needed for the ReplicationStates array: one entry per
+ * replication slot, nothing at all when max_replication_slots is 0.
+ */
+Size
+ReplicationIdentifierShmemSize(void)
+{
+	/*
+	 * FIXME: max_replication_slots is the wrong thing to use here, here we keep
+	 * the replay state of *remote* transactions.
+	 */
+	if (max_replication_slots == 0)
+		return 0;
+
+	return mul_size(max_replication_slots, sizeof(ReplicationState));
+}
+
+/*
+ * Attach to (creating and zeroing on first use) the shared ReplicationStates
+ * array; does nothing when max_replication_slots is 0.
+ */
+void
+ReplicationIdentifierShmemInit(void)
+{
+	Size		size;
+	bool		found;
+
+	if (max_replication_slots == 0)
+		return;
+
+	size = ReplicationIdentifierShmemSize();
+	ReplicationStates = (ReplicationState *)
+		ShmemInitStruct("ReplicationIdentifierState", size, &found);
+
+	/* freshly created: start with all states unused */
+	if (!found)
+		MemSet(ReplicationStates, 0, size);
+}
+
+/*
+ * Write len bytes from data to the replication identifier checkpoint file
+ * fd. On failure or a short write, close fd and PANIC naming tmppath.
+ */
+static void
+WriteReplicationCheckpointChunk(int fd, const char *tmppath,
+								const void *data, Size len)
+{
+	errno = 0;
+	if (write(fd, data, len) != (ssize_t) len)
+	{
+		int			save_errno = errno;
+
+		/* closing the file must not clobber the errno reported below */
+		CloseTransientFile(fd);
+		/* a short write that didn't set errno presumably ran out of space */
+		errno = save_errno ? save_errno : ENOSPC;
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not write replication identifier checkpoint \"%s\": %m",
+						tmppath)));
+	}
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of replication identifier's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+------------------------+------------------+-----+--------+
+ * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
+ * +-------+------------------------+------------------+-----+--------+
+ *
+ * So it's just the magic, followed by one statically sized
+ * ReplicationStateOnDisk struct per in-use state, followed by a CRC32C over
+ * everything before it. Note that the maximum number of ReplicationStates is
+ * determined by max_replication_slots.
+ *
+ * The file is first written to a temporary name, fsynced, and then renamed
+ * into place, so a crash leaves either the complete file or none.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationIdentifier(XLogRecPtr ckpt)
+{
+	char		tmppath[MAXPGPATH];
+	char		path[MAXPGPATH];
+	int			fd;
+	int			tmpfd;
+	int			i;
+	uint32		magic = REPLICATION_STATE_MAGIC;
+	pg_crc32	crc;
+
+	if (max_replication_slots == 0)
+		return;
+
+	INIT_CRC32C(crc);
+
+	/*
+	 * Write to a filename a LSN of the checkpoint's REDO pointer, so we can
+	 * deal with the checkpoint failing after
+	 * CheckPointReplicationIdentifier() finishing.
+	 */
+	snprintf(path, MAXPGPATH, "pg_logical/checkpoints/%X-%X.ckpt",
+			 (uint32) (ckpt >> 32), (uint32) ckpt);
+	snprintf(tmppath, MAXPGPATH, "pg_logical/checkpoints/%X-%X.ckpt.tmp",
+			 (uint32) (ckpt >> 32), (uint32) ckpt);
+
+	/* check whether file already exists */
+	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+	if (fd >= 0)
+	{
+		/* already checkpointed before crash during a checkpoint or so */
+		CloseTransientFile(fd);
+		return;
+	}
+	/* ENOENT is the usual case: no checkpoint performed yet */
+	if (errno != ENOENT)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not check replication state checkpoint \"%s\": %m",
+						path)));
+
+	/* make sure no old temp file is remaining */
+	if (unlink(tmppath) < 0 && errno != ENOENT)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not remove file \"%s\": %m", tmppath)));
+
+	/*
+	 * no other backend can perform this at the same time, we're protected by
+	 * CheckpointLock.
+	 */
+	tmpfd = OpenTransientFile(tmppath,
+							  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+							  S_IRUSR | S_IWUSR);
+	if (tmpfd < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not create replication identifier checkpoint \"%s\": %m",
+						tmppath)));
+
+	/* write magic */
+	WriteReplicationCheckpointChunk(tmpfd, tmppath, &magic, sizeof(magic));
+	COMP_CRC32C(crc, &magic, sizeof(magic));
+
+	/* write actual data */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationStateOnDisk disk_state;
+
+		/* XXX: Locking */
+
+		if (ReplicationStates[i].local_identifier == InvalidRepNodeId)
+			continue;
+
+		/*
+		 * Zero the struct first so that alignment padding doesn't put
+		 * uninitialized memory into the file and its CRC.
+		 */
+		memset(&disk_state, 0, sizeof(disk_state));
+		disk_state.local_identifier = ReplicationStates[i].local_identifier;
+		disk_state.remote_lsn = ReplicationStates[i].remote_lsn;
+
+		/* make sure we only write out a commit that's persistent */
+		XLogFlush(ReplicationStates[i].local_lsn);
+
+		WriteReplicationCheckpointChunk(tmpfd, tmppath,
+										&disk_state, sizeof(disk_state));
+		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
+	}
+
+	/* write out the CRC */
+	FIN_CRC32C(crc);
+	WriteReplicationCheckpointChunk(tmpfd, tmppath, &crc, sizeof(crc));
+
+	/* fsync the file */
+	if (pg_fsync(tmpfd) != 0)
+	{
+		int			save_errno = errno;
+
+		CloseTransientFile(tmpfd);
+		errno = save_errno;
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not fsync replication identifier checkpoint \"%s\": %m",
+						tmppath)));
+	}
+
+	CloseTransientFile(tmpfd);
+
+	/* rename to permanent file */
+	if (rename(tmppath, path) != 0)
+	{
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not rename replication identifier checkpoint from \"%s\" to \"%s\": %m",
+						tmppath, path)));
+	}
+
+	/* fsync the renamed file, then its directory so the rename is durable */
+	fsync_fname(path, false);
+	fsync_fname("pg_logical/checkpoints", true);
+}
+
+/*
+ * Remove replication identifier checkpoint files whose LSN (encoded in the
+ * "%X-%X.ckpt" file name) lies before cutoff; those cannot possibly be
+ * needed anymore for crash recovery.
+ */
+void
+TruncateReplicationIdentifier(XLogRecPtr cutoff)
+{
+	const char *ckpt_dir = "pg_logical/checkpoints";
+	DIR		   *dir;
+	struct dirent *de;
+	char		path[MAXPGPATH];
+
+	dir = AllocateDir(ckpt_dir);
+	while ((de = ReadDir(dir, ckpt_dir)) != NULL)
+	{
+		uint32		hi;
+		uint32		lo;
+		XLogRecPtr	file_lsn;
+		struct stat st;
+
+		/* skip the directory self-references */
+		if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, MAXPGPATH, "pg_logical/checkpoints/%s", de->d_name);
+
+		/* ignore anything that isn't a regular file; lstat errors fall through */
+		if (lstat(path, &st) == 0 && !S_ISREG(st.st_mode))
+		{
+			elog(DEBUG1, "only regular files expected: %s", path);
+			continue;
+		}
+
+		if (sscanf(de->d_name, "%X-%X.ckpt", &hi, &lo) != 2)
+		{
+			ereport(LOG,
+					(errmsg("could not parse filename \"%s\"", path)));
+			continue;
+		}
+
+		file_lsn = ((uint64) hi) << 32 | lo;
+
+		if (file_lsn >= cutoff)
+		{
+			elog(DEBUG2, "keeping replication identifier checkpoint %s", path);
+			continue;
+		}
+
+		elog(DEBUG2, "removing replication identifier checkpoint %s", path);
+
+		/*
+		 * Failing to remove the file is strange but not harmful; don't let it
+		 * keep the checkpoint from completing, that'd be cure worse than the
+		 * disease.
+		 */
+		if (unlink(path) < 0)
+			ereport(LOG,
+					(errcode_for_file_access(),
+					 errmsg("could not unlink file \"%s\": %m",
+							path)));
+	}
+	FreeDir(dir);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationIdentifier.
+ *
+ * 'ckpt' is the LSN of the checkpoint startup begins from; the state is read
+ * from pg_logical/checkpoints/<hi>-<lo>.ckpt. As parsed below, that file is a
+ * uint32 magic, zero or more ReplicationStateOnDisk entries, and finally a
+ * pg_crc32 covering everything preceding it. A missing file is not an error;
+ * any other read failure, a bad magic, more entries than
+ * max_replication_slots, or a checksum mismatch PANICs.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationIdentifier(XLogRecPtr ckpt)
+{
+	char path[MAXPGPATH];
+	int fd;
+	int readBytes;
+	uint32 magic = REPLICATION_STATE_MAGIC;
+	int last_state = 0;
+	pg_crc32 file_crc;
+	pg_crc32 crc;
+
+	/* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+	static bool already_started = false;
+	Assert(!already_started);
+	already_started = true;
+#endif
+
+	if (max_replication_slots == 0)
+		return;
+
+	INIT_CRC32C(crc);
+
+	elog(LOG, "starting up replication identifier with ckpt at %X/%X",
+		 (uint32)(ckpt >> 32), (uint32)ckpt);
+
+	snprintf(path, sizeof(path), "pg_logical/checkpoints/%X-%X.ckpt",
+			 (uint32)(ckpt >> 32), (uint32)ckpt);
+
+	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+	/*
+	 * might have had max_replication_slots == 0 last run, or we just brought up a
+	 * standby.
+	 */
+	if (fd < 0 && errno == ENOENT)
+		return;
+	else if (fd < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not open replication state checkpoint \"%s\": %m",
+						path)));
+
+	/* verify magic, that's written even if nothing was active */
+	readBytes = read(fd, &magic, sizeof(magic));
+	if (readBytes < 0)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not read replication state checkpoint magic \"%s\": %m",
+						path)));
+	/* a short read does not set errno, so %m would be meaningless here */
+	if (readBytes != sizeof(magic))
+		ereport(PANIC,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("could not read replication state checkpoint magic \"%s\": read %d of %zu",
+						path, readBytes, sizeof(magic))));
+	COMP_CRC32C(crc, &magic, sizeof(magic));
+
+	if (magic != REPLICATION_STATE_MAGIC)
+		ereport(PANIC,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("replication checkpoint has wrong magic %u instead of %u",
+						magic, REPLICATION_STATE_MAGIC)));
+
+	/* recover individual states, until there are no more to be found */
+	while (true)
+	{
+		ReplicationStateOnDisk disk_state;
+
+		readBytes = read(fd, &disk_state, sizeof(disk_state));
+
+		/* no further data: the remaining bytes are the trailing checksum */
+		if (readBytes == sizeof(crc))
+		{
+			/* not pretty, but simple ... */
+			file_crc = *(pg_crc32*) &disk_state;
+			break;
+		}
+
+		if (readBytes < 0)
+		{
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not read replication checkpoint file \"%s\": %m",
+							path)));
+		}
+
+		/* truncated entry: no errno involved, the file itself is damaged */
+		if (readBytes != sizeof(disk_state))
+		{
+			ereport(PANIC,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("could not read replication checkpoint file \"%s\": read %d of %zu",
+							path, readBytes, sizeof(disk_state))));
+		}
+
+		COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
+
+		if (last_state == max_replication_slots)
+			ereport(PANIC,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("no free replication state could be found, increase max_replication_slots")));
+
+		/* copy data to shared memory */
+		ReplicationStates[last_state].local_identifier = disk_state.local_identifier;
+		ReplicationStates[last_state].remote_lsn = disk_state.remote_lsn;
+		last_state++;
+
+		elog(LOG, "recovered replication state of node %u to %X/%X",
+			 disk_state.local_identifier,
+			 (uint32)(disk_state.remote_lsn >> 32),
+			 (uint32)disk_state.remote_lsn);
+	}
+
+	/* now check checksum; a mismatch means corruption, not a config limit */
+	FIN_CRC32C(crc);
+	if (file_crc != crc)
+		ereport(PANIC,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum mismatch in replication identifier checkpoint \"%s\": stored %u, computed %u",
+						path, file_crc, crc)));
+
+	CloseTransientFile(fd);
+}
+
+/*
+ * Record that a commit from 'node', which originated at LSN 'remote_commit'
+ * on the remote node, has been replayed successfully and need not be
+ * replayed again. Together with setting up replication_origin_lsn and
+ * replication_origin_id this ensures that knowledge is not lost after a
+ * crash if the transaction had a persistent effect (think of asynchronous
+ * commits).
+ *
+ * 'local_commit' must be the local LSN of the commit, so that a checkpoint
+ * can make sure enough WAL has been persisted to disk.
+ *
+ * If no shared memory slot exists for 'node' yet, the first unused one is
+ * claimed. ERRORs if none is left, or if the slot is currently set up by a
+ * backend. Both LSNs are only ever moved forward. DoNotReplicateRepNodeId is
+ * ignored.
+ *
+ * Needs to be called with a RowExclusiveLock on pg_replication_identifier,
+ * unless running in recovery.
+ */
+void
+AdvanceReplicationIdentifier(RepNodeId node,
+							 XLogRecPtr remote_commit,
+							 XLogRecPtr local_commit)
+{
+	ReplicationState *state = NULL;
+	int			first_free = -1;
+	int			idx;
+
+	Assert(node != InvalidRepNodeId);
+
+	/* progress for DoNotReplicateRepNodeId is never tracked */
+	if (node == DoNotReplicateRepNodeId)
+		return;
+
+	/*
+	 * XXX: should we restore into a hashtable and dump into shmem only after
+	 * recovery finished?
+	 */
+
+	/* look for the slot of 'node', remembering the first unused one */
+	for (idx = 0; state == NULL && idx < max_replication_slots; idx++)
+	{
+		ReplicationState *candidate = &ReplicationStates[idx];
+
+		if (first_free == -1 &&
+			candidate->local_identifier == InvalidRepNodeId)
+			first_free = idx;
+		else if (candidate->local_identifier == node)
+		{
+			if (candidate->acquired_by != 0)
+				elog(ERROR, "cannot advance slot that is setup in backend %d",
+					 candidate->acquired_by);
+			state = candidate;
+		}
+	}
+
+	if (state == NULL)
+	{
+		if (first_free == -1)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("no free replication state could be found for %u, increase max_replication_slots",
+							node)));
+
+		/* claim the unused slot; it must not carry leftover progress */
+		state = &ReplicationStates[first_free];
+		Assert(state->remote_lsn == InvalidXLogRecPtr);
+		Assert(state->local_lsn == InvalidXLogRecPtr);
+		state->local_identifier = node;
+	}
+
+	Assert(state->local_identifier != InvalidRepNodeId);
+
+	/*
+	 * Harmless races during a checkpoint can hand us values older than the
+	 * ones already in memory, so only ever move the LSNs forward.
+	 */
+	if (remote_commit > state->remote_lsn)
+		state->remote_lsn = remote_commit;
+	if (local_commit > state->local_lsn)
+		state->local_lsn = local_commit;
+}
+
+/*
+ * on_shmem_exit callback: if this backend still holds its cached replication
+ * identifier, release the shared slot and forget the cached pointer.
+ */
+static void
+ReplicationIdentifierExitCleanup(int code, Datum arg)
+{
+	/* nothing cached, or the cached slot is not held by us */
+	if (local_replication_state == NULL ||
+		local_replication_state->acquired_by != MyProcPid)
+		return;
+
+	local_replication_state->acquired_by = 0;
+	local_replication_state = NULL;
+}
+
+/*
+ * Setup a replication identifier in the shared memory struct if it doesn't
+ * already exist and cache access to the specific ReplicationState so the
+ * array doesn't have to be searched when calling
+ * AdvanceCachedReplicationIdentifier().
+ *
+ * Obviously only one such cached identifier can exist per process and the
+ * current cached value can only be set again after the previous value is torn
+ * down with TeardownCachedReplicationIdentifier().
+ *
+ * ERRORs if this backend already has an identifier set up, if the slot for
+ * 'node' is already set up by a backend, or if no free slot is left. The
+ * search runs under a RowExclusiveLock on pg_replication_identifier.
+ */
+void
+SetupCachedReplicationIdentifier(RepNodeId node)
+{
+	static bool registered_cleanup;
+	ReplicationState *found = NULL;
+	int		i;
+	int		free_slot = -1;
+
+	if (!registered_cleanup)
+	{
+		on_shmem_exit(ReplicationIdentifierExitCleanup, 0);
+		registered_cleanup = true;
+	}
+
+	Assert(max_replication_slots > 0);
+
+	if (local_replication_state != NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot setup replication origin when one is already setup")));
+
+	LockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock);
+
+	/*
+	 * Search for either an existing slot for that identifier or a free one we
+	 * can use. Work on a local pointer and publish it in
+	 * local_replication_state only once the slot is actually acquired, so that
+	 * an ERROR raised below cannot leave a stale cached pointer behind.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationState *curstate = &ReplicationStates[i];
+
+		/* remember where to insert if necessary */
+		if (curstate->local_identifier == InvalidRepNodeId &&
+			free_slot == -1)
+		{
+			free_slot = i;
+			continue;
+		}
+
+		/* not our slot */
+		if (curstate->local_identifier != node)
+			continue;
+
+		if (curstate->acquired_by != 0)
+			elog(ERROR, "cannot setup slot that is already setup in backend %d",
+				 curstate->acquired_by);
+
+		/* ok, found slot; identifiers are unique, so stop searching */
+		found = curstate;
+		break;
+	}
+
+	if (found == NULL && free_slot == -1)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("no free replication state could be found for %u, increase max_replication_slots",
+						node)));
+	else if (found == NULL)
+	{
+		/* initialize new slot; it must not carry leftover progress */
+		found = &ReplicationStates[free_slot];
+		Assert(found->remote_lsn == InvalidXLogRecPtr);
+		Assert(found->local_lsn == InvalidXLogRecPtr);
+		found->local_identifier = node;
+	}
+
+	Assert(found->local_identifier != InvalidRepNodeId);
+
+	found->acquired_by = MyProcPid;
+	local_replication_state = found;
+
+	UnlockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock);
+}
+
+/*
+ * Release the replication identifier cached by
+ * SetupCachedReplicationIdentifier(), so that another one can be set up.
+ *
+ * ERRORs if no identifier is currently set up in this backend.
+ */
+void
+TeardownCachedReplicationIdentifier(void)
+{
+	ReplicationState *cached = local_replication_state;
+
+	Assert(max_replication_slots != 0);
+
+	if (cached == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("no replication identifier is set up")));
+
+	cached->acquired_by = 0;
+	local_replication_state = NULL;
+}
+
+/*
+ * Same as AdvanceReplicationIdentifier(), but for the identifier already set
+ * up with SetupCachedReplicationIdentifier(), so no array search is needed.
+ * Both LSNs are only ever moved forward.
+ */
+void
+AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+								   XLogRecPtr local_commit)
+{
+	ReplicationState *state = local_replication_state;
+
+	Assert(state != NULL);
+	Assert(state->local_identifier != InvalidRepNodeId);
+
+	if (local_commit > state->local_lsn)
+		state->local_lsn = local_commit;
+	if (remote_commit > state->remote_lsn)
+		state->remote_lsn = remote_commit;
+}
+
+/*
+ * Return the remote LSN up to which changes from the currently set up
+ * (cached) replication identifier have been replayed successfully.
+ */
+XLogRecPtr
+RemoteCommitFromCachedReplicationIdentifier(void)
+{
+	ReplicationState *state = local_replication_state;
+
+	Assert(state != NULL);
+
+	return state->remote_lsn;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 16b9808..e927698 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -31,6 +31,7 @@
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/replication_identifier.h"
 #include "storage/bufmgr.h"
 #include "storage/dsm.h"
 #include "storage/ipc.h"
@@ -132,6 +133,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, CheckpointerShmemSize());
 		size = add_size(size, AutoVacuumShmemSize());
 		size = add_size(size, ReplicationSlotsShmemSize());
+		size = add_size(size, ReplicationIdentifierShmemSize());
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
 		size = add_size(size, BTreeShmemSize());
@@ -238,6 +240,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	CheckpointerShmemInit();
 	AutoVacuumShmemInit();
 	ReplicationSlotsShmemInit();
+	ReplicationIdentifierShmemInit();
 	WalSndShmemInit();
 	WalRcvShmemInit();
 
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index bd27168..fdccb95 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -54,6 +54,7 @@
 #include "catalog/pg_shdepend.h"
 #include "catalog/pg_shdescription.h"
 #include "catalog/pg_shseclabel.h"
+#include "catalog/pg_replication_identifier.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_ts_config.h"
@@ -620,6 +621,28 @@ static const struct cachedesc cacheinfo[] = {
 		},
 		128
 	},
+	{ReplicationIdentifierRelationId,		/* REPLIDIDENT */
+		ReplicationLocalIdentIndex,
+		1,
+		{
+			Anum_pg_replication_riident,
+			0,
+			0,
+			0
+		},
+		16
+	},
+	{ReplicationIdentifierRelationId,		/* REPLIDREMOTE */
+		ReplicationExternalIdentIndex,
+		1,
+		{
+			Anum_pg_replication_riname,
+			0,
+			0,
+			0
+		},
+		16
+	},
 	{RewriteRelationId,			/* RULERELNAME */
 		RewriteRelRulenameIndexId,
 		2,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9572777..fd2d32f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -58,6 +58,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
+#include "replication/logical.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 18614e7..c2a5e15 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -202,6 +202,7 @@ static const char *subdirs[] = {
 	"pg_stat",
 	"pg_stat_tmp",
 	"pg_logical",
+	"pg_logical/checkpoints",
 	"pg_logical/snapshots",
 	"pg_logical/mappings"
 };
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index a16089f..3ae42b8 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -55,6 +55,8 @@
 #include "common/fe_memutils.h"
 #include "storage/large_object.h"
 #include "pg_getopt.h"
+#include "replication/logical.h"
+#include "replication/replication_identifier.h"
 
 
 static ControlFileData ControlFile;		/* pg_control values */
@@ -1088,6 +1090,7 @@ WriteEmptyXLOG(void)
 	record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint);
 	record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
 	record->xl_rmid = RM_XLOG_ID;
+	record->xl_origin_id = InvalidRepNodeId;
 	recptr += SizeOfXLogRecord;
 	*(recptr++) = XLR_BLOCK_ID_DATA_SHORT;
 	*(recptr++) = sizeof(CheckPoint);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 8205504..8bc047b 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,10 +146,18 @@ typedef struct xl_xact_commit
 	RelFileNode xnodes[1];		/* VARIABLE LENGTH ARRAY */
 	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
 	/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
+	/* xl_xact_origin follows if xinfo specifies it */
 } xl_xact_commit;
 
 #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
 
+typedef struct xl_xact_origin
+{
+	XLogRecPtr	origin_lsn;		/* presumably the commit LSN on the origin node */
+	RepNodeId	origin_node_id;	/* node the transaction originated on */
+	TimestampTz origin_timestamp;	/* presumably the commit time on the origin */
+} xl_xact_origin;
+
 /*
  * These flags are set in the xinfo fields of WAL commit records,
  * indicating a variety of additional actions that need to occur
@@ -160,7 +168,7 @@ typedef struct xl_xact_commit
  */
 #define XACT_COMPLETION_UPDATE_RELCACHE_FILE	0x01
 #define XACT_COMPLETION_FORCE_SYNC_COMMIT		0x02
-
+#define XACT_CONTAINS_ORIGIN					0x04
 /* Access macros for above flags */
 #define XactCompletionRelcacheInitFileInval(xinfo)	(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
 #define XactCompletionForceSyncCommit(xinfo)		(xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 138deaf..f06d11f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -85,6 +85,7 @@ typedef enum
 } RecoveryTargetType;
 
 extern XLogRecPtr XactLastRecEnd;
+extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
 
 extern bool reachedConsistency;
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 6638c1d..bd8dd70 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -45,6 +45,12 @@ typedef uint64 XLogSegNo;
 typedef uint32 TimeLineID;
 
 /*
+ * Denotes the node on which the action causing a wal record to be logged
+ * originated on.
+ */
+typedef uint16 RepNodeId;
+
+/*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
  *	it is a win to use it in all cases where we sync on each write().  We could
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 74bec20..ef05879 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -125,6 +125,10 @@ struct XLogReaderState
 	uint32		main_data_len;	/* main data portion's length */
 	uint32		main_data_bufsz;	/* allocated size of the buffer */
 
+#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING
+	RepNodeId	record_origin;
+#endif
+
 	/* information about blocks referenced by the record. */
 	DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
 
@@ -184,6 +188,11 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
 #define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
 #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
 #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
+#ifdef REPLICATION_IDENTIFIER_REUSE_PADDING
+#define XLogRecGetOrigin(decoder) ((decoder)->decoded_record->xl_origin_id)
+#else
+#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
+#endif
 #define XLogRecGetData(decoder) ((decoder)->main_data)
 #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
 #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 25a9265..048e45f 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -45,7 +45,7 @@ typedef struct XLogRecord
 	XLogRecPtr	xl_prev;		/* ptr to previous record in log */
 	uint8		xl_info;		/* flag bits, see below */
 	RmgrId		xl_rmid;		/* resource manager for this record */
-	/* 2 bytes of padding here, initialize to zero */
+	RepNodeId	xl_origin_id;   /* what node did originally cause this record to be written */
 	pg_crc32	xl_crc;			/* CRC for this record */
 
 	/* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */
@@ -174,5 +174,8 @@ typedef struct XLogRecordDataHeaderLong
 
 #define XLR_BLOCK_ID_DATA_SHORT		255
 #define XLR_BLOCK_ID_DATA_LONG		254
+#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING
+#define XLR_BLOCK_ID_ORIGIN			253
+#endif
 
 #endif   /* XLOGRECORD_H */
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index a680229..405528d 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -305,6 +305,12 @@ DECLARE_UNIQUE_INDEX(pg_policy_oid_index, 3257, on pg_policy using btree(oid oid
 DECLARE_UNIQUE_INDEX(pg_policy_polrelid_polname_index, 3258, on pg_policy using btree(polrelid oid_ops, polname name_ops));
 #define PolicyPolrelidPolnameIndexId				3258
 
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riident_index, 6001, on pg_replication_identifier using btree(riident oid_ops));
+#define ReplicationLocalIdentIndex 6001
+
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riname_index, 6002, on pg_replication_identifier using btree(riname text_pattern_ops));
+#define ReplicationExternalIdentIndex 6002
+
 /* last step of initialization script: build the indexes declared above */
 BUILD_INDICES
 
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 9edfdb8..3765b38 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5143,6 +5143,34 @@ DESCR("rank of hypothetical row without gaps");
 DATA(insert OID = 3993 ( dense_rank_final	PGNSP PGUID 12 1 0 2276 0 f f f f f f i 2 0 20 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_	hypothetical_dense_rank_final _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 
+/* replication_identifier.h */
+DATA(insert OID = 6003 (  pg_replication_identifier_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_create _null_ _null_ _null_ ));
+DESCR("create local replication identifier for the passed external one");
+
+DATA(insert OID = 6004 (  pg_replication_identifier_get PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 26 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_get _null_ _null_ _null_ ));
+DESCR("translate the external node identifier to a local one");
+
+DATA(insert OID = 6005 (  pg_replication_identifier_setup_replaying_from PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_setup_replaying_from _null_ _null_ _null_ ));
+DESCR("set up the node we are currently replaying transactions from");
+
+DATA(insert OID = 6006 (  pg_replication_identifier_reset_replaying_from PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ _null_ pg_replication_identifier_reset_replaying_from _null_ _null_ _null_ ));
+DESCR("reset replay mode");
+
+DATA(insert OID = 6007 (  pg_replication_identifier_setup_tx_origin PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ pg_replication_identifier_setup_tx_origin _null_ _null_ _null_ ));
+DESCR("set up transaction timestamp and origin lsn");
+
+DATA(insert OID = 6008 (  pg_get_replication_identifier_progress PGNSP PGUID 12 1 100 0 0 f f f f f t v 0 0 2249 "" "{26,25,3220,3220}" "{o,o,o,o}" "{local_id,external_id,remote_lsn,local_lsn}" _null_ pg_get_replication_identifier_progress _null_ _null_ _null_ ));
+DESCR("replication identifier progress");
+
+DATA(insert OID = 6009 (  pg_replication_identifier_is_replaying PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_replication_identifier_is_replaying _null_ _null_ _null_ ));
+DESCR("is a replication identifier set up");
+
+DATA(insert OID = 6010 (  pg_replication_identifier_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v 3 0 2278 "25 3220 3220" _null_ _null_ _null_ _null_ pg_replication_identifier_advance _null_ _null_ _null_ ));
+DESCR("advance replication identifier to specific location");
+
+DATA(insert OID = 6011 (  pg_replication_identifier_drop PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_drop _null_ _null_ _null_ ));
+DESCR("drop existing replication identifier");
+
 
 /*
  * Symbolic values for provolatile column: these indicate whether the result
diff --git a/src/include/catalog/pg_replication_identifier.h b/src/include/catalog/pg_replication_identifier.h
new file mode 100644
index 0000000..26eec17
--- /dev/null
+++ b/src/include/catalog/pg_replication_identifier.h
@@ -0,0 +1,75 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_identifier.h
+ *	  Persistent Replication Node Identifiers
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_identifier.h
+ *
+ * NOTES
+ *	  the genbki.pl script reads this file and generates .bki
+ *	  information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_IDENTIFIER_H
+#define PG_REPLICATION_IDENTIFIER_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ *		pg_replication_identifier.  cpp turns this into
+ *		typedef struct FormData_pg_replication_identifier
+ * ----------------
+ */
+#define ReplicationIdentifierRelationId 6000
+
+CATALOG(pg_replication_identifier,6000) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+	/*
+	 * locally known identifier that gets included into WAL.
+	 *
+	 * This should never leave the system.
+	 *
+	 * Needs to fit into a uint16, so we don't waste too much space in WAL
+	 * records. For this reason we don't use a normal Oid column here, since
+	 * we need to handle allocation of new values manually.
+	 */
+	Oid		riident;
+
+	/*
+	 * Variable-length fields start here, but we allow direct access to
+	 * riname.
+	 */
+
+	/* external, free-format, identifier */
+	text	riname;
+#ifdef CATALOG_VARLEN		/* further variable-length fields */
+#endif
+} FormData_pg_replication_identifier;
+
+/* ----------------
+ *		Form_pg_replication_identifier corresponds to a pointer to a tuple
+ *		with the format of pg_replication_identifier relation.
+ * ----------------
+ */
+typedef FormData_pg_replication_identifier *Form_pg_replication_identifier;
+
+/* ----------------
+ *		compiler constants for pg_replication_identifier
+ * ----------------
+ */
+
+#define Natts_pg_replication_identifier		2
+#define Anum_pg_replication_riident			1
+#define Anum_pg_replication_riname			2
+
+/* ----------------
+ *		pg_replication_identifier has no initial contents
+ * ----------------
+ */
+
+#endif   /* PG_REPLICATION_IDENTIFIER_H */
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 5cfc0ae..c787523 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -265,6 +265,12 @@
 #endif
 
 /*
+ * Temporary switch selecting whether the replication origin of a record is
+ * stored in the XLogRecord padding or as a separate block id in the record.
+ */
+/* #define REPLICATION_IDENTIFIER_REUSE_PADDING */
+
+/*
  * Define this to cause palloc()'d memory to be filled with random data, to
  * facilitate catching code that depends on the contents of uninitialized
  * memory.  Caution: this is horrendously expensive.
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 0935c1b..26095b1 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -74,6 +74,13 @@ typedef void (*LogicalDecodeCommitCB) (
 												   XLogRecPtr commit_lsn);
 
 /*
+ * Filter changes by origin.
+ */
+typedef bool (*LogicalDecodeFilterByOriginCB) (
+											 struct LogicalDecodingContext *,
+												   RepNodeId origin_id);
+
+/*
  * Called to shutdown an output plugin.
  */
 typedef void (*LogicalDecodeShutdownCB) (
@@ -89,6 +96,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5a1d9a0..784abd6 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -64,6 +64,8 @@ typedef struct ReorderBufferChange
 	/* The type of change. */
 	enum ReorderBufferChangeType action;
 
+	RepNodeId origin_id;
+
 	/*
 	 * Context data for the change, which part of the union is valid depends
 	 * on action/action_internal.
@@ -162,6 +164,10 @@ typedef struct ReorderBufferTXN
 	 */
 	XLogRecPtr	restart_decoding_lsn;
 
+	/* origin of the change that caused this transaction */
+	RepNodeId origin_id;
+	XLogRecPtr origin_lsn;
+
 	/*
 	 * Commit time, only known when we read the actual commit record.
 	 */
@@ -335,7 +341,7 @@ void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time);
+					TimestampTz commit_time, RepNodeId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
diff --git a/src/include/replication/replication_identifier.h b/src/include/replication/replication_identifier.h
new file mode 100644
index 0000000..36d74aa
--- /dev/null
+++ b/src/include/replication/replication_identifier.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ * replication_identifier.h
+ *	  Replication progress tracking via replication identifiers
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPLICATION_IDENTIFIER_H
+#define REPLICATION_IDENTIFIER_H
+
+#include "catalog/pg_replication_identifier.h"
+#include "replication/logical.h"
+
+#define InvalidRepNodeId 0	/* not a valid node; also marks unused shmem slots */
+#define DoNotReplicateRepNodeId USHRT_MAX	/* progress is never tracked for it */
+
+extern PGDLLIMPORT RepNodeId replication_origin_id;
+extern PGDLLIMPORT XLogRecPtr replication_origin_lsn;
+extern PGDLLIMPORT TimestampTz replication_origin_timestamp;
+
+/* API for querying & manipulating replication identifiers */
+extern RepNodeId GetReplicationIdentifier(char *name, bool missing_ok);
+extern RepNodeId CreateReplicationIdentifier(char *name);
+extern void GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok,
+										   char **riname);
+extern void DropReplicationIdentifier(RepNodeId riident);
+
+extern void AdvanceReplicationIdentifier(RepNodeId node,
+										 XLogRecPtr remote_commit,
+										 XLogRecPtr local_commit);
+extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+											   XLogRecPtr local_commit);
+extern void SetupCachedReplicationIdentifier(RepNodeId node);
+extern void TeardownCachedReplicationIdentifier(void);
+extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void);
+
+/* crash recovery support */
+extern void CheckPointReplicationIdentifier(XLogRecPtr ckpt);
+extern void TruncateReplicationIdentifier(XLogRecPtr cutoff);
+extern void StartupReplicationIdentifier(XLogRecPtr ckpt);
+
+/* internals */
+extern Size ReplicationIdentifierShmemSize(void);
+extern void ReplicationIdentifierShmemInit(void);
+
+/* SQL callable functions */
+extern Datum pg_replication_identifier_get(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_create(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_drop(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_setup_replaying_from(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_reset_replaying_from(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_is_replaying(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_setup_tx_origin(PG_FUNCTION_ARGS);
+extern Datum pg_get_replication_identifier_progress(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_advance(PG_FUNCTION_ARGS);
+
+#endif   /* REPLICATION_IDENTIFIER_H */
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index ba0b090..d7be45a 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -77,6 +77,8 @@ enum SysCacheIdentifier
 	RANGETYPE,
 	RELNAMENSP,
 	RELOID,
+	REPLIDIDENT,
+	REPLIDREMOTE,
 	RULERELNAME,
 	STATRELATTINH,
 	TABLESPACEOID,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index d50b103..5609503 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1390,6 +1390,11 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
      LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
      LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_replication_identifier_progress| SELECT pg_get_replication_identifier_progress.local_id,
+    pg_get_replication_identifier_progress.external_id,
+    pg_get_replication_identifier_progress.remote_lsn,
+    pg_get_replication_identifier_progress.local_lsn
+   FROM pg_get_replication_identifier_progress() pg_get_replication_identifier_progress(local_id, external_id, remote_lsn, local_lsn);
 pg_replication_slots| SELECT l.slot_name,
     l.plugin,
     l.slot_type,
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index c7be273..400cba3 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -121,6 +121,7 @@ pg_pltemplate|t
 pg_policy|t
 pg_proc|t
 pg_range|t
+pg_replication_identifier|t
 pg_rewrite|t
 pg_seclabel|t
 pg_shdepend|t
-- 
2.0.0.rc2.4.g1dc51c6.dirty

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to