The previous mail contained a patch with a mismerge caused by reordering
commits. Corrected version attached.

Thanks to Steve Singer for noticing this quickly.

Andres

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From efb28b1f931a30738faac83703810b598a82a6ee Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sat, 9 Jun 2012 14:49:34 +0200
Subject: [PATCH] Introduce the concept that wal has an 'origin' node

One solution to avoid loops when doing wal based logical replication in
topologies which are more complex than one unidirectional transport is
introducing the concept of an 'origin_id' into the wal stream. Luckily there is
some padding in the XLogRecord struct that allows us to add that field without
further bloating the struct.
This solution was chosen because it allows for just about any topology and is
unobtrusive.

This adds a new configuration parameter multimaster_node_id which determines
the id used for wal originating in one cluster.

When applying changes from wal from another cluster, code can set the variable
current_replication_origin_id. This is a global variable because passing it
through everything which can generate wal would be far too intrusive.
---
 src/backend/access/transam/xact.c             |   54 +++++++++++++------------
 src/backend/access/transam/xlog.c             |    3 +-
 src/backend/access/transam/xlogreader.c       |    2 +
 src/backend/replication/logical/Makefile      |    2 +-
 src/backend/replication/logical/logical.c     |   19 +++++++++
 src/backend/utils/misc/guc.c                  |   19 +++++++++
 src/backend/utils/misc/postgresql.conf.sample |    3 ++
 src/include/access/xlog.h                     |    4 +-
 src/include/access/xlogdefs.h                 |    2 +
 src/include/replication/logical.h             |   22 ++++++++++
 10 files changed, 100 insertions(+), 30 deletions(-)
 create mode 100644 src/backend/replication/logical/logical.c
 create mode 100644 src/include/replication/logical.h

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3cc2bfa..79ec19a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -36,8 +36,9 @@
 #include "libpq/be-fsstubs.h"
 #include "miscadmin.h"
 #include "pgstat.h"
-#include "replication/walsender.h"
+#include "replication/logical.h"
 #include "replication/syncrep.h"
+#include "replication/walsender.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
@@ -4545,12 +4546,13 @@ xactGetCommittedChildren(TransactionId **ptr)
  * actions for which the order of execution is critical.
  */
 static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
-						  TransactionId *sub_xids, int nsubxacts,
-						  SharedInvalidationMessage *inval_msgs, int nmsgs,
-						  RelFileNode *xnodes, int nrels,
-						  Oid dbId, Oid tsId,
-						  uint32 xinfo)
+xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
+                          XLogRecPtr lsn, XLogRecPtr origin_lsn,
+                          TransactionId *sub_xids, int nsubxacts,
+                          SharedInvalidationMessage *inval_msgs, int nmsgs,
+                          RelFileNode *xnodes, int nrels,
+                          Oid dbId, Oid tsId,
+                          uint32 xinfo)
 {
 	TransactionId max_xid;
 	int			i;
@@ -4659,8 +4661,8 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
  * Utility function to call xact_redo_commit_internal after breaking down xlrec
  */
 static void
-xact_redo_commit(xl_xact_commit *xlrec,
-				 TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit(xl_xact_commit *xlrec, RepNodeId originating_node,
+							TransactionId xid, XLogRecPtr lsn)
 {
 	TransactionId *subxacts;
 	SharedInvalidationMessage *inval_msgs;
@@ -4670,27 +4672,26 @@ xact_redo_commit(xl_xact_commit *xlrec,
 	/* invalidation messages array follows subxids */
 	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
-	xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
-							  inval_msgs, xlrec->nmsgs,
-							  xlrec->xnodes, xlrec->nrels,
-							  xlrec->dbId,
-							  xlrec->tsId,
-							  xlrec->xinfo);
+	xact_redo_commit_internal(xid, originating_node, lsn, xlrec->origin_lsn,
+	                          subxacts, xlrec->nsubxacts, inval_msgs,
+	                          xlrec->nmsgs, xlrec->xnodes, xlrec->nrels,
+	                          xlrec->dbId, xlrec->tsId, xlrec->xinfo);
 }
 
 /*
  * Utility function to call xact_redo_commit_internal  for compact form of message.
  */
 static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
-						 TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec, RepNodeId originating_node,
+							TransactionId xid, XLogRecPtr lsn)
 {
-	xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
-							  NULL, 0,	/* inval msgs */
-							  NULL, 0,	/* relfilenodes */
-							  InvalidOid,		/* dbId */
-							  InvalidOid,		/* tsId */
-							  0);		/* xinfo */
+	xact_redo_commit_internal(xid, originating_node, lsn, zeroRecPtr, xlrec->subxacts,
+								xlrec->nsubxacts,
+								NULL, 0,		/* inval msgs */
+								NULL, 0,		/* relfilenodes */
+								InvalidOid,		/* dbId */
+								InvalidOid,		/* tsId */
+								0);				/* xinfo */
 }
 
 /*
@@ -4786,17 +4787,18 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
 	/* Backup blocks are not used in xact records */
 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
+	/* FIXME: we probably shouldn't pass xl_origin_id at multiple places, hm */
 	if (info == XLOG_XACT_COMMIT_COMPACT)
 	{
 		xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
 
-		xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+		xact_redo_commit_compact(xlrec, record->xl_origin_id, record->xl_xid, lsn);
 	}
 	else if (info == XLOG_XACT_COMMIT)
 	{
 		xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
 
-		xact_redo_commit(xlrec, record->xl_xid, lsn);
+		xact_redo_commit(xlrec, record->xl_origin_id, record->xl_xid, lsn);
 	}
 	else if (info == XLOG_XACT_ABORT)
 	{
@@ -4814,7 +4816,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
 	{
 		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
 
-		xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
+		xact_redo_commit(&xlrec->crec, record->xl_origin_id, xlrec->xid, lsn);
 		RemoveTwoPhaseFile(xlrec->xid, false);
 	}
 	else if (info == XLOG_XACT_ABORT_PREPARED)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c6feed0..504b4d0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
 #include "postmaster/startup.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/logical.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -1032,7 +1033,7 @@ begin:;
 	record->xl_len = len;		/* doesn't include backup blocks */
 	record->xl_info = info;
 	record->xl_rmid = rmid;
-
+	record->xl_origin_id = current_replication_origin_id;
 	/* Now we can finish computing the record's CRC */
 	COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
 			   SizeOfXLogRecord - sizeof(pg_crc32));
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 6f15d66..bacd31e 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -24,6 +24,7 @@
 #include "access/xlogreader.h"
 
 /* FIXME */
+#include "replication/logical.h" /* InvalidMultimasterNodeId */
 #include "replication/walsender_private.h"
 #include "replication/walprotocol.h"
 
@@ -563,6 +564,7 @@ XLogReaderRead(XLogReaderState* state)
 				spacer.xl_len = temp_record->xl_tot_len - SizeOfXLogRecord;
 				spacer.xl_rmid = RM_XLOG_ID;
 				spacer.xl_info = XLOG_NOOP;
+				spacer.xl_origin_id = InvalidMultimasterNodeId;
 
 				state->writeout_data(state,
 				                     (char*)&spacer,
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 7dd9663..c2d6d82 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = applycache.o decode.o
+OBJS = applycache.o decode.o logical.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
new file mode 100644
index 0000000..4f34488
--- /dev/null
+++ b/src/backend/replication/logical/logical.c
@@ -0,0 +1,19 @@
+/*-------------------------------------------------------------------------
+ *
+ * logical.c
+ *
+ * Support functions for logical/multimaster replication
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/logical.c
+ *
+ */
+#include "postgres.h"
+#include "replication/logical.h"
+int guc_replication_origin_id = InvalidMultimasterNodeId;
+RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;
+XLogRecPtr current_replication_origin_lsn = {0, 0};
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 93c798b..46b0657 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -60,6 +60,7 @@
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/logical.h"
 #include "storage/bufmgr.h"
 #include "storage/standby.h"
 #include "storage/fd.h"
@@ -198,6 +199,7 @@ static const char *show_tcp_keepalives_interval(void);
 static const char *show_tcp_keepalives_count(void);
 static bool check_maxconnections(int *newval, void **extra, GucSource source);
 static void assign_maxconnections(int newval, void *extra);
+static void assign_replication_node_id(int newval, void *extra);
 static bool check_maxworkers(int *newval, void **extra, GucSource source);
 static void assign_maxworkers(int newval, void *extra);
 static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source);
@@ -1598,6 +1600,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"multimaster_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+			gettext_noop("node id for multimaster."),
+			NULL
+		},
+		&guc_replication_origin_id,
+		InvalidMultimasterNodeId, InvalidMultimasterNodeId, MaxMultimasterNodeId,
+		NULL, assign_replication_node_id, NULL
+	},
+
+	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
 			NULL
@@ -8629,6 +8641,13 @@ assign_maxconnections(int newval, void *extra)
 	MaxBackends = newval + MaxWorkers + autovacuum_max_workers + 1;
 }
 
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+	guc_replication_origin_id = newval;
+	current_replication_origin_id = newval;
+}
+
 static bool
 check_maxworkers(int *newval, void **extra, GucSource source)
 {
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ce3fc08..12f8a3f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -241,6 +241,9 @@
 #hot_standby_feedback = off		# send info from standby to prevent
 					# query conflicts
 
+# - Multi Master Servers -
+
+#multimaster_node_id = 0 #invalid node id
 
 #------------------------------------------------------------------------------
 # QUERY TUNING
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 2843aca..dd89cff 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -47,8 +47,8 @@ typedef struct XLogRecord
 	uint32		xl_len;			/* total len of rmgr data */
 	uint8		xl_info;		/* flag bits, see below */
 	RmgrId		xl_rmid;		/* resource manager for this record */
-
-	/* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
+	RepNodeId   xl_origin_id;   /* what node did originally cause this record to be written */
+	/* Depending on MAXALIGN, there are either 0 or 4 wasted bytes here */
 
 	/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 2768427..6b6700a 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -84,6 +84,8 @@ extern XLogRecPtr zeroRecPtr;
  */
 typedef uint32 TimeLineID;
 
+typedef uint16 RepNodeId;
+
 /*
  *	Because O_DIRECT bypasses the kernel buffers, and because we never
  *	read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
new file mode 100644
index 0000000..0698b61
--- /dev/null
+++ b/src/include/replication/logical.h
@@ -0,0 +1,22 @@
+/*
+ * logical.h
+ *
+ * PostgreSQL logical replication support
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/logical.h
+ */
+#ifndef LOGICAL_H
+#define LOGICAL_H
+
+#include "access/xlogdefs.h"
+
+extern int guc_replication_origin_id;
+extern RepNodeId current_replication_origin_id;
+extern XLogRecPtr current_replication_origin_lsn;
+
+#define InvalidMultimasterNodeId 0
+#define MaxMultimasterNodeId (2<<3)
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to