On Mon, Aug 11, 2014 at 1:31 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Sat, Aug 9, 2014 at 3:03 PM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>
> Great! This is really the feature which I really want.
> Though I forgot why we missed this feature when
> we had added the synchronous replication feature,
> maybe it's worth reading the old discussion which
> may suggest the potential problem of N sync standbys.

Sure, I'll double check. Thanks for your comments.
> I just tested this feature with synchronous_standby_num = 2.
> I started up only one synchronous standby and ran
> the write transaction. Then the transaction was successfully
> completed, i.e., it didn't wait for two standbys. Probably
> this is a bug of the patch.

Oh OK, yes, this is a bug in what I did. The number of standbys to wait
for should take precedence over the number of standbys found in the list
of active WAL senders; I have changed the patch to enforce that behavior.
So, for example, if you have only one sync standby connected and
synchronous_standby_num = 2, the client waits indefinitely.

> And, you forgot to add the line of synchronous_standby_num
> to postgresql.conf.sample.

Yep, right. On top of that, I have refactored the code in such a way that
pg_stat_get_wal_senders and SyncRepReleaseWaiters rely on a single API,
SyncRepGetSynchronousNodes, to get the list of synchronous standbys. This
removes some code duplication, duplication that already exists in HEAD...
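To make the expected behavior concrete, here is a sketch of the kind of
configuration involved (the standby names are only examples):

    # postgresql.conf on the primary
    synchronous_standby_names = 'node_a,node_b,node_c'
    synchronous_standby_num = 2

With this setup, a commit waits for flush confirmations from the two
highest-priority standbys that are connected and streaming, that is to say
node_a and node_b as long as both are up. If only node_a is connected, the
commit waits until a second synchronous standby becomes available.

Regards,
--
Michael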
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
*************** include_dir 'conf.d'
*** 2586,2597 ****
          Specifies a comma-separated list of standby names that can support
          <firstterm>synchronous replication</>, as described in
          <xref linkend="synchronous-replication">.
!         At any one time there will be at most one active synchronous standby;
!         transactions waiting for commit will be allowed to proceed after
!         this standby server confirms receipt of their data.
!         The synchronous standby will be the first standby named in this list
!         that is both currently connected and streaming data in real-time
!         (as shown by a state of <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
          Other standby servers appearing later in this list represent potential
--- 2586,2598 ----
          Specifies a comma-separated list of standby names that can support
          <firstterm>synchronous replication</>, as described in
          <xref linkend="synchronous-replication">.
!         At any one time there will be a number of active synchronous standbys
!         defined by <varname>synchronous_standby_num</>; transactions waiting
!         for commit will be allowed to proceed after those standby servers
!         confirm receipt of their data.  The synchronous standbys will be
!         the first entries named in this list that are both currently connected
!         and streaming data in real-time (as shown by a state of
!         <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
          Other standby servers appearing later in this list represent potential
*************** include_dir 'conf.d'
*** 2627,2632 ****
--- 2628,2652 ----
        </listitem>
       </varlistentry>

+      <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num">
+       <term><varname>synchronous_standby_num</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>synchronous_standby_num</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specifies the number of standbys that support
+        <firstterm>synchronous replication</>, as described in
+        <xref linkend="synchronous-replication">, taken from the first
+        matching entries of <xref linkend="guc-synchronous-standby-names">.
+       </para>
+       <para>
+        The default value is 1.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
       <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)
       <indexterm>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
*************** primary_slot_name = 'node_a_slot'
*** 1081,1092 ****
     WAL record is then sent to the standby.
     The standby sends reply messages each time a new batch of WAL data is
     written to disk, unless <varname>wal_receiver_status_interval</> is set
     to zero on the standby.
!    If the standby is the first matching standby, as specified in
!    <varname>synchronous_standby_names</> on the primary, the reply
!    messages from that standby will be used to wake users waiting for
!    confirmation that the commit record has been received. These parameters
!    allow the administrator to specify which standby servers should be
!    synchronous standbys.
     Note that the configuration of synchronous replication is mainly
     on the master. Named standbys must be directly connected to the
     master; the master knows nothing about downstream standby servers
     using cascaded replication.
--- 1081,1092 ----
     WAL record is then sent to the standby.
     The standby sends reply messages each time a new batch of WAL data is
     written to disk, unless <varname>wal_receiver_status_interval</> is set
     to zero on the standby.
!    If the standby is among the first <varname>synchronous_standby_num</>
!    matching standbys, as specified in <varname>synchronous_standby_names</>
!    on the primary, the reply messages from those standbys will be used to
!    wake users waiting for confirmation that the commit record has been
!    received. These parameters allow the administrator to specify which
!    standby servers should be synchronous standbys.
     Note that the configuration of synchronous replication is mainly
     on the master. Named standbys must be directly connected to the
     master; the master knows nothing about downstream standby servers
     using cascaded replication.
*************** primary_slot_name = 'node_a_slot'
*** 1169,1177 ****
     The best solution for avoiding data loss is to ensure you don't lose
     your last remaining synchronous standby. This can be achieved by naming
     multiple potential synchronous standbys using
     <varname>synchronous_standby_names</>.
!    The first named standby will be used as the synchronous standby. Standbys
!    listed after this will take over the role of synchronous standby if the
!    first one should fail.
    </para>

    <para>
--- 1169,1177 ----
     The best solution for avoiding data loss is to ensure you don't lose
     your last remaining synchronous standby. This can be achieved by naming
     multiple potential synchronous standbys using
     <varname>synchronous_standby_names</>.
!    The first <varname>synchronous_standby_num</> named standbys will be used
!    as the synchronous standbys. Standbys listed after this will take over the
!    role of synchronous standby if one of the first ones should fail.
    </para>

    <para>
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 5,11 ****
   * Synchronous replication is new as of PostgreSQL 9.1.
   *
   * If requested, transaction commits wait until their commit LSN is
!  * acknowledged by the sync standby.
   *
   * This module contains the code for waiting and release of backends.
   * All code in this module executes on the primary. The core streaming
--- 5,11 ----
   * Synchronous replication is new as of PostgreSQL 9.1.
   *
   * If requested, transaction commits wait until their commit LSN is
!  * acknowledged by the synchronous standbys.
   *
   * This module contains the code for waiting and release of backends.
   * All code in this module executes on the primary. The core streaming
***************
*** 59,64 ****
--- 59,65 ----

  /* User-settable parameters for sync rep */
  char	   *SyncRepStandbyNames;
+ int			synchronous_standby_num = 1;

  #define SyncStandbysDefined() \
  	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
*************** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
*** 206,212 ****
  				ereport(WARNING,
  						(errcode(ERRCODE_ADMIN_SHUTDOWN),
  						 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! 						 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
  				whereToSendOutput = DestNone;
  				SyncRepCancelWait();
  				break;
--- 207,213 ----
  				ereport(WARNING,
  						(errcode(ERRCODE_ADMIN_SHUTDOWN),
  						 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s)."))); whereToSendOutput = DestNone; SyncRepCancelWait(); break; *************** *** 223,229 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) QueryCancelPending = false; ereport(WARNING, (errmsg("canceling wait for synchronous replication due to user request"), ! errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); SyncRepCancelWait(); break; } --- 224,230 ---- QueryCancelPending = false; ereport(WARNING, (errmsg("canceling wait for synchronous replication due to user request"), ! errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s)."))); SyncRepCancelWait(); break; } *************** *** 357,365 **** SyncRepInitConfig(void) } } /* * Update the LSNs on each queue based upon our latest state. This ! * implements a simple policy of first-valid-standby-releases-waiter. * * Other policies are possible, which would change what we do here and what * perhaps also which information we store as well. --- 358,442 ---- } } + + /* + * Obtain a palloc'd array containing positions of stanbys currently + * considered as synchronous. Caller is responsible for freeing the + * data obtained. + * Callers of this function should as well take a necessary lock on + * SyncRepLock. + */ + int * + SyncRepGetSynchronousNodes(int *num_sync) + { + int *sync_standbys; + int priority = 0; + int i; + + /* Make enough room */ + sync_standbys = (int *) palloc(synchronous_standby_num * sizeof(int)); + + for (i = 0; i < max_wal_senders; i++) + { + /* Use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* Process to next if not active */ + if (walsnd->pid == 0) + continue; + + /* Process to next if not streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Process to next one if asynchronous */ + if (walsnd->sync_standby_priority == 0) + continue; + + /* Process to next one if priority conditions not satisfied */ + if (priority != 0 && + priority <= walsnd->sync_standby_priority && + *num_sync == synchronous_standby_num) + continue; + + /* Process to next one if flush position is invalid */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + /* + * We have a potential synchronous candidate, add it to the + * list of nodes already present or evict the node with highest + * priority found until now. + */ + if (*num_sync == synchronous_standby_num) + { + int j; + for (j = 0; j < *num_sync; j++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[j]]; + if (walsndloc->sync_standby_priority == priority) + { + sync_standbys[j] = i; + break; + } + } + } + else + { + sync_standbys[*num_sync] = i; + (*num_sync)++; + } + + /* Update priority for next tracking */ + priority = walsnd->sync_standby_priority; + } + + return sync_standbys; + } + /* * Update the LSNs on each queue based upon our latest state. This ! * implements a simple policy of first-valid-standbys-release-waiter. * * Other policies are possible, which would change what we do here and what * perhaps also which information we store as well. *************** *** 368,378 **** void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; ! volatile WalSnd *syncWalSnd = NULL; int numwrite = 0; int numflush = 0; ! 
! 	int			priority = 0;
  	int			i;

  	/*
  	 * If this WALSender is serving a standby that is not on the list of
--- 445,456 ----
  void
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	int		   *sync_standbys;
  	int			numwrite = 0;
  	int			numflush = 0;
! 	int			num_sync = 0;
  	int			i;
+ 	bool		found = false;

  	/*
  	 * If this WALSender is serving a standby that is not on the list of
*************** SyncRepReleaseWaiters(void)
*** 388,427 ****
  	/*
  	 * We're a potential sync standby. Release waiters if we are the highest
  	 * priority standby. If there are multiple standbys with same priorities
! 	 * then we use the first mentioned standby. If you change this, also
! 	 * change pg_stat_get_wal_senders().
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

! 	for (i = 0; i < max_wal_senders; i++)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
!
! 		if (walsnd->pid != 0 &&
! 			walsnd->state == WALSNDSTATE_STREAMING &&
! 			walsnd->sync_standby_priority > 0 &&
! 			(priority == 0 ||
! 			 priority > walsnd->sync_standby_priority) &&
! 			!XLogRecPtrIsInvalid(walsnd->flush))
  		{
! 			priority = walsnd->sync_standby_priority;
! 			syncWalSnd = walsnd;
  		}
  	}

  	/*
! 	 * We should have found ourselves at least.
  	 */
! 	Assert(syncWalSnd);

  	/*
! 	 * If we aren't managing the highest priority standby then just leave.
  	 */
! 	if (syncWalSnd != MyWalSnd)
  	{
  		LWLockRelease(SyncRepLock);
! 		announce_next_takeover = true;
  		return;
  	}
--- 466,516 ----
  	/*
  	 * We're a potential sync standby. Release waiters if we are the highest
  	 * priority standby. If there are multiple standbys with same priorities
! 	 * then we use the first ones mentioned.
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 	sync_standbys = SyncRepGetSynchronousNodes(&num_sync);

! 	/*
! 	 * We should have found ourselves at least.
! 	 */
! 	Assert(num_sync > 0);
!
! 	/*
! 	 * If we aren't managing one of the standbys with highest priority
! 	 * then just leave.
! 	 */
! 	for (i = 0; i < num_sync; i++)
  	{
! 		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
!
! 		if (walsndloc == MyWalSnd)
  		{
! 			found = true;
! 			break;
  		}
  	}

  	/*
! 	 * We are definitely not one of the chosen, but we may be next in
! 	 * line, so be ready to announce the next takeover.
  	 */
! 	if (!found)
! 	{
! 		LWLockRelease(SyncRepLock);
! 		pfree(sync_standbys);
! 		announce_next_takeover = true;
! 		return;
! 	}

  	/*
! 	 * Even if we are one of the chosen standbys, leave if there are
! 	 * fewer synchronous standbys in waiting state than what is expected
! 	 * by the user.
  	 */
! 	if (num_sync < synchronous_standby_num)
  	{
  		LWLockRelease(SyncRepLock);
! 		pfree(sync_standbys);
  		return;
  	}
*************** SyncRepReleaseWaiters(void)
*** 448,454 ****

  	/*
  	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now the sync standby.
  	 */
  	if (announce_next_takeover)
  	{
--- 537,543 ----

  	/*
  	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now a sync standby.
  	 */
  	if (announce_next_takeover)
  	{
*************** SyncRepReleaseWaiters(void)
*** 457,462 ****
--- 546,554 ----
  		(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
  				application_name, MyWalSnd->sync_standby_priority)));
  	}
+
+ 	/* Clean up */
+ 	pfree(sync_standbys);
  }

  /*
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
*************** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*** 2733,2740 ****
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
  	int		   *sync_priority;
! 	int			priority = 0;
! 	int			sync_standby = -1;
  	int			i;

  	/* check to see if caller supports us returning a tuplestore */
--- 2733,2740 ----
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
  	int		   *sync_priority;
! 	int		   *sync_standbys;
! 	int			num_sync = 0;
  	int			i;

  	/* check to see if caller supports us returning a tuplestore */
*************** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*** 2765,2800 ****
  	/*
  	 * Get the priorities of sync standbys all in one go, to minimise lock
  	 * acquisitions and to allow us to evaluate who is the current sync
! 	 * standby. This code must match the code in SyncRepReleaseWaiters().
  	 */
  	sync_priority = palloc(sizeof(int) * max_wal_senders);
  	LWLockAcquire(SyncRepLock, LW_SHARED);
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

! 		if (walsnd->pid != 0)
! 		{
! 			/*
! 			 * Treat a standby such as a pg_basebackup background process
! 			 * which always returns an invalid flush location, as an
! 			 * asynchronous standby.
! 			 */
! 			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 				0 : walsnd->sync_standby_priority;
!
! 			if (walsnd->state == WALSNDSTATE_STREAMING &&
! 				walsnd->sync_standby_priority > 0 &&
! 				(priority == 0 ||
! 				 priority > walsnd->sync_standby_priority) &&
! 				!XLogRecPtrIsInvalid(walsnd->flush))
! 			{
! 				priority = walsnd->sync_standby_priority;
! 				sync_standby = i;
! 			}
! 		}
  	}
  	LWLockRelease(SyncRepLock);

  	for (i = 0; i < max_wal_senders; i++)
--- 2765,2787 ----
  	/*
  	 * Get the priorities of sync standbys all in one go, to minimise lock
  	 * acquisitions and to allow us to evaluate who is the current sync
! 	 * standby.
  	 */
  	sync_priority = palloc(sizeof(int) * max_wal_senders);
  	LWLockAcquire(SyncRepLock, LW_SHARED);
+
+ 	/* First get the priority of each standby while holding the lock */
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

! 		sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 			0 : walsnd->sync_standby_priority;
  	}
+
+ 	/* Obtain the list of synchronous standbys */
+ 	sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
  	LWLockRelease(SyncRepLock);

  	for (i = 0; i < max_wal_senders; i++)
*************** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*** 2856,2870 ****
  			 */
  			if (sync_priority[i] == 0)
  				values[7] = CStringGetTextDatum("async");
- 			else if (i == sync_standby)
- 				values[7] = CStringGetTextDatum("sync");
  			else
! 				values[7] = CStringGetTextDatum("potential");
  		}

  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}

  	pfree(sync_priority);

  	/* clean up and return the tuplestore */
  	tuplestore_donestoring(tupstore);
--- 2843,2872 ----
  			 */
  			if (sync_priority[i] == 0)
  				values[7] = CStringGetTextDatum("async");
  			else
! 			{
! 				int			j;
! 				bool		found = false;
!
! 				for (j = 0; j < num_sync; j++)
! 				{
! 					/* This node is one of the nodes in sync */
! 					if (i == sync_standbys[j])
! 					{
! 						values[7] = CStringGetTextDatum("sync");
! 						found = true;
! 						break;
! 					}
! 				}
! 				if (!found)
! 					values[7] = CStringGetTextDatum("potential");
! 			}
  		}

  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}

  	pfree(sync_priority);
+ 	pfree(sync_standbys);

  	/* clean up and return the tuplestore */
  	tuplestore_donestoring(tupstore);
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
*************** static struct config_int ConfigureNamesInt[] =
*** 2551,2556 ****
--- 2551,2566 ----
  		NULL, NULL, NULL
  	},

+ 	{
+ 		{"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+ 			gettext_noop("Number of synchronous standbys to wait for."),
+ 			NULL
+ 		},
+ 		&synchronous_standby_num,
+ 		1, 1, INT_MAX,
+ 		NULL, NULL, NULL
+ 	},
+
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ----
  #synchronous_standby_names = ''	# standby servers that provide sync rep
  					# comma-separated list of application_name
  					# from standby(s); '*' = all
+ #synchronous_standby_num = 1		# number of standby servers using sync rep
  #vacuum_defer_cleanup_age = 0		# number of xacts by which cleanup is delayed

  # - Standby Servers -
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 33,38 ****
--- 33,39 ----

  /* user-settable parameters for synchronous replication */
  extern char *SyncRepStandbyNames;
+ extern int	synchronous_standby_num;

  /* called by user backend */
  extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
*************** extern void SyncRepUpdateSyncStandbysDefined(void);
*** 49,54 ****
--- 50,56 ----

  /* called by various procs */
  extern int	SyncRepWakeQueue(bool all, int mode);
+ extern int *SyncRepGetSynchronousNodes(int *num_sync);

  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);
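For reference, once the patch is applied the status of each standby can be
checked through pg_stat_replication; a simple query like the following one
(just an example) shows which nodes are elected as synchronous:

    SELECT application_name, state, sync_priority, sync_state
        FROM pg_stat_replication;

With synchronous_standby_num = 2, up to two rows should report "sync" as
sync_state, the other standbys listed in synchronous_standby_names report
"potential", and the rest "async".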