On Sat, Sep 20, 2014 at 1:16 PM, Michael Paquier
<michael.paqu...@gmail.com> wrote:
> On Fri, Sep 19, 2014 at 12:18 PM, Robert Haas <robertmh...@gmail.com> wrote:
>> On Tue, Sep 16, 2014 at 2:19 PM, Michael Paquier
>> <michael.paqu...@gmail.com> wrote:
>>> - A patch refactoring code for pg_stat_get_wal_senders and
>>> SyncRepReleaseWaiters as there is in either case duplicated code in
>>> this area to select the synchronous node as the one connected with
>>> lowest priority
>>
>> A strong +1 for this idea.  I have never liked that, and cleaning it
>> up seems eminently sensible.
>
> Interestingly, the syncrep code has in some of its code paths the idea
> that a synchronous node is unique, while other code paths assume that
> there can be multiple synchronous nodes. If that's fine I think that
> it would be better to just make the code multiple-sync node aware, by
> having a single function call in walsender.c and syncrep.c that
> returns an integer array of WAL sender positions (WalSndCtl). as that
> seems more extensible long-term. Well for now the array would have a
> single element, being the WAL sender with lowest priority > 0. Feel
> free to protest about that approach though :)
Please find attached a patch refactoring this code. Looking at it once
again, I have taken the approach that minimizes the refactoring footprint
on the current code: it simply adds a function called
SyncRepGetSynchronousNode in syncrep.c that returns to the caller the
position in the WAL sender array of the node considered synchronous, or
-1 if no synchronous node is found.

I'll add it to the next commit fest.

Regards,
-- 
Michael
From 6f66020b3c5fc0866b63bb12cf12c582be14f7d0 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Tue, 23 Sep 2014 22:57:00 +0900
Subject: [PATCH] Refactor code to detect synchronous node in WAL sender array

This patch removes the code duplicated in walsender.c and syncrep.c that
detects which node has the lowest strictly-positive priority, making this
code easier to maintain.
---
 src/backend/replication/syncrep.c   | 89 ++++++++++++++++++++++++++-----------
 src/backend/replication/walsender.c | 34 +++++---------
 src/include/replication/syncrep.h   |  1 +
 3 files changed, 74 insertions(+), 50 deletions(-)

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..e0b1034 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -5,7 +5,7 @@
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
@@ -357,6 +357,60 @@ SyncRepInitConfig(void)
 	}
 }
 
+/*
+ * Return the position in WalSndCtl->walsnds[] of the current synchronous
+ * standby, or -1 if no synchronous node can be found.  The synchronous
+ * standby is the streaming WAL sender with the lowest strictly-positive
+ * sync_standby_priority and a valid flush location; when several senders
+ * share that priority, the first one in the array wins.
+ *
+ * The caller must hold SyncRepLock, in either shared or exclusive mode.
+ */
+int
+SyncRepGetSynchronousNode(void)
+{
+	int			sync_node = -1;
+	int			priority = 0;
+	int			i;
+
+	/* Scan WAL senders and find synchronous node if any */
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		/* Use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+		/* Skip slots that are unused or not streaming */
+		if (walsnd->pid == 0 ||
+			walsnd->state != WALSNDSTATE_STREAMING)
+			continue;
+
+		/* Skip asynchronous standbys: only priority > 0 is synchronous */
+		if (walsnd->sync_standby_priority <= 0)
+			continue;
+
+		/*
+		 * Skip unless strictly better than the best so far; "<=" keeps
+		 * the first-listed sender among equal priorities.
+		 */
+		if (priority != 0 &&
+			priority <= walsnd->sync_standby_priority)
+			continue;
+
+		/*
+		 * Skip senders with an invalid flush location, such as a
+		 * pg_basebackup background process.
+		 */
+		if (XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		/* Best candidate so far; keep scanning for a better one */
+		sync_node = i;
+		priority = walsnd->sync_standby_priority;
+	}
+
+	return sync_node;
+}
+
 /*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-standby-releases-waiter.
@@ -369,10 +423,9 @@ SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
 	volatile WalSnd *syncWalSnd = NULL;
+	int			sync_node;
 	int			numwrite = 0;
 	int			numflush = 0;
-	int			priority = 0;
-	int			i;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -388,32 +441,14 @@ SyncRepReleaseWaiters(void)
 	/*
 	 * We're a potential sync standby. Release waiters if we are the highest
 	 * priority standby. If there are multiple standbys with same priorities
-	 * then we use the first mentioned standby. If you change this, also
-	 * change pg_stat_get_wal_senders().
+	 * then we use the first mentioned standby.
 	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+	sync_node = SyncRepGetSynchronousNode();
 
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
-		if (walsnd->pid != 0 &&
-			walsnd->state == WALSNDSTATE_STREAMING &&
-			walsnd->sync_standby_priority > 0 &&
-			(priority == 0 ||
-			 priority > walsnd->sync_standby_priority) &&
-			!XLogRecPtrIsInvalid(walsnd->flush))
-		{
-			priority = walsnd->sync_standby_priority;
-			syncWalSnd = walsnd;
-		}
-	}
-
-	/*
-	 * We should have found ourselves at least.
-	 */
-	Assert(syncWalSnd);
+	/* We should have found ourselves at least */
+	Assert(sync_node >= 0 && sync_node < max_wal_senders);
+	syncWalSnd = &WalSndCtl->walsnds[sync_node];
 
 	/*
 	 * If we aren't managing the highest priority standby then just leave.
@@ -444,7 +479,7 @@ SyncRepReleaseWaiters(void)
 
 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
-	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
 
 	/*
 	 * If we are managing the highest priority standby, though we weren't
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 384c9b6..bc2e40d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2742,8 +2742,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	int		   *sync_priority;
-	int			priority = 0;
-	int			sync_standby = -1;
+	int			sync_standby;
 	int			i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -2774,36 +2773,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	/*
 	 * Get the priorities of sync standbys all in one go, to minimise lock
 	 * acquisitions and to allow us to evaluate who is the current sync
-	 * standby. This code must match the code in SyncRepReleaseWaiters().
+	 * standby.
 	 */
 	sync_priority = palloc(sizeof(int) * max_wal_senders);
 	LWLockAcquire(SyncRepLock, LW_SHARED);
+
+	/* Collect priorities; invalid flush (e.g. pg_basebackup) means async */
 	for (i = 0; i < max_wal_senders; i++)
 	{
 		/* use volatile pointer to prevent code rearrangement */
 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 
-		if (walsnd->pid != 0)
-		{
-			/*
-			 * Treat a standby such as a pg_basebackup background process
-			 * which always returns an invalid flush location, as an
-			 * asynchronous standby.
-			 */
-			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
-				0 : walsnd->sync_standby_priority;
-
-			if (walsnd->state == WALSNDSTATE_STREAMING &&
-				walsnd->sync_standby_priority > 0 &&
-				(priority == 0 ||
-				 priority > walsnd->sync_standby_priority) &&
-				!XLogRecPtrIsInvalid(walsnd->flush))
-			{
-				priority = walsnd->sync_standby_priority;
-				sync_standby = i;
-			}
-		}
+		sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+			0 : walsnd->sync_standby_priority;
 	}
+
+	/* Find the synchronous standby, if any, before releasing the lock */
+	sync_standby = SyncRepGetSynchronousNode();
 	LWLockRelease(SyncRepLock);
 
 	for (i = 0; i < max_wal_senders; i++)
@@ -2873,6 +2859,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
+
+	/* Cleanup */
 	pfree(sync_priority);
 
 	/* clean up and return the tuplestore */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..552392e 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -49,6 +49,7 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 
 /* called by various procs */
 extern int	SyncRepWakeQueue(bool all, int mode);
+extern int	SyncRepGetSynchronousNode(void);
 
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
-- 
2.1.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to