Thanks for your review! (No worries about creating a new thread; I don't
mind, as this is not a huge patch. I think you could have downloaded
the mbox from postgresql.org, btw.)

On Thu, Oct 30, 2014 at 8:24 AM, Jim Nasby <j...@nasby.net> wrote:
> SyncRepGetSynchronousNode():
> IMHO, the top comment should mention that if there are multiple standbys of
> the same priority it returns the first one.
OK. Corrected.

> This switches from using a single if() with multiple conditions &&'d
> together to a bunch of if() continue's. I don't know if those will perform
> the same, and AFAIK this is pretty performance critical.
Well, we could still use the old notation with a single if(). That's
not complicated to change.
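
As a rough illustration (not part of the patch), the two notations are
logically equivalent by De Morgan's laws, so a compiler has the same
conditions to evaluate either way. The sketch below uses a hypothetical
SenderState struct as a stand-in for the WalSnd fields involved, and
assumes priorities are never negative, which is what makes "== 0" the
negation of "> 0".

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the WalSnd fields used by the check. */
typedef struct
{
	int			pid;
	bool		streaming;		/* state == WALSNDSTATE_STREAMING */
	int			sync_standby_priority;
	bool		flush_valid;	/* !XLogRecPtrIsInvalid(flush) */
} SenderState;

/* Old notation: the sender is a candidate when every condition holds. */
static bool
is_candidate(const SenderState *w, int priority)
{
	/* Assumption: priorities are never negative. */
	assert(w->sync_standby_priority >= 0 && priority >= 0);

	return w->pid != 0 &&
		w->streaming &&
		w->sync_standby_priority > 0 &&
		(priority == 0 || priority > w->sync_standby_priority) &&
		w->flush_valid;
}

/* New notation: the sender is skipped when any condition fails. */
static bool
should_skip(const SenderState *w, int priority)
{
	return w->pid == 0 ||
		!w->streaming ||
		w->sync_standby_priority == 0 ||
		(priority != 0 && priority <= w->sync_standby_priority) ||
		!w->flush_valid;
}

/* Compare both notations over a small exhaustive input space. */
static int
count_mismatches(void)
{
	int			mismatches = 0;

	for (int pid = 0; pid <= 1; pid++)
		for (int streaming = 0; streaming <= 1; streaming++)
			for (int flush = 0; flush <= 1; flush++)
				for (int prio = 0; prio <= 3; prio++)
					for (int best = 0; best <= 3; best++)
					{
						SenderState w = {pid, streaming != 0, prio, flush != 0};

						if (should_skip(&w, best) == is_candidate(&w, best))
							mismatches++;
					}
	return mismatches;
}
```

count_mismatches() returning 0 means both forms agree on every input
tried. Whether they perform the same still depends on the generated
code.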

> <grammarZealot>In the comments, "Process" should be
> "Proceed".</grammarZealot>
Noted. Thanks for correcting my Frenglish.

> pg_stat_get_wal_senders():
> The original version only loops through walsnds[] one time; the new code
> loops twice, both times while holding SyncRepLock(shared). This will block
> transaction commit if syncrep is enabled. That's not great, but presumably
> this function isn't going to be called terribly often, so maybe it's OK. (On
> a side note, perhaps we should add a warning to the documentation for
> pg_stat_replication warning not to hammer it...)
Hm. For code readability I did not really want to do so, but let's do
this: let's add a new argument to SyncRepGetSynchronousNode to
retrieve the priority of each WAL sender and do a single pass through
the array, so that there is no performance difference.
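
To make the idea concrete, here is a minimal standalone sketch of the
single-pass approach. It is only an approximation of the attached patch:
FakeWalSnd and get_sync_node() are hypothetical names, the array is
passed in explicitly instead of being read from shared memory, and no
locking is shown.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the WalSnd fields read during the scan. */
typedef struct
{
	int			pid;
	bool		streaming;
	int			sync_standby_priority;
	bool		flush_valid;
} FakeWalSnd;

/*
 * Return the index of the sender with the lowest strictly-positive
 * priority, or -1 if there is none. On ties the first sender wins.
 * If node_priority is not NULL, it is filled with the priority of
 * each sender during the same pass.
 */
static int
get_sync_node(const FakeWalSnd *walsnds, int nsenders, int *node_priority)
{
	int			sync_node = -1;
	int			priority = 0;

	assert(nsenders >= 0);

	for (int i = 0; i < nsenders; i++)
	{
		const FakeWalSnd *w = &walsnds[i];

		/* Report priority 0 for senders without a valid flush position */
		if (node_priority != NULL)
			node_priority[i] = w->flush_valid ? w->sync_standby_priority : 0;

		/* Skip senders that cannot be, or do not beat, the current pick */
		if (w->pid == 0 ||
			!w->streaming ||
			w->sync_standby_priority == 0 ||
			(priority != 0 && priority <= w->sync_standby_priority) ||
			!w->flush_valid)
			continue;

		sync_node = i;
		priority = w->sync_standby_priority;
	}

	return sync_node;
}
```

A caller that only needs the synchronous node passes NULL, while a
caller that also reports priorities passes a preallocated array of
nsenders entries.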

Updated patch attached.
Regards,
-- 
Michael
From 1a02c982de0aeb2bd7dcf0bbf34605017619a417 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Thu, 30 Oct 2014 22:01:10 +0900
Subject: [PATCH] Refactor code to detect synchronous node in WAL sender array

This patch removes the code duplicated between walsender.c and syncrep.c
to detect the node with the lowest strictly-positive priority, making
this code easier to maintain.
---
 src/backend/replication/syncrep.c   | 99 +++++++++++++++++++++++++++----------
 src/backend/replication/walsender.c | 36 +++-----------
 src/include/replication/syncrep.h   |  1 +
 3 files changed, 80 insertions(+), 56 deletions(-)

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..70c799b 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -5,7 +5,7 @@
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
@@ -357,6 +357,70 @@ SyncRepInitConfig(void)
 	}
 }
 
+
+/*
+ * Obtain position of synchronous standby in the array referencing all
+ * the WAL senders, or -1 if no synchronous node can be found. The caller
+ * of this function should take a lock on SyncRepLock. If there are
+ * multiple nodes with the same lowest priority value, the first node
+ * found is selected.
+ * node_priority, if not NULL, is a preallocated array of size
+ * max_wal_senders that is filled with the priority of each WAL sender.
+ * Filling it here limits the scan of the WAL sender array to a single
+ * pass, which shortens the time during which SyncRepLock is held by
+ * the caller.
+ */
+int
+SyncRepGetSynchronousNode(int *node_priority)
+{
+	int		sync_node = -1;
+	int		priority = 0;
+	int		i;
+
+	/* Scan WAL senders and find synchronous node if any */
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		/* Use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+		/* First get the priority of this WAL sender */
+		if (node_priority)
+			node_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+				0 : walsnd->sync_standby_priority;
+
+		/*
+		 * Proceed immediately to the next WAL sender if any of the
+		 * following conditions is satisfied:
+		 * - Not active, with PID equal to 0.
+		 * - Not streaming.
+		 * - Priority conditions not satisfied. If priority is not 0, a
+		 *   potential synchronous node has already been found, and the
+		 *   node selected must also have the lowest priority value in
+		 *   the list of WAL senders.
+		 * - Stream flush position is invalid.
+		 */
+		if (walsnd->pid == 0 ||
+			walsnd->state != WALSNDSTATE_STREAMING ||
+			walsnd->sync_standby_priority == 0 ||
+			(priority != 0 &&
+			 priority <= walsnd->sync_standby_priority) ||
+			XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		/*
+		 * We have a potential synchronous candidate, choose it as the
+		 * synchronous node for the time being before going through the
+		 * other nodes listed in the WAL sender array.
+		 */
+		sync_node = i;
+
+		/* Update priority to current value of WAL sender */
+		priority = walsnd->sync_standby_priority;
+	}
+
+	return sync_node;
+}
+
 /*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-standby-releases-waiter.
@@ -369,10 +433,9 @@ SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
 	volatile WalSnd *syncWalSnd = NULL;
+	int			sync_node;
 	int			numwrite = 0;
 	int			numflush = 0;
-	int			priority = 0;
-	int			i;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -388,32 +451,14 @@ SyncRepReleaseWaiters(void)
 	/*
 	 * We're a potential sync standby. Release waiters if we are the highest
 	 * priority standby. If there are multiple standbys with same priorities
-	 * then we use the first mentioned standby. If you change this, also
-	 * change pg_stat_get_wal_senders().
+	 * then we use the first mentioned standby.
 	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+	sync_node = SyncRepGetSynchronousNode(NULL);
 
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
-		if (walsnd->pid != 0 &&
-			walsnd->state == WALSNDSTATE_STREAMING &&
-			walsnd->sync_standby_priority > 0 &&
-			(priority == 0 ||
-			 priority > walsnd->sync_standby_priority) &&
-			!XLogRecPtrIsInvalid(walsnd->flush))
-		{
-			priority = walsnd->sync_standby_priority;
-			syncWalSnd = walsnd;
-		}
-	}
-
-	/*
-	 * We should have found ourselves at least.
-	 */
-	Assert(syncWalSnd);
+	/* We should have found ourselves at least */
+	Assert(sync_node >= 0 && sync_node < max_wal_senders);
+	syncWalSnd = &WalSndCtl->walsnds[sync_node];
 
 	/*
 	 * If we aren't managing the highest priority standby then just leave.
@@ -444,7 +489,7 @@ SyncRepReleaseWaiters(void)
 
 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
-	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
 
 	/*
 	 * If we are managing the highest priority standby, though we weren't
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 385d18b..7974ed9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2742,8 +2742,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	int		   *sync_priority;
-	int			priority = 0;
-	int			sync_standby = -1;
+	int			sync_standby;
 	int			i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -2774,36 +2773,13 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	/*
 	 * Get the priorities of sync standbys all in one go, to minimise lock
 	 * acquisitions and to allow us to evaluate who is the current sync
-	 * standby. This code must match the code in SyncRepReleaseWaiters().
+	 * standby.
 	 */
-	sync_priority = palloc(sizeof(int) * max_wal_senders);
+	sync_priority = (int *) palloc(sizeof(int) * max_wal_senders);
 	LWLockAcquire(SyncRepLock, LW_SHARED);
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 
-		if (walsnd->pid != 0)
-		{
-			/*
-			 * Treat a standby such as a pg_basebackup background process
-			 * which always returns an invalid flush location, as an
-			 * asynchronous standby.
-			 */
-			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
-				0 : walsnd->sync_standby_priority;
-
-			if (walsnd->state == WALSNDSTATE_STREAMING &&
-				walsnd->sync_standby_priority > 0 &&
-				(priority == 0 ||
-				 priority > walsnd->sync_standby_priority) &&
-				!XLogRecPtrIsInvalid(walsnd->flush))
-			{
-				priority = walsnd->sync_standby_priority;
-				sync_standby = i;
-			}
-		}
-	}
+	/* Find the synchronous standby and collect per-sender priorities */
+	sync_standby = SyncRepGetSynchronousNode(sync_priority);
 	LWLockRelease(SyncRepLock);
 
 	for (i = 0; i < max_wal_senders; i++)
@@ -2873,6 +2849,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
+
+	/* Cleanup */
 	pfree(sync_priority);
 
 	/* clean up and return the tuplestore */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..62e1652 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -49,6 +49,7 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 
 /* called by various procs */
 extern int	SyncRepWakeQueue(bool all, int mode);
+extern int	SyncRepGetSynchronousNode(int *node_priority);
 
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
-- 
2.1.2
