On Sat, 2011-02-12 at 14:11 -0800, Daniel Farina wrote:
> This is another bit of the syncrep patch split out.
> 
> I will revisit the replication timeout one Real Soon, I promise -- but
> I have a couple things to do today that may delay that until the
> evening.
> 
> https://github.com/fdr/postgres/commit/ad3ce9ac62f0e128d7d1fd20d47184f867056af1
> 
> Context diff supplied here.

Greg just tipped me off to this thread a few hours ago. I saw your other
work on timeouts which looks good.

I've reworked this feature myself, and it's roughly the same thing you
have posted, so I will just add on to this thread. The major change from
my earlier patch is that the logic around setting xmin on the master is
considerably tighter, and correctly uses locking.

Patch attached, no docs yet, but the patch is clear.

I'm looking to commit this in the next 24 hours, barring objections
and/or test failures.

> Note that this information is not exposed via catalog in the original
> syncrep patch, and is not here. Do we want that kind of reporting?

Probably, but it's a small change and will conflict with other work for
now.

-- 
 Simon Riggs           http://www.2ndQuadrant.com/books/
 PostgreSQL Development, 24x7 Support, Training and Services
 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 3277da8..c4ebf97 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -45,6 +45,7 @@
 #include "replication/walreceiver.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
+#include "storage/procarray.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -56,6 +57,7 @@ bool		am_walreceiver;
 
 /* GUC variable */
 int			wal_receiver_status_interval;
+bool		hot_standby_feedback;
 
 /* libpqreceiver hooks to these when loaded */
 walrcv_connect_type walrcv_connect = NULL;
@@ -610,10 +612,14 @@ XLogWalRcvSendReply(void)
 	reply_message.apply = GetXLogReplayRecPtr();
 	reply_message.sendTime = now;
 
-	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+	if (hot_standby_feedback)
+		reply_message.xmin = GetOldestXmin(true, false);
+
+	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u",
 				 reply_message.write.xlogid, reply_message.write.xrecoff,
 				 reply_message.flush.xlogid, reply_message.flush.xrecoff,
-				 reply_message.apply.xlogid, reply_message.apply.xrecoff);
+				 reply_message.apply.xlogid, reply_message.apply.xrecoff,
+				 reply_message.xmin);
 
 	/* Prepend with the message type and send it. */
 	buf[0] = 'r';
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ad95b4..36eb3a9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -53,6 +53,7 @@
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -498,6 +499,7 @@ ProcessStandbyReplyMessage(void)
 	static StringInfoData input_message;
 	StandbyReplyMessage	reply;
 	char msgtype;
+	TransactionId newxmin = InvalidTransactionId;
 
 	initStringInfo(&input_message);
 
@@ -524,10 +526,11 @@ ProcessStandbyReplyMessage(void)
 
 	pq_copymsgbytes(&input_message, (char *) &reply, sizeof(StandbyReplyMessage));
 
-	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
+	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u",
 		 reply.write.xlogid, reply.write.xrecoff,
 		 reply.flush.xlogid, reply.flush.xrecoff,
-		 reply.apply.xlogid, reply.apply.xrecoff);
+		 reply.apply.xlogid, reply.apply.xrecoff,
+		 reply.xmin);
 
 	/*
 	 * Update shared state for this WalSender process
@@ -543,6 +546,74 @@ ProcessStandbyReplyMessage(void)
 		walsnd->apply = reply.apply;
 		SpinLockRelease(&walsnd->mutex);
 	}
+
+	/*
+	 * Update the WalSender's proc xmin to allow it to be visible
+	 * to snapshots. This will hold back the removal of dead rows
+	 * and thereby prevent the generation of cleanup conflicts
+	 * on the standby server.
+	 */
+	if (TransactionIdIsValid(reply.xmin))
+	{
+		TransactionId safeLimit;
+#define HOT_STANDBY_FEEDBACK_HORIZON		 1000000000
+
+		if (!TransactionIdIsValid(MyProc->xmin))
+		{
+			/*
+			 * Initialise MyProc->xmin from standby feedback.
+			 * Don't allow use oldestXmin if it is too far back.
+			 */
+			safeLimit = ReadNewTransactionId() - HOT_STANDBY_FEEDBACK_HORIZON;
+			if (!TransactionIdIsNormal(safeLimit))
+				safeLimit = FirstNormalTransactionId;
+
+			if (TransactionIdPrecedes(safeLimit, reply.xmin))
+			{
+				TransactionId oldestXmin = GetOldestXmin(true, true);
+
+				if (TransactionIdPrecedes(oldestXmin, reply.xmin))
+					newxmin = reply.xmin;
+				else
+					newxmin = oldestXmin;
+			}
+			else
+				ereport(WARNING,
+						(errmsg("standby xmin is far in the past"),
+						 errhint("standby must catch up before hot standby feedback is effective.")));
+		}
+		else
+		{
+			/*
+			 * Feedback from standby should move us forwards, but not too far.
+			 * Avoid grabbing XidGenLock here in case of waits, so use
+			 * a heuristic instead.
+			 */
+			Assert(TransactionIdIsValid(MyProc->xmin));
+			safeLimit = MyProc->xmin + HOT_STANDBY_FEEDBACK_HORIZON;
+			if (!TransactionIdIsNormal(safeLimit))
+				safeLimit = FirstNormalTransactionId;
+
+			if (TransactionIdPrecedes(safeLimit, reply.xmin))
+			{
+				ereport(WARNING,
+						(errmsg("standby xmin is far in the past"),
+						 errhint("standby must catch up before hot standby feedback is effective.")));
+			}
+			else
+			{
+				if (TransactionIdPrecedes(MyProc->xmin, reply.xmin))
+					newxmin = reply.xmin;
+			}
+		}
+	}
+
+	/*
+	 * Grab the ProcArrayLock to set xmin
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	MyProc->xmin = newxmin;
+	LWLockRelease(ProcArrayLock);
 }
 
 /* Main loop of walsender process */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 470183d..08cf941 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1279,6 +1279,15 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
+		{"hot_standby_feedback", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+			gettext_noop("Allows feedback from a hot standby primary that will avoid query conflicts."),
+			NULL
+		},
+		&hot_standby_feedback,
+		false, NULL, NULL
+	},
+
+	{
 		{"allow_system_table_mods", PGC_POSTMASTER, DEVELOPER_OPTIONS,
 			gettext_noop("Allows modifications of the structure of system tables."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 5d31365..3ef6813 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -196,6 +196,7 @@
 
 #hot_standby = off			# "on" allows queries during recovery
 					# (change requires restart)
+#hot_standby_feedback = off	# info from standby to prevent query conflicts
 #max_standby_archive_delay = 30s	# max delay before canceling queries
 					# when reading WAL from archive;
 					# -1 allows indefinite delay
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
index 32c4962..8e4e7d0 100644
--- a/src/include/replication/walprotocol.h
+++ b/src/include/replication/walprotocol.h
@@ -56,6 +56,13 @@ typedef struct
 	XLogRecPtr	flush;
 	XLogRecPtr	apply;
 
+	/*
+	 * The current xmin from the standby, for Hot Standby feedback.
+	 * This may be invalid if the standby-side does not support feedback,
+	 * or Hot Standby is not yet available.
+	 */
+	TransactionId	xmin;
+
 	/* Sender's system clock at the time of transmission */
 	TimestampTz sendTime;
 } StandbyReplyMessage;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index aa5bfb7..9137b86 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -18,6 +18,7 @@
 
 extern bool am_walreceiver;
 extern int wal_receiver_status_interval;
+extern bool hot_standby_feedback;
 
 /*
  * MAXCONNINFO: maximum size of a connection string.
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to