On Fri, Oct 31, 2014 at 6:59 AM, Jim Nasby <jim.na...@bluetreble.com> wrote:

> If we stick with this version I'd argue it makes more sense to just stick
> the sync_node = and priority = statements into the if block and ditch the
> continue. </nitpick>
>
Let's go with the cleaner version then; I'd prefer code that can be read
easily for this code path. Switching back would not be complicated either.
-- 
Michael
From 00d85f046d187056a85c6d6dc82e9a79674b132d Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Thu, 30 Oct 2014 22:01:10 +0900
Subject: [PATCH] Refactor code to detect synchronous node in WAL sender array

This patch removes the code duplicated between walsender.c and syncrep.c
that detects the node with the lowest strictly-positive priority,
facilitating maintenance of this code.
---
 src/backend/replication/syncrep.c   | 101 ++++++++++++++++++++++++++----------
 src/backend/replication/walsender.c |  36 +++----------
 src/include/replication/syncrep.h   |   1 +
 3 files changed, 82 insertions(+), 56 deletions(-)

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..848c783 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -5,7 +5,7 @@
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
@@ -357,6 +357,72 @@ SyncRepInitConfig(void)
 	}
 }
 
+
+/*
+ * Obtain position of synchronous standby in the array referencing all
+ * the WAL senders, or -1 if no synchronous node can be found. The caller
+ * of this function should hold SyncRepLock. If there are multiple nodes
+ * with the same lowest priority value, the first node found is
+ * selected.
+ * node_priority, if not NULL, is a preallocated array of size
+ * max_wal_senders that is filled with the priority of each WAL sender
+ * (0 if its flush position is invalid). Filling it in this function
+ * limits the scan of the WAL sender array to one pass, and hence the
+ * time during which SyncRepLock is held.
+ */
+int
+SyncRepGetSynchronousNode(int *node_priority)
+{
+	int		sync_node = -1;
+	int		priority = 0;
+	int		i;
+
+	/* Scan WAL senders and find synchronous node if any */
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		/* Use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+		/* First get the priority of this WAL sender */
+		if (node_priority)
+			node_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+				0 : walsnd->sync_standby_priority;
+
+		/* Proceed to next if not active */
+		if (walsnd->pid == 0)
+			continue;
+
+		/* Proceed to next if not streaming */
+		if (walsnd->state != WALSNDSTATE_STREAMING)
+			continue;
+
+		/* Proceed to next one if asynchronous */
+		if (walsnd->sync_standby_priority == 0)
+			continue;
+
+		/* Proceed to next one if priority conditions not satisfied */
+		if (priority != 0 &&
+			priority <= walsnd->sync_standby_priority)
+			continue;
+
+		/* Proceed to next one if flush position is invalid */
+		if (XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		/*
+		 * We have a potential synchronous candidate, choose it as the
+		 * synchronous node for the time being before going through the
+		 * other nodes listed in the WAL sender array.
+		 */
+		sync_node = i;
+
+		/* Update priority to current value of WAL sender */
+		priority = walsnd->sync_standby_priority;
+	}
+
+	return sync_node;
+}
+
 /*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-standby-releases-waiter.
@@ -369,10 +435,9 @@ SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
 	volatile WalSnd *syncWalSnd = NULL;
+	int			sync_node;
 	int			numwrite = 0;
 	int			numflush = 0;
-	int			priority = 0;
-	int			i;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -388,32 +453,14 @@ SyncRepReleaseWaiters(void)
 	/*
 	 * We're a potential sync standby. Release waiters if we are the highest
 	 * priority standby. If there are multiple standbys with same priorities
-	 * then we use the first mentioned standby. If you change this, also
-	 * change pg_stat_get_wal_senders().
+	 * then we use the first mentioned standby.
 	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+	sync_node = SyncRepGetSynchronousNode(NULL);
 
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
-		if (walsnd->pid != 0 &&
-			walsnd->state == WALSNDSTATE_STREAMING &&
-			walsnd->sync_standby_priority > 0 &&
-			(priority == 0 ||
-			 priority > walsnd->sync_standby_priority) &&
-			!XLogRecPtrIsInvalid(walsnd->flush))
-		{
-			priority = walsnd->sync_standby_priority;
-			syncWalSnd = walsnd;
-		}
-	}
-
-	/*
-	 * We should have found ourselves at least.
-	 */
-	Assert(syncWalSnd);
+	/* We should have found ourselves at least */
+	Assert(sync_node >= 0 && sync_node < max_wal_senders);
+	syncWalSnd = &WalSndCtl->walsnds[sync_node];
 
 	/*
 	 * If we aren't managing the highest priority standby then just leave.
@@ -444,7 +491,7 @@ SyncRepReleaseWaiters(void)
 
 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
-	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
 
 	/*
 	 * If we are managing the highest priority standby, though we weren't
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 385d18b..7974ed9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2742,8 +2742,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	int		   *sync_priority;
-	int			priority = 0;
-	int			sync_standby = -1;
+	int			sync_standby;
 	int			i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -2774,36 +2773,13 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	/*
 	 * Get the priorities of sync standbys all in one go, to minimise lock
 	 * acquisitions and to allow us to evaluate who is the current sync
-	 * standby. This code must match the code in SyncRepReleaseWaiters().
+	 * standby.
 	 */
-	sync_priority = palloc(sizeof(int) * max_wal_senders);
+	sync_priority = (int *) palloc(sizeof(int) * max_wal_senders);
 	LWLockAcquire(SyncRepLock, LW_SHARED);
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 
-		if (walsnd->pid != 0)
-		{
-			/*
-			 * Treat a standby such as a pg_basebackup background process
-			 * which always returns an invalid flush location, as an
-			 * asynchronous standby.
-			 */
-			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
-				0 : walsnd->sync_standby_priority;
-
-			if (walsnd->state == WALSNDSTATE_STREAMING &&
-				walsnd->sync_standby_priority > 0 &&
-				(priority == 0 ||
-				 priority > walsnd->sync_standby_priority) &&
-				!XLogRecPtrIsInvalid(walsnd->flush))
-			{
-				priority = walsnd->sync_standby_priority;
-				sync_standby = i;
-			}
-		}
-	}
+	/* Obtain the current synchronous standby and each sender's priority */
+	sync_standby = SyncRepGetSynchronousNode(sync_priority);
 	LWLockRelease(SyncRepLock);
 
 	for (i = 0; i < max_wal_senders; i++)
@@ -2873,6 +2849,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
+
+	/* Cleanup */
 	pfree(sync_priority);
 
 	/* clean up and return the tuplestore */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..62e1652 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -49,6 +49,7 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 
 /* called by various procs */
 extern int	SyncRepWakeQueue(bool all, int mode);
+extern int	SyncRepGetSynchronousNode(int *sync_priority);
 
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
-- 
2.1.3

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to