On 11/18/2014 11:23 PM, Michael Paquier wrote:
> On Tue, Nov 18, 2014 at 6:33 PM, Simon Riggs <si...@2ndquadrant.com> wrote:
>
>> Can we just wait on this patch until we have the whole feature?
>
> Well, this may take some time to even define, and even if the goals are
> clearly defined it may take even more time to have a prototype to discuss.
>
>> We quite often break larger patches down into smaller ones, but if
>> they arrive in lots of small pieces it's more difficult to understand
>> and correct issues that are happening on the larger scale. Churning
>> the same piece of code multiple times is rather mind-numbing.
>
> Hm. I think that we are losing ourselves in this thread. The primary goal
> is to remove a code duplication between syncrep.c and walsender.c that
> has existed since 9.1. Would it be possible to focus only on that for now?
> That's still the topic of this CF.

Some comments on this patch:

* I don't like filling the priorities array in SyncRepGetSynchronousStandby(). It might be convenient for the one caller that needs it, but it seems pretty ad hoc.

In fact, I don't really see the point of having the array at all. You could just print the priority in the main loop in pg_stat_get_wal_senders(). Yeah, nominally you need to hold SyncRepLock while reading the priority, but it seems over-zealous to be so strict about that in pg_stat_get_wal_senders(), since it's just an informational view, and the priority changes so rarely that it's highly unlikely to hit a race condition there.

Also note that when you change the list of synchronous standbys in the config file and send SIGHUP, the walsender processes will update their priorities in random order. So the idea that you get a consistent snapshot of the priorities is a bit bogus anyway.

Also note that between filling the priorities array and the main loop in pg_stat_get_wal_senders(), a new WAL sender might connect and set a slot's pid. With the current coding, you'll print an uninitialized value from the array if that happens. So the only thing that holding SyncRepLock really protects against is a "torn" read of the value, if reading an int is not atomic. We could use the spinlock to protect against that if we care.

* It would be nicer to return a pointer than an index into the WAL senders array. That's what the caller really wants.
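
To make the selection rule in these comments concrete, here is a minimal standalone sketch of "pick the active, streaming, synchronous slot with the lowest priority value and a valid flush position, and return a pointer to it". It is not PostgreSQL code: MockWalSnd, MockState, and pick_sync_standby() are hypothetical names, a flush value of 0 stands in for an invalid LSN, and no locking is modeled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the real WalSnd fields; not PostgreSQL code. */
typedef enum { MOCK_STARTUP, MOCK_CATCHUP, MOCK_STREAMING } MockState;

typedef struct MockWalSnd
{
    int         pid;                    /* 0 means the slot is unused */
    MockState   state;
    int         sync_standby_priority;  /* 0 means asynchronous */
    uint64_t    flush;                  /* 0 stands in for an invalid LSN */
} MockWalSnd;

/*
 * Return the slot with the lowest nonzero priority value, or NULL if no
 * slot qualifies. On a tie, the first slot found wins.
 */
MockWalSnd *
pick_sync_standby(MockWalSnd *slots, int nslots)
{
    MockWalSnd *result = NULL;
    int         i;

    assert(slots != NULL || nslots == 0);

    for (i = 0; i < nslots; i++)
    {
        MockWalSnd *s = &slots[i];

        if (s->pid == 0)                    /* must be active */
            continue;
        if (s->state != MOCK_STREAMING)     /* must be streaming */
            continue;
        if (s->sync_standby_priority == 0)  /* must be synchronous */
            continue;
        if (s->flush == 0)                  /* must have a valid flush position */
            continue;
        /* must have a strictly lower priority value than the current pick */
        if (result != NULL &&
            result->sync_standby_priority <= s->sync_standby_priority)
            continue;

        result = s;
    }

    return result;
}
```

Because the function returns a pointer, a caller looping over the same array can test `&slots[i] == pick_sync_standby(slots, nslots)` directly, with no separate index or priorities array to keep in step.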

I propose the attached (I admit I haven't tested it).
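
As an illustration of the spinlock option mentioned in the first comment, the sketch below copies a slot's fields into locals while holding a per-slot lock, so the reader never sees a torn value and does not need a pre-filled array. It is a standalone mock under stated assumptions: MockSlot, MockSnapshot, and mock_slot_snapshot() are hypothetical names, and a C11 atomic_flag stands in for PostgreSQL's own spinlock primitives.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical shared slot; the atomic_flag stands in for a per-slot spinlock. */
typedef struct MockSlot
{
    atomic_flag mutex;
    int         pid;
    int         sync_standby_priority;
    uint64_t    flush;
} MockSlot;

/* Private copy of the fields a reporting loop wants to print. */
typedef struct MockSnapshot
{
    int         pid;
    int         priority;
    uint64_t    flush;
} MockSnapshot;

/* Copy the slot's fields while holding its lock, then release the lock. */
void
mock_slot_snapshot(MockSlot *slot, MockSnapshot *out)
{
    assert(slot != NULL && out != NULL);

    /* Spin until the flag was previously clear, i.e. we now own the lock. */
    while (atomic_flag_test_and_set_explicit(&slot->mutex, memory_order_acquire))
        ;

    out->pid = slot->pid;
    out->priority = slot->sync_standby_priority;
    out->flush = slot->flush;

    atomic_flag_clear_explicit(&slot->mutex, memory_order_release);
}
```

The lock is held only for three plain copies. Formatting or printing the values happens afterwards on the private snapshot.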

- Heikki

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..da89a3b 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -5,7 +5,7 @@
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
@@ -358,6 +358,54 @@ SyncRepInitConfig(void)
 }
 
 /*
+ * Find the WAL sender servicing the synchronous standby with the lowest
+ * priority value, or NULL if no synchronous standby is connected. If there
+ * are multiple nodes with the same lowest priority value, the first node
+ * found is selected. The caller must hold SyncRepLock.
+ */
+WalSnd *
+SyncRepGetSynchronousStandby(void)
+{
+	WalSnd	   *result = NULL;
+	int			result_priority = 0;
+	int			i;
+
+	/* Scan WAL senders and find synchronous node if any */
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		/* Use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+		int			this_priority;
+
+		/* Must be active */
+		if (walsnd->pid == 0)
+			continue;
+
+		/* Must be streaming */
+		if (walsnd->state != WALSNDSTATE_STREAMING)
+			continue;
+
+		/* Must be synchronous */
+		this_priority = walsnd->sync_standby_priority;
+		if (this_priority == 0)
+			continue;
+
+		/* Must have a lower priority value than any previous ones */
+		if (result != NULL && result_priority <= this_priority)
+			continue;
+
+		/* Must have a valid flush position */
+		if (XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		result = (WalSnd *) walsnd;
+		result_priority = this_priority;
+	}
+
+	return result;
+}
+
+/*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-standby-releases-waiter.
  *
@@ -368,11 +416,9 @@ void
 SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
-	volatile WalSnd *syncWalSnd = NULL;
+	WalSnd	   *syncWalSnd;
 	int			numwrite = 0;
 	int			numflush = 0;
-	int			priority = 0;
-	int			i;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -388,32 +434,13 @@ SyncRepReleaseWaiters(void)
 	/*
 	 * We're a potential sync standby. Release waiters if we are the highest
 	 * priority standby. If there are multiple standbys with same priorities
-	 * then we use the first mentioned standby. If you change this, also
-	 * change pg_stat_get_wal_senders().
+	 * then we use the first mentioned standby.
 	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+	syncWalSnd = SyncRepGetSynchronousStandby();
 
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
-		if (walsnd->pid != 0 &&
-			walsnd->state == WALSNDSTATE_STREAMING &&
-			walsnd->sync_standby_priority > 0 &&
-			(priority == 0 ||
-			 priority > walsnd->sync_standby_priority) &&
-			!XLogRecPtrIsInvalid(walsnd->flush))
-		{
-			priority = walsnd->sync_standby_priority;
-			syncWalSnd = walsnd;
-		}
-	}
-
-	/*
-	 * We should have found ourselves at least.
-	 */
-	Assert(syncWalSnd);
+	/* We should have found ourselves at least */
+	Assert(syncWalSnd != NULL);
 
 	/*
 	 * If we aren't managing the highest priority standby then just leave.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index addae8f..742c455 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	Tuplestorestate *tupstore;
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
-	int		   *sync_priority;
-	int			priority = 0;
-	int			sync_standby = -1;
+	WalSnd	   *sync_standby;
 	int			i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
-	 * Get the priorities of sync standbys all in one go, to minimise lock
-	 * acquisitions and to allow us to evaluate who is the current sync
-	 * standby. This code must match the code in SyncRepReleaseWaiters().
+	 * Get the currently active synchronous standby.
 	 */
-	sync_priority = palloc(sizeof(int) * max_wal_senders);
 	LWLockAcquire(SyncRepLock, LW_SHARED);
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-
-		if (walsnd->pid != 0)
-		{
-			/*
-			 * Treat a standby such as a pg_basebackup background process
-			 * which always returns an invalid flush location, as an
-			 * asynchronous standby.
-			 */
-			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
-				0 : walsnd->sync_standby_priority;
-
-			if (walsnd->state == WALSNDSTATE_STREAMING &&
-				walsnd->sync_standby_priority > 0 &&
-				(priority == 0 ||
-				 priority > walsnd->sync_standby_priority) &&
-				!XLogRecPtrIsInvalid(walsnd->flush))
-			{
-				priority = walsnd->sync_standby_priority;
-				sync_standby = i;
-			}
-		}
-	}
+	sync_standby = SyncRepGetSynchronousStandby();
 	LWLockRelease(SyncRepLock);
 
 	for (i = 0; i < max_wal_senders; i++)
@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		XLogRecPtr	write;
 		XLogRecPtr	flush;
 		XLogRecPtr	apply;
+		int			priority;
 		WalSndState state;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		write = walsnd->write;
 		flush = walsnd->flush;
 		apply = walsnd->apply;
+		priority = walsnd->sync_standby_priority;
 		SpinLockRelease(&walsnd->mutex);
 
 		memset(nulls, 0, sizeof(nulls));
@@ -2857,15 +2829,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 				nulls[5] = true;
 			values[5] = LSNGetDatum(apply);
 
-			values[6] = Int32GetDatum(sync_priority[i]);
+			values[6] = Int32GetDatum(priority);
 
 			/*
 			 * More easily understood version of standby state. This is purely
 			 * informational, not different from priority.
 			 */
-			if (sync_priority[i] == 0)
+			if (priority == 0)
 				values[7] = CStringGetTextDatum("async");
-			else if (i == sync_standby)
+			else if (walsnd == sync_standby)
 				values[7] = CStringGetTextDatum("sync");
 			else
 				values[7] = CStringGetTextDatum("potential");
@@ -2873,7 +2845,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
-	pfree(sync_priority);
 
 	/* clean up and return the tuplestore */
 	tuplestore_donestoring(tupstore);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..6f78fee 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 /* called by various procs */
 extern int	SyncRepWakeQueue(bool all, int mode);
 
+/* forward declaration to avoid pulling in walsender_private.h */
+struct WalSnd;
+extern struct WalSnd *SyncRepGetSynchronousStandby(void);
+
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
 