On 11/18/2014 11:23 PM, Michael Paquier wrote:
On Tue, Nov 18, 2014 at 6:33 PM, Simon Riggs <si...@2ndquadrant.com> wrote:
Can we just wait on this patch until we have the whole feature?
Well, this may take some time to even define, and even if the goals are clearly
defined, it may take even more time to have a prototype to discuss.
We quite often break larger patches down into smaller ones, but if
they arrive in lots of small pieces it's more difficult to understand
and correct issues that are happening on the larger scale. Churning
the same piece of code multiple times is rather mind numbing.
Hm. I think that we are losing ourselves in this thread. The primary goal
is to remove a code duplication between syncrep.c and walsender.c that
has existed since 9.1. Would it be possible to focus only on that for now?
That's still the topic of this CF.
Some comments on this patch:
* I don't like filling the priorities-array in
SyncRepGetSynchronousStandby(). It might be convenient for the one
caller that needs it, but it seems pretty ad hoc.
In fact, I don't really see the point of having the array at all. You
could just print the priority in the main loop in
pg_stat_get_wal_senders(). Yeah, nominally you need to hold SyncRepLock
while reading the priority, but it seems over-zealous to be so strict
about that in pg_stat_get_wal_senders(), since it's just an informational
view, and priority changes so rarely that it's highly unlikely to hit a
race condition there. Also note that when you change the list of
synchronous standbys in the config file, and SIGHUP, the walsender
processes will update their priorities in random order. So the idea that
you get a consistent snapshot of the priorities is a bit bogus anyway.
Also note that between filling the priorities array and the main loop in
pg_stat_get_wal_senders(), a new WAL sender might connect and set a
slot's pid. With the current coding, you'll print an uninitialized value
from the array if that happens. So the only thing that holding the
SyncRepLock really protects from is a "torn" read of the value, if
reading an int is not atomic. We could use the spinlock to protect from
that if we care.
* It would be nicer to return a pointer than an index into the WAL senders
array. That's what the caller really wants.
I propose the attached (I admit I haven't tested it).
- Heikki
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..da89a3b 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -5,7 +5,7 @@
* Synchronous replication is new as of PostgreSQL 9.1.
*
* If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
*
* This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming
@@ -358,6 +358,54 @@ SyncRepInitConfig(void)
}
/*
+ * Find the WAL sender servicing the synchronous standby with the lowest
+ * priority value, or NULL if no synchronous standby is connected. If there
+ * are multiple nodes with the same lowest priority value, the first node
+ * found is selected. The caller must hold SyncRepLock.
+ */
+WalSnd *
+SyncRepGetSynchronousStandby(void)
+{
+ WalSnd *result = NULL;
+ int result_priority = 0;
+ int i;
+
+ /* Scan WAL senders and find synchronous node if any */
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* Use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ int this_priority;
+
+ /* Must be active */
+ if (walsnd->pid == 0)
+ continue;
+
+ /* Must be streaming */
+ if (walsnd->state != WALSNDSTATE_STREAMING)
+ continue;
+
+ /* Must be synchronous */
+ this_priority = walsnd->sync_standby_priority;
+ if (this_priority == 0)
+ continue;
+
+ /* Must have a lower priority value than any previous ones */
+ if (result != NULL && result_priority <= this_priority)
+ continue;
+
+ /* Must have a valid flush position */
+ if (XLogRecPtrIsInvalid(walsnd->flush))
+ continue;
+
+ result = (WalSnd *) walsnd;
+ result_priority = this_priority;
+ }
+
+ return result;
+}
+
+/*
* Update the LSNs on each queue based upon our latest state. This
* implements a simple policy of first-valid-standby-releases-waiter.
*
@@ -368,11 +416,9 @@ void
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
- volatile WalSnd *syncWalSnd = NULL;
+ WalSnd *syncWalSnd;
int numwrite = 0;
int numflush = 0;
- int priority = 0;
- int i;
/*
* If this WALSender is serving a standby that is not on the list of
@@ -388,32 +434,13 @@ SyncRepReleaseWaiters(void)
/*
* We're a potential sync standby. Release waiters if we are the highest
* priority standby. If there are multiple standbys with same priorities
- * then we use the first mentioned standby. If you change this, also
- * change pg_stat_get_wal_senders().
+ * then we use the first mentioned standby.
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ syncWalSnd = SyncRepGetSynchronousStandby();
- for (i = 0; i < max_wal_senders; i++)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
- if (walsnd->pid != 0 &&
- walsnd->state == WALSNDSTATE_STREAMING &&
- walsnd->sync_standby_priority > 0 &&
- (priority == 0 ||
- priority > walsnd->sync_standby_priority) &&
- !XLogRecPtrIsInvalid(walsnd->flush))
- {
- priority = walsnd->sync_standby_priority;
- syncWalSnd = walsnd;
- }
- }
-
- /*
- * We should have found ourselves at least.
- */
- Assert(syncWalSnd);
+ /* We should have found ourselves at least */
+ Assert(syncWalSnd != NULL);
/*
* If we aren't managing the highest priority standby then just leave.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index addae8f..742c455 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
- int *sync_priority;
- int priority = 0;
- int sync_standby = -1;
+ WalSnd *sync_standby;
int i;
/* check to see if caller supports us returning a tuplestore */
@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
/*
- * Get the priorities of sync standbys all in one go, to minimise lock
- * acquisitions and to allow us to evaluate who is the current sync
- * standby. This code must match the code in SyncRepReleaseWaiters().
+ * Get the currently active synchronous standby.
*/
- sync_priority = palloc(sizeof(int) * max_wal_senders);
LWLockAcquire(SyncRepLock, LW_SHARED);
- for (i = 0; i < max_wal_senders; i++)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-
- if (walsnd->pid != 0)
- {
- /*
- * Treat a standby such as a pg_basebackup background process
- * which always returns an invalid flush location, as an
- * asynchronous standby.
- */
- sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
- 0 : walsnd->sync_standby_priority;
-
- if (walsnd->state == WALSNDSTATE_STREAMING &&
- walsnd->sync_standby_priority > 0 &&
- (priority == 0 ||
- priority > walsnd->sync_standby_priority) &&
- !XLogRecPtrIsInvalid(walsnd->flush))
- {
- priority = walsnd->sync_standby_priority;
- sync_standby = i;
- }
- }
- }
+ sync_standby = SyncRepGetSynchronousStandby();
LWLockRelease(SyncRepLock);
for (i = 0; i < max_wal_senders; i++)
@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int priority;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
+ priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
@@ -2857,15 +2829,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[5] = true;
values[5] = LSNGetDatum(apply);
- values[6] = Int32GetDatum(sync_priority[i]);
+ values[6] = Int32GetDatum(priority);
/*
* More easily understood version of standby state. This is purely
* informational, not different from priority.
*/
- if (sync_priority[i] == 0)
+ if (priority == 0)
values[7] = CStringGetTextDatum("async");
- else if (i == sync_standby)
+ else if (walsnd == sync_standby)
values[7] = CStringGetTextDatum("sync");
else
values[7] = CStringGetTextDatum("potential");
@@ -2873,7 +2845,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
- pfree(sync_priority);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..6f78fee 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
/* called by various procs */
extern int SyncRepWakeQueue(bool all, int mode);
+/* forward declaration to avoid pulling in walsender_private.h */
+struct WalSnd;
+extern struct WalSnd *SyncRepGetSynchronousStandby(void);
+
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
extern void assign_synchronous_commit(int newval, void *extra);
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers