Hi all,

Please find attached a patch to add support for synchronous replication
with multiple standby servers. This is controlled by a new GUC parameter
called synchronous_standby_num, which makes the server wait for
transaction commit on the first N standbys defined in
synchronous_standby_names. The implementation is really
straightforward, and has only needed a couple of modifications in
walsender.c (for pg_stat_get_wal_senders) and syncrep.c.

When a commit wait is cancelled manually by the user or when
ProcDiePending shows up, the message returned to the user does not
list the walsenders on which the commit has not been confirmed, even
though it may have been partially confirmed. I have not done anything
about that, but let me know if it would be useful. It would need a scan
of the walsenders to get their application_name.

Thanks,
-- 
Michael
From 3dfff90032c38daba43e1e0c4d3221053d6386ac Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Sat, 9 Aug 2014 14:49:24 +0900
Subject: [PATCH] Add parameter synchronous_standby_num

This makes it possible to use synchronous replication with a number of standby
nodes equal to the new parameter. The synchronous standbys are chosen in the
order they are listed in synchronous_standby_names.
---
 doc/src/sgml/config.sgml            | 32 ++++++++++++---
 doc/src/sgml/high-availability.sgml | 18 ++++-----
 src/backend/replication/syncrep.c   | 81 ++++++++++++++++++++++++++++++-------
 src/backend/replication/walsender.c | 74 ++++++++++++++++++++++++++++-----
 src/backend/utils/misc/guc.c        | 10 +++++
 src/include/replication/syncrep.h   |  1 +
 6 files changed, 175 insertions(+), 41 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index be5c25b..c40de16 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2586,12 +2586,13 @@ include_dir 'conf.d'
         Specifies a comma-separated list of standby names that can support
         <firstterm>synchronous replication</>, as described in
         <xref linkend="synchronous-replication">.
-        At any one time there will be at most one active synchronous standby;
-        transactions waiting for commit will be allowed to proceed after
-        this standby server confirms receipt of their data.
-        The synchronous standby will be the first standby named in this list
-        that is both currently connected and streaming data in real-time
-        (as shown by a state of <literal>streaming</literal> in the
+        At any one time there will be at most
+        <varname>synchronous_standby_num</> active synchronous standbys;
+        transactions waiting for commit will be allowed to proceed after
+        those standby servers confirm receipt of their data. The synchronous
+        standbys will be the first entries named in this list that are both
+        currently connected and streaming data in real-time (as shown by a
+        state of <literal>streaming</literal> in the
         <link linkend="monitoring-stats-views-table">
         <literal>pg_stat_replication</></link> view).
         Other standby servers appearing later in this list represent potential
@@ -2627,6 +2628,25 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num">
+      <term><varname>synchronous_standby_num</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>synchronous_standby_num</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the number of synchronous standbys used for
+        <firstterm>synchronous replication</>, as described in
+        <xref linkend="synchronous-replication">: the first connected and
+        streaming entries of <xref linkend="guc-synchronous-standby-names">.
+       </para>
+       <para>
+        The default value is 1.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
       <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index d249959..085d51b 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -1081,12 +1081,12 @@ primary_slot_name = 'node_a_slot'
     WAL record is then sent to the standby. The standby sends reply
     messages each time a new batch of WAL data is written to disk, unless
     <varname>wal_receiver_status_interval</> is set to zero on the standby.
-    If the standby is the first matching standby, as specified in
-    <varname>synchronous_standby_names</> on the primary, the reply
-    messages from that standby will be used to wake users waiting for
-    confirmation that the commit record has been received. These parameters
-    allow the administrator to specify which standby servers should be
-    synchronous standbys. Note that the configuration of synchronous
+    If the standby is one of the first <varname>synchronous_standby_num</>
+    matching standbys, as specified in <varname>synchronous_standby_names</>
+    on the primary, the reply messages from that standby will be used to wake
+    users waiting for confirmation that the commit record has been received.
+    These parameters allow the administrator to specify which standby servers
+    should be synchronous standbys. Note that the configuration of synchronous
     replication is mainly on the master. Named standbys must be directly
     connected to the master; the master knows nothing about downstream
     standby servers using cascaded replication.
@@ -1169,9 +1169,9 @@ primary_slot_name = 'node_a_slot'
     The best solution for avoiding data loss is to ensure you don't lose
     your last remaining synchronous standby. This can be achieved by naming multiple
     potential synchronous standbys using <varname>synchronous_standby_names</>.
-    The first named standby will be used as the synchronous standby. Standbys
-    listed after this will take over the role of synchronous standby if the
-    first one should fail.
+    The first <varname>synchronous_standby_num</> named standbys will be used as
+    the synchronous standbys. Standbys listed after these will take over the
+    role of synchronous standby if one of them should fail.
    </para>
 
    <para>
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..524ff6c 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -59,6 +59,7 @@

 /* User-settable parameters for sync rep */
 char	   *SyncRepStandbyNames;
+int			synchronous_standby_num = 1;

 #define SyncStandbysDefined() \
 	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
@@ -206,7 +207,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
 			ereport(WARNING,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
 			whereToSendOutput = DestNone;
 			SyncRepCancelWait();
 			break;
@@ -223,7 +224,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
 			QueryCancelPending = false;
 			ereport(WARNING,
 					(errmsg("canceling wait for synchronous replication due to user request"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
 			SyncRepCancelWait();
 			break;
 		}
@@ -368,11 +369,10 @@ void
 SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
-	volatile WalSnd *syncWalSnd = NULL;
 	int			numwrite = 0;
 	int			numflush = 0;
-	int			priority = 0;
+	int			num_ahead = 0;
 	int			i;

 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -388,7 +388,9 @@ SyncRepReleaseWaiters(void)
 	/*
 	 * We're a potential sync standby. Release waiters if we are the highest
 	 * priority standby. If there are multiple standbys with same priorities
-	 * then we use the first mentioned standby. If you change this, also
-	 * change pg_stat_get_wal_senders().
+	 * then we use the first mentioned standby. With synchronous_standby_num
+	 * set to N, we release waiters if we are one of the N highest priority
+	 * standbys, ranked the same way. If you change this, also change
+	 * pg_stat_get_wal_senders().
 	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
@@ -398,33 +400,47 @@ SyncRepReleaseWaiters(void)
 		/* use volatile pointer to prevent code rearrangement */
 		volatile WalSnd *walsnd = &walsndctl->walsnds[i];

-		if (walsnd->pid != 0 &&
-			walsnd->state == WALSNDSTATE_STREAMING &&
-			walsnd->sync_standby_priority > 0 &&
-			(priority == 0 ||
-			 priority > walsnd->sync_standby_priority) &&
-			!XLogRecPtrIsInvalid(walsnd->flush))
-		{
-			priority = walsnd->sync_standby_priority;
-			syncWalSnd = walsnd;
-		}
+		/* Skip ourselves; only other standbys can rank ahead of us */
+		if (walsnd == MyWalSnd)
+			continue;
+
+		/* Skip standbys that cannot currently be synchronous */
+		if (walsnd->pid == 0 ||
+			walsnd->state != WALSNDSTATE_STREAMING ||
+			walsnd->sync_standby_priority == 0 ||
+			XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		/*
+		 * Count the standbys ranked ahead of us: a lower priority value wins,
+		 * and on a tie the earlier WalSnd slot wins, as in the single standby
+		 * case.  Counting, rather than collecting the winners in an array,
+		 * works for any value of synchronous_standby_num.
+		 */
+		if (walsnd->sync_standby_priority < MyWalSnd->sync_standby_priority ||
+			(walsnd->sync_standby_priority == MyWalSnd->sync_standby_priority &&
+			 walsnd < MyWalSnd))
+			num_ahead++;
 	}

 	/*
-	 * We should have found ourselves at least.
-	 */
-	Assert(syncWalSnd);
-
-	/*
-	 * If we aren't managing the highest priority standby then just leave.
+	 * If we aren't managing one of the synchronous_standby_num highest
+	 * priority standbys then just leave.
 	 */
-	if (syncWalSnd != MyWalSnd)
+	if (num_ahead >= synchronous_standby_num)
 	{
 		LWLockRelease(SyncRepLock);
 		announce_next_takeover = true;
 		return;
 	}

+	/*
+	 * NOTE(review): the code below appears to advance walsndctl->lsn from
+	 * this walsender's own position, so waiters would be released as soon
+	 * as any one of the chosen standbys confirms, not once all
+	 * synchronous_standby_num of them have.  TODO confirm; if so, release
+	 * only up to the oldest write/flush position among the chosen standbys.
+	 */
 	/*
 	 * Set the lsn first so that when we wake backends they will release up to
 	 * this location.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3189793..8c74c86 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2734,9 +2734,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContext oldcontext;
 	int		   *sync_priority;
-	int			priority = 0;
-	int			sync_standby = -1;
+	bool	   *sync_candidate = palloc0(sizeof(bool) * max_wal_senders);
 	int			i;

 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 		ereport(ERROR,
@@ -2784,15 +2783,14 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
 				0 : walsnd->sync_standby_priority;

-			if (walsnd->state == WALSNDSTATE_STREAMING &&
-				walsnd->sync_standby_priority > 0 &&
-				(priority == 0 ||
-				 priority > walsnd->sync_standby_priority) &&
-				!XLogRecPtrIsInvalid(walsnd->flush))
-			{
-				priority = walsnd->sync_standby_priority;
-				sync_standby = i;
-			}
+			/*
+			 * Remember which standbys can be chosen as synchronous; they are
+			 * ranked when the rows are built, using the same rule as
+			 * SyncRepReleaseWaiters().  A nonzero sync_priority[i] already
+			 * implies a valid flush position.
+			 */
+			sync_candidate[i] = (walsnd->state == WALSNDSTATE_STREAMING &&
+								 sync_priority[i] > 0);
 		}
 	}
 	LWLockRelease(SyncRepLock);
@@ -2856,10 +2854,31 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (sync_priority[i] == 0)
 				values[7] = CStringGetTextDatum("async");
-			else if (i == sync_standby)
-				values[7] = CStringGetTextDatum("sync");
 			else
-				values[7] = CStringGetTextDatum("potential");
+			{
+				int			num_ahead = 0;
+				int			j;
+
+				/*
+				 * Count the candidates ranked ahead of this one: a lower
+				 * priority value wins, and on a tie the earlier WalSnd slot
+				 * wins.  This must match SyncRepReleaseWaiters().
+				 */
+				for (j = 0; j < max_wal_senders; j++)
+				{
+					if (j == i || !sync_candidate[j])
+						continue;
+					if (sync_priority[j] < sync_priority[i] ||
+						(sync_priority[j] == sync_priority[i] && j < i))
+						num_ahead++;
+				}
+
+				if (sync_candidate[i] &&
+					num_ahead < synchronous_standby_num)
+					values[7] = CStringGetTextDatum("sync");
+				else
+					values[7] = CStringGetTextDatum("potential");
+			}
 		}

 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6c52db8..73523db 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2551,6 +2551,16 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},

+	{
+		{"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+			gettext_noop("Sets the number of synchronous standbys."),
+			NULL
+		},
+		&synchronous_standby_num,
+		1, 1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..da1cf7c 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -33,6 +33,7 @@

 /* user-settable parameters for synchronous replication */
 extern char *SyncRepStandbyNames;
+extern int	synchronous_standby_num;

 /* called by user backend */
 extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
-- 
2.0.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to