On Thu, Dec 11, 2014 at 10:07 PM, Heikki Linnakangas <
[email protected]> wrote:
> * I don't like filling the priorities-array in
> SyncRepGetSynchronousStandby(). It might be convenient for the one caller
> that needs it, but it seems pretty ad hoc.
>
> In fact, I don't really see the point of having the array at all. You
> could just print the priority in the main loop in
> pg_stat_get_wal_senders(). Yeah, nominally you need to hold SyncRepLock
> while reading the priority, but it seems over-zealous to be so strict about
> that in pg_stat_wal_senders(), since it's just an informational view, and
> priority changes so rarely that it's highly unlikely to hit a race
> condition there. Also note that when you change the list of synchronous
> standbys in the config file, and SIGHUP, the walsender processes will
> update their priorities in random order. So the idea that you get a
> consistent snapshot of the priorities is a bit bogus anyway. Also note that
> between filling the priorities array and the main loop in
> pg_stat_get_wal_senders(), a new WAL sender might connect and set a slot's
> pid. With the current coding, you'll print an uninitialized value from the
> array if that happens. So the only thing that holding the SyncRepLock
> really protects from is a "torn" read of the value, if reading an int is
> not atomic. We could use the spinlock to protect from that if we care.
>
That's fair, and it simplifies the whole process as well as the API used to
fetch the synchronous WAL sender.
> * Would be nicer to return a pointer, than index into the wal senders
> array. That's what the caller really wants.
>
> I propose the attached (I admit I haven't tested it).
>
Actually, if you do it this way I think that it would be worth adding the
small optimization Fujii-san mentioned upthread: if the priority is equal to
1, we leave the loop early and immediately return the pointer. Putting all of
that together gives the attached patch, which FWIW I actually tested with
multiple standbys and multiple entries in s_s_names.
Regards,
--
Michael
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..34530a0 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -5,7 +5,7 @@
* Synchronous replication is new as of PostgreSQL 9.1.
*
* If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
*
* This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming
@@ -358,6 +358,62 @@ SyncRepInitConfig(void)
}
/*
+ * Find the WAL sender servicing the synchronous standby with the lowest
+ * priority value, or NULL if no synchronous standby is connected. If there
+ * are multiple nodes with the same lowest priority value, the first node
+ * found is selected. The caller must hold SyncRepLock.
+ */
+WalSnd *
+SyncRepGetSynchronousStandby(void)
+{
+ WalSnd *result = NULL;
+ int result_priority = 0;
+ int i;
+
+ /* Scan WAL senders and find synchronous node if any */
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* Use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ int this_priority;
+
+ /* Must be active */
+ if (walsnd->pid == 0)
+ continue;
+
+ /* Must be streaming */
+ if (walsnd->state != WALSNDSTATE_STREAMING)
+ continue;
+
+ /* Must be synchronous */
+ this_priority = walsnd->sync_standby_priority;
+ if (this_priority == 0)
+ continue;
+
+ /* Must have a lower priority value than any previous ones */
+ if (result != NULL && result_priority <= this_priority)
+ continue;
+
+ /* Must have a valid flush position */
+ if (XLogRecPtrIsInvalid(walsnd->flush))
+ continue;
+
+ result = (WalSnd *) walsnd;
+ result_priority = this_priority;
+
+ /*
+ * If priority is equal to 1, we are sure that there are no other
+ * WAL senders that could be synchronous with a lower priority,
+ * hence leave immediately with this one.
+ */
+ if (this_priority == 1)
+ return result;
+ }
+
+ return result;
+}
+
+/*
* Update the LSNs on each queue based upon our latest state. This
* implements a simple policy of first-valid-standby-releases-waiter.
*
@@ -368,11 +424,9 @@ void
SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
- volatile WalSnd *syncWalSnd = NULL;
+ WalSnd *syncWalSnd;
int numwrite = 0;
int numflush = 0;
- int priority = 0;
- int i;
/*
* If this WALSender is serving a standby that is not on the list of
@@ -388,32 +442,13 @@ SyncRepReleaseWaiters(void)
/*
* We're a potential sync standby. Release waiters if we are the highest
* priority standby. If there are multiple standbys with same priorities
- * then we use the first mentioned standby. If you change this, also
- * change pg_stat_get_wal_senders().
+ * then we use the first mentioned standby.
*/
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ syncWalSnd = SyncRepGetSynchronousStandby();
- for (i = 0; i < max_wal_senders; i++)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
- if (walsnd->pid != 0 &&
- walsnd->state == WALSNDSTATE_STREAMING &&
- walsnd->sync_standby_priority > 0 &&
- (priority == 0 ||
- priority > walsnd->sync_standby_priority) &&
- !XLogRecPtrIsInvalid(walsnd->flush))
- {
- priority = walsnd->sync_standby_priority;
- syncWalSnd = walsnd;
- }
- }
-
- /*
- * We should have found ourselves at least.
- */
- Assert(syncWalSnd);
+ /* We should have found ourselves at least */
+ Assert(syncWalSnd != NULL);
/*
* If we aren't managing the highest priority standby then just leave.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index addae8f..742c455 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
- int *sync_priority;
- int priority = 0;
- int sync_standby = -1;
+ WalSnd *sync_standby;
int i;
/* check to see if caller supports us returning a tuplestore */
@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
/*
- * Get the priorities of sync standbys all in one go, to minimise lock
- * acquisitions and to allow us to evaluate who is the current sync
- * standby. This code must match the code in SyncRepReleaseWaiters().
+ * Get the currently active synchronous standby.
*/
- sync_priority = palloc(sizeof(int) * max_wal_senders);
LWLockAcquire(SyncRepLock, LW_SHARED);
- for (i = 0; i < max_wal_senders; i++)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-
- if (walsnd->pid != 0)
- {
- /*
- * Treat a standby such as a pg_basebackup background process
- * which always returns an invalid flush location, as an
- * asynchronous standby.
- */
- sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
- 0 : walsnd->sync_standby_priority;
-
- if (walsnd->state == WALSNDSTATE_STREAMING &&
- walsnd->sync_standby_priority > 0 &&
- (priority == 0 ||
- priority > walsnd->sync_standby_priority) &&
- !XLogRecPtrIsInvalid(walsnd->flush))
- {
- priority = walsnd->sync_standby_priority;
- sync_standby = i;
- }
- }
- }
+ sync_standby = SyncRepGetSynchronousStandby();
LWLockRelease(SyncRepLock);
for (i = 0; i < max_wal_senders; i++)
@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int priority;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
+ priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
@@ -2857,15 +2829,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[5] = true;
values[5] = LSNGetDatum(apply);
- values[6] = Int32GetDatum(sync_priority[i]);
+ values[6] = Int32GetDatum(priority);
/*
* More easily understood version of standby state. This is purely
* informational, not different from priority.
*/
- if (sync_priority[i] == 0)
+ if (priority == 0)
values[7] = CStringGetTextDatum("async");
- else if (i == sync_standby)
+ else if (walsnd == sync_standby)
values[7] = CStringGetTextDatum("sync");
else
values[7] = CStringGetTextDatum("potential");
@@ -2873,7 +2845,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
- pfree(sync_priority);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..6f78fee 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
/* called by various procs */
extern int SyncRepWakeQueue(bool all, int mode);
+/* forward declaration to avoid pulling in walsender_private.h */
+struct WalSnd;
+extern struct WalSnd *SyncRepGetSynchronousStandby(void);
+
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
extern void assign_synchronous_commit(int newval, void *extra);
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers