On Thu, Aug 14, 2014 at 8:34 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> +     At any one time there will be at a number of active
> synchronous standbys
> +     defined by <varname>synchronous_standby_num</>; transactions waiting
>
> It's better to use <xref linkend="guc-synchronous-standby-num">, instead.

Fixed.
> +     for commit will be allowed to proceed after those standby servers
> +     confirms receipt of their data. The synchronous standbys will be
>
> Typo: confirms -> confirm

Fixed.

> +      <para>
> +        Specifies the number of standbys that support
> +        <firstterm>synchronous replication</>, as described in
> +        <xref linkend="synchronous-replication">, and listed as the first
> +        elements of <xref linkend="guc-synchronous-standby-names">.
> +      </para>
> +      <para>
> +        Default value is 1.
> +      </para>
>
> synchronous_standby_num is defined with PGC_SIGHUP. So the following
> should be added into the document.
>
>     This parameter can only be set in the postgresql.conf file or on
>     the server command line.

Fixed.

> The name of the parameter "synchronous_standby_num" sounds to me like
> the transaction must wait for its WAL to be replicated to s_s_num
> standbys. But that's not true in your patch. If s_s_names is empty,
> replication works asynchronously whatever the value of s_s_num is. I'm
> afraid that it's confusing.
>
> The description of s_s_num is not sufficient. I'm afraid that users can
> easily misunderstand that they can use a quorum commit feature by using
> s_s_names and s_s_num, that is, that the transaction waits for its WAL
> to be replicated to any s_s_num standbys listed in s_s_names.

I reworked the docs to mention all of that. Yes, things are a bit
different from a quorum commit facility (how could we parametrize that
simply, short of a parameter mapping one-to-one to the items of
s_s_names?), as this facility relies on the order of the items in
s_s_names and on which standbys are connected at a given time.

> When s_s_num is set to a larger value than max_wal_senders, we should
> warn about that?

Actually I have done a bit more than that, by forbidding setting s_s_num
to a value higher than max_wal_senders. Thoughts?

Now that we are discussing interactions with other parameters, another
thing I am wondering about: what should we do if s_s_num is set higher
than the number of elements in s_s_names? Currently the patch gives the
priority to s_s_num; in short, if s_s_num is set to 100, the server will
wait for 100 standbys to confirm commit even if there are fewer than 100
elements in s_s_names. I chose this behavior because it looks saner,
particularly if s_s_names = '*'. Thoughts once again?
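To make this concrete, here is a sketch with hypothetical standby names
(node_a, node_b and node_c are invented for the example):

    # postgresql.conf on the primary
    synchronous_standby_names = 'node_a, node_b, node_c'
    synchronous_standby_num = 2

With all three standbys connected and streaming, node_a and node_b are
elected as the synchronous standbys and node_c remains a potential
candidate that takes over if one of them fails. With
synchronous_standby_num = 4 instead, commits would wait indefinitely,
since the list can never provide four matching standbys.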
> +     for (i = 0; i < num_sync; i++)
> +     {
> +         volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
> +
> +         if (min_write_pos > walsndloc->write)
> +             min_write_pos = walsndloc->write;
> +         if (min_flush_pos > walsndloc->flush)
> +             min_flush_pos = walsndloc->flush;
> +     }
>
> I don't think that it's safe to see those shared values without spinlock.

Looking at walsender.c, you are right. I have updated the code to take
the mutex spinlock of each walsender whose values are being read.
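In essence, the release code now computes the oldest write and flush
positions across all standbys elected as synchronous, reading each
walsender's shared values under its spinlock. A simplified extract of
the patch below (not new logic):

    /* Find the oldest write/flush positions among the sync standbys,
     * taking each walsender's spinlock while reading its shared values. */
    min_write_pos = MyWalSnd->write;
    min_flush_pos = MyWalSnd->flush;
    for (i = 0; i < num_sync; i++)
    {
        volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];

        SpinLockAcquire(&walsndloc->mutex);
        if (min_write_pos > walsndloc->write)
            min_write_pos = walsndloc->write;
        if (min_flush_pos > walsndloc->flush)
            min_flush_pos = walsndloc->flush;
        SpinLockRelease(&walsndloc->mutex);
    }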
Regards,
--
Michael

On Thu, Aug 14, 2014 at 4:34 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Wed, Aug 13, 2014 at 4:10 PM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>> On Wed, Aug 13, 2014 at 2:10 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
>>> I sent the SIGSTOP signal to the walreceiver process in one of the
>>> sync standbys, and then ran write transactions again. In this case,
>>> they must not be completed because their WAL cannot be replicated to
>>> the standby whose walreceiver was stopped. But they were successfully
>>> completed.
>>
>> At the end of SyncRepReleaseWaiters, SYNC_REP_WAIT_WRITE and
>> SYNC_REP_WAIT_FLUSH in walsndctl could be updated with only one WAL
>> sender in sync, making backends wake up even if other standbys had not
>> caught up. We need to scan all the synchronous WAL senders, find the
>> minimum write and flush positions, and update walsndctl with those
>> values. Well, that's a code path I forgot to cover.
>>
>> Attached is an updated patch fixing the problem you reported.

> Regards,
>
> --
> Fujii Masao

--
Michael

*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2586,2597 **** include_dir 'conf.d'
        Specifies a comma-separated list of standby names that can support
        <firstterm>synchronous replication</>, as described in
        <xref linkend="synchronous-replication">.
!       At any one time there will be at most one active synchronous standby;
!       transactions waiting for commit will be allowed to proceed after
!       this standby server confirms receipt of their data.
!       The synchronous standby will be the first standby named in this list
!       that is both currently connected and streaming data in real-time
!       (as shown by a state of <literal>streaming</literal> in the
        <link linkend="monitoring-stats-views-table">
        <literal>pg_stat_replication</></link> view).
        Other standby servers appearing later in this list represent potential
--- 2586,2598 ----
        Specifies a comma-separated list of standby names that can support
        <firstterm>synchronous replication</>, as described in
        <xref linkend="synchronous-replication">.
!       At any one time there will be a number of active synchronous standbys
!       defined by <xref linkend="guc-synchronous-standby-num">; transactions
!       waiting for commit will be allowed to proceed after those standby
!       servers confirm receipt of their data. The synchronous standbys will be
!       the first entries named in this list that are both currently connected
!       and streaming data in real-time (as shown by a state of
!       <literal>streaming</literal> in the
        <link linkend="monitoring-stats-views-table">
        <literal>pg_stat_replication</></link> view).
        Other standby servers appearing later in this list represent potential
***************
*** 2627,2632 **** include_dir 'conf.d'
--- 2628,2674 ----
      </listitem>
     </varlistentry>

+    <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num">
+     <term><varname>synchronous_standby_num</varname> (<type>integer</type>)
+     <indexterm>
+      <primary><varname>synchronous_standby_num</> configuration parameter</primary>
+     </indexterm>
+     </term>
+     <listitem>
+      <para>
+       Specifies the number of standbys that support
+       <firstterm>synchronous replication</>.
+      </para>
+      <para>
+       Default value is 1. This parameter value cannot be higher than
+       <xref linkend="guc-max-wal-senders">.
+      </para>
+      <para>
+       The first connected elements of
+       <xref linkend="guc-synchronous-standby-names">, up to
+       <xref linkend="guc-synchronous-standby-num"> of them, are considered
+       as synchronous. If there are more elements than the number of
+       standbys required, all the additional standbys are potential
+       synchronous candidates. If <xref linkend="guc-synchronous-standby-names">
+       is empty, all the standbys are asynchronous. If it is set to the
+       special entry <literal>*</>, the
+       <xref linkend="guc-synchronous-standby-num"> standbys with the highest
+       priority are elected as being synchronous.
+      </para>
+      <para>
+       The server will wait for commit confirmation from
+       <xref linkend="guc-synchronous-standby-num"> standbys, meaning that
+       if <xref linkend="guc-synchronous-standby-names"> has fewer elements
+       than the number of standbys required, the server will wait
+       indefinitely for a commit confirmation.
+      </para>
+      <para>
+       This parameter can only be set in the <filename>postgresql.conf</>
+       file or on the server command line.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
      <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)
      <indexterm>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1081,1092 **** primary_slot_name = 'node_a_slot'
      WAL record is then sent to the standby. The standby sends reply
      messages each time a new batch of WAL data is written to disk, unless
      <varname>wal_receiver_status_interval</> is set to zero on the standby.
!     If the standby is the first matching standby, as specified in
!     <varname>synchronous_standby_names</> on the primary, the reply
!     messages from that standby will be used to wake users waiting for
!     confirmation that the commit record has been received. These parameters
!     allow the administrator to specify which standby servers should be
!     synchronous standbys.
      Note that the configuration of synchronous
      replication is mainly on the master. Named standbys must be directly
      connected to the master; the master knows nothing about downstream
      standby servers using cascaded replication.
--- 1081,1092 ----
      WAL record is then sent to the standby. The standby sends reply
      messages each time a new batch of WAL data is written to disk, unless
      <varname>wal_receiver_status_interval</> is set to zero on the standby.
!     If the standby is among the first <varname>synchronous_standby_num</>
!     matching standbys, as specified in <varname>synchronous_standby_names</>
!     on the primary, the reply messages from that standby will be used to
!     wake users waiting for confirmation that the commit record has been
!     received. These parameters allow the administrator to specify which
!     standby servers should be synchronous standbys.
      Note that the configuration of synchronous
      replication is mainly on the master. Named standbys must be directly
      connected to the master; the master knows nothing about downstream
      standby servers using cascaded replication.
***************
*** 1169,1177 **** primary_slot_name = 'node_a_slot'
      The best solution for avoiding data loss is to ensure you don't lose
      your last remaining synchronous standby. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
!     The first named standby will be used as the synchronous standby. Standbys
!     listed after this will take over the role of synchronous standby if the
!     first one should fail.
      </para>

      <para>
--- 1169,1177 ----
      The best solution for avoiding data loss is to ensure you don't lose
      your last remaining synchronous standby. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
!     The first <varname>synchronous_standby_num</> named standbys will be used
!     as the synchronous standbys. Standbys listed after these will take over
!     the role of synchronous standby if one of the current ones should fail.
      </para>

      <para>
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 5,11 ****
   * Synchronous replication is new as of PostgreSQL 9.1.
   *
   * If requested, transaction commits wait until their commit LSN is
!  * acknowledged by the sync standby.
   *
   * This module contains the code for waiting and release of backends.
   * All code in this module executes on the primary. The core streaming
--- 5,11 ----
   * Synchronous replication is new as of PostgreSQL 9.1.
   *
   * If requested, transaction commits wait until their commit LSN is
!  * acknowledged by the synchronous standbys.
   *
   * This module contains the code for waiting and release of backends.
   * All code in this module executes on the primary. The core streaming
***************
*** 59,64 ****
--- 59,65 ----

  /* User-settable parameters for sync rep */
  char       *SyncRepStandbyNames;
+ int         synchronous_standby_num = 1;

  #define SyncStandbysDefined() \
      (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
***************
*** 206,212 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
                  ereport(WARNING,
                          (errcode(ERRCODE_ADMIN_SHUTDOWN),
                           errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
!                          errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
                  whereToSendOutput = DestNone;
                  SyncRepCancelWait();
                  break;
--- 207,213 ----
                  ereport(WARNING,
                          (errcode(ERRCODE_ADMIN_SHUTDOWN),
                           errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
!                          errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
                  whereToSendOutput = DestNone;
                  SyncRepCancelWait();
                  break;
***************
*** 223,229 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
              QueryCancelPending = false;
              ereport(WARNING,
                      (errmsg("canceling wait for synchronous replication due to user request"),
!                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
              SyncRepCancelWait();
              break;
          }
--- 224,230 ----
              QueryCancelPending = false;
              ereport(WARNING,
                      (errmsg("canceling wait for synchronous replication due to user request"),
!                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
              SyncRepCancelWait();
              break;
          }
***************
*** 357,365 **** SyncRepInitConfig(void)
      }
  }

  /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-standby-releases-waiter.
   *
   * Other policies are possible, which would change what we do here and what
   * perhaps also which information we store as well.
--- 358,442 ----
      }
  }

+
+ /*
+  * Obtain a palloc'd array containing the positions of standbys currently
+  * considered as synchronous.  The caller is responsible for freeing the
+  * array obtained.  Callers of this function must also hold a lock on
+  * SyncRepLock.
+  */
+ int *
+ SyncRepGetSynchronousNodes(int *num_sync)
+ {
+     int        *sync_standbys;
+     int         priority = 0;
+     int         i;
+
+     /* Make enough room */
+     sync_standbys = (int *) palloc(synchronous_standby_num * sizeof(int));
+
+     for (i = 0; i < max_wal_senders; i++)
+     {
+         /* Use volatile pointer to prevent code rearrangement */
+         volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+         /* Skip to the next if not active */
+         if (walsnd->pid == 0)
+             continue;
+
+         /* Skip to the next if not streaming */
+         if (walsnd->state != WALSNDSTATE_STREAMING)
+             continue;
+
+         /* Skip to the next if asynchronous */
+         if (walsnd->sync_standby_priority == 0)
+             continue;
+
+         /* Skip to the next if priority conditions are not satisfied */
+         if (priority != 0 &&
+             priority <= walsnd->sync_standby_priority &&
+             *num_sync == synchronous_standby_num)
+             continue;
+
+         /* Skip to the next if flush position is invalid */
+         if (XLogRecPtrIsInvalid(walsnd->flush))
+             continue;
+
+         /*
+          * We have a potential synchronous candidate: add it to the list
+          * if there is room, or else evict the entry with the highest
+          * priority value found until now.
+          */
+         if (*num_sync == synchronous_standby_num)
+         {
+             int         j;
+
+             for (j = 0; j < *num_sync; j++)
+             {
+                 volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[j]];
+
+                 if (walsndloc->sync_standby_priority == priority)
+                 {
+                     sync_standbys[j] = i;
+                     break;
+                 }
+             }
+         }
+         else
+         {
+             sync_standbys[*num_sync] = i;
+             (*num_sync)++;
+         }
+
+         /* Update priority for the next candidate */
+         priority = walsnd->sync_standby_priority;
+     }
+
+     return sync_standbys;
+ }
+
  /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-standbys-release-waiters.
   *
   * Other policies are possible, which would change what we do here and what
   * perhaps also which information we store as well.
***************
*** 368,378 ****
  void
  SyncRepReleaseWaiters(void)
  {
      volatile WalSndCtlData *walsndctl = WalSndCtl;
!     volatile WalSnd *syncWalSnd = NULL;
      int         numwrite = 0;
      int         numflush = 0;
!     int         priority = 0;
      int         i;

      /*
       * If this WALSender is serving a standby that is not on the list of
--- 445,458 ----
  void
  SyncRepReleaseWaiters(void)
  {
      volatile WalSndCtlData *walsndctl = WalSndCtl;
!     int        *sync_standbys;
      int         numwrite = 0;
      int         numflush = 0;
!     int         num_sync = 0;
      int         i;
+     bool        found = false;
+     XLogRecPtr  min_write_pos;
+     XLogRecPtr  min_flush_pos;

      /*
       * If this WALSender is serving a standby that is not on the list of
***************
*** 388,454 **** SyncRepReleaseWaiters(void)

      /*
       * We're a potential sync standby. Release waiters if we are the highest
       * priority standby. If there are multiple standbys with same priorities
!      * then we use the first mentioned standby. If you change this, also
!      * change pg_stat_get_wal_senders().
       */
      LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

!     for (i = 0; i < max_wal_senders; i++)
      {
!         /* use volatile pointer to prevent code rearrangement */
!         volatile WalSnd *walsnd = &walsndctl->walsnds[i];
!
!         if (walsnd->pid != 0 &&
!             walsnd->state == WALSNDSTATE_STREAMING &&
!             walsnd->sync_standby_priority > 0 &&
!             (priority == 0 ||
!              priority > walsnd->sync_standby_priority) &&
!             !XLogRecPtrIsInvalid(walsnd->flush))
          {
!             priority = walsnd->sync_standby_priority;
!             syncWalSnd = walsnd;
          }
      }

      /*
!      * We should have found ourselves at least.
       */
!     Assert(syncWalSnd);

      /*
!      * If we aren't managing the highest priority standby then just leave.
       */
!     if (syncWalSnd != MyWalSnd)
      {
          LWLockRelease(SyncRepLock);
!         announce_next_takeover = true;
          return;
      }

      /*
       * Set the lsn first so that when we wake backends they will release up to
!      * this location.
       */
!     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
      {
!         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
          numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
      }

!     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
      {
!         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
          numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
      }

      LWLockRelease(SyncRepLock);

      elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
!          numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
!          numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);

      /*
       * If we are managing the highest priority standby, though we weren't
!      * prior to this, then announce we are now the sync standby.
       */
      if (announce_next_takeover)
      {
--- 468,564 ----

      /*
       * We're a potential sync standby. Release waiters if we are the highest
       * priority standby. If there are multiple standbys with same priorities
!      * then we use the first standbys mentioned.
       */
      LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

+     sync_standbys = SyncRepGetSynchronousNodes(&num_sync);

!     /*
!      * We should have found ourselves at least.
!      */
!     Assert(num_sync > 0);
!
!     /*
!      * If we aren't managing one of the standbys with highest priority
!      * then just leave.
!      */
!     for (i = 0; i < num_sync; i++)
      {
!         volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
!
!         if (walsndloc == MyWalSnd)
          {
!             found = true;
!             break;
          }
      }

      /*
!      * We are definitely not one of the chosen standbys, but we could
!      * become one at the next takeover.
       */
!     if (!found)
!     {
!         LWLockRelease(SyncRepLock);
!         pfree(sync_standbys);
!         announce_next_takeover = true;
!         return;
!     }

      /*
!      * Even if we are one of the chosen standbys, leave if there are
!      * fewer synchronous standbys in waiting state than expected by
!      * the user.
       */
!     if (num_sync < synchronous_standby_num)
      {
          LWLockRelease(SyncRepLock);
!         pfree(sync_standbys);
          return;
      }

      /*
       * Set the lsn first so that when we wake backends they will release up to
!      * this location, but only if all the standbys considered as synchronous
!      * have already reached that point.  So first find the oldest write and
!      * flush positions among the standbys in sync...
       */
!     min_write_pos = MyWalSnd->write;
!     min_flush_pos = MyWalSnd->flush;
!     for (i = 0; i < num_sync; i++)
!     {
!         volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
!
!         SpinLockAcquire(&walsndloc->mutex);
!         if (min_write_pos > walsndloc->write)
!             min_write_pos = walsndloc->write;
!         if (min_flush_pos > walsndloc->flush)
!             min_flush_pos = walsndloc->flush;
!         SpinLockRelease(&walsndloc->mutex);
!     }
!
!     /* ... and now update if necessary */
!     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < min_write_pos)
      {
!         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = min_write_pos;
          numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
      }

!     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < min_flush_pos)
      {
!         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = min_flush_pos;
          numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
      }

      LWLockRelease(SyncRepLock);

      elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
!          numwrite, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_WRITE] >> 32),
!          (uint32) walsndctl->lsn[SYNC_REP_WAIT_WRITE],
!          numflush, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] >> 32),
!          (uint32) walsndctl->lsn[SYNC_REP_WAIT_FLUSH]);

      /*
       * If we are managing the highest priority standby, though we weren't
!      * prior to this, then announce we are now a sync standby.
       */
      if (announce_next_takeover)
      {
***************
*** 457,462 **** SyncRepReleaseWaiters(void)
--- 567,575 ----
              (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
                      application_name, MyWalSnd->sync_standby_priority)));
      }
+
+     /* Clean up */
+     pfree(sync_standbys);
  }

  /*
***************
*** 694,699 **** check_synchronous_standby_names(char **newval, void **extra, GucSource source)
--- 807,836 ----
      return true;
  }

+ bool
+ check_synchronous_standby_num(int *newval, void **extra, GucSource source)
+ {
+     /*
+      * Default value is important for backward-compatibility, as well as
+      * for initialization.
+      */
+     if (*newval == 1)
+         return true;
+
+     /*
+      * Forbid setting synchronous_standby_num to a value higher than
+      * max_wal_senders.
+      */
+     if (*newval > max_wal_senders)
+     {
+         GUC_check_errdetail("synchronous_standby_num cannot be higher than max_wal_senders.");
+         return false;
+     }
+
+     return true;
+ }
+
  void
  assign_synchronous_commit(int newval, void *extra)
  {
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2735,2742 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
      MemoryContext per_query_ctx;
      MemoryContext oldcontext;
      int        *sync_priority;
!     int         priority = 0;
!     int         sync_standby = -1;
      int         i;

      /* check to see if caller supports us returning a tuplestore */
--- 2735,2742 ----
      MemoryContext per_query_ctx;
      MemoryContext oldcontext;
      int        *sync_priority;
!     int        *sync_standbys;
!     int         num_sync = 0;
      int         i;

      /* check to see if caller supports us returning a tuplestore */
***************
*** 2767,2802 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
      /*
       * Get the priorities of sync standbys all in one go, to minimise lock
       * acquisitions and to allow us to evaluate who is the current sync
!      * standby. This code must match the code in SyncRepReleaseWaiters().
       */
      sync_priority = palloc(sizeof(int) * max_wal_senders);
      LWLockAcquire(SyncRepLock, LW_SHARED);
      for (i = 0; i < max_wal_senders; i++)
      {
          /* use volatile pointer to prevent code rearrangement */
          volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

!         if (walsnd->pid != 0)
!         {
!             /*
!              * Treat a standby such as a pg_basebackup background process
!              * which always returns an invalid flush location, as an
!              * asynchronous standby.
!              */
!             sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
!                 0 : walsnd->sync_standby_priority;
!
!             if (walsnd->state == WALSNDSTATE_STREAMING &&
!                 walsnd->sync_standby_priority > 0 &&
!                 (priority == 0 ||
!                  priority > walsnd->sync_standby_priority) &&
!                 !XLogRecPtrIsInvalid(walsnd->flush))
!             {
!                 priority = walsnd->sync_standby_priority;
!                 sync_standby = i;
!             }
!         }
      }
      LWLockRelease(SyncRepLock);

      for (i = 0; i < max_wal_senders; i++)
--- 2767,2789 ----
      /*
       * Get the priorities of sync standbys all in one go, to minimise lock
       * acquisitions and to allow us to evaluate who is the current sync
!      * standby.
       */
      sync_priority = palloc(sizeof(int) * max_wal_senders);
      LWLockAcquire(SyncRepLock, LW_SHARED);
+
+     /* First get the priority of each standby while we hold the lock */
      for (i = 0; i < max_wal_senders; i++)
      {
          /* use volatile pointer to prevent code rearrangement */
          volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

!         sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
!             0 : walsnd->sync_standby_priority;
      }
+
+     /* Obtain the list of synchronous standbys */
+     sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
      LWLockRelease(SyncRepLock);

      for (i = 0; i < max_wal_senders; i++)
***************
*** 2858,2872 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
               */
              if (sync_priority[i] == 0)
                  values[7] = CStringGetTextDatum("async");
-             else if (i == sync_standby)
-                 values[7] = CStringGetTextDatum("sync");
              else
!                 values[7] = CStringGetTextDatum("potential");
          }

          tuplestore_putvalues(tupstore, tupdesc, values, nulls);
      }

      pfree(sync_priority);

      /* clean up and return the tuplestore */
      tuplestore_donestoring(tupstore);
--- 2845,2874 ----
               */
              if (sync_priority[i] == 0)
                  values[7] = CStringGetTextDatum("async");
              else
!             {
!                 int         j;
!                 bool        found = false;
!
!                 for (j = 0; j < num_sync; j++)
!                 {
!                     /* This standby is one of those in sync */
!                     if (i == sync_standbys[j])
!                     {
!                         values[7] = CStringGetTextDatum("sync");
!                         found = true;
!                         break;
!                     }
!                 }
!                 if (!found)
!                     values[7] = CStringGetTextDatum("potential");
!             }
          }

          tuplestore_putvalues(tupstore, tupdesc, values, nulls);
      }

      pfree(sync_priority);
+     pfree(sync_standbys);

      /* clean up and return the tuplestore */
      tuplestore_donestoring(tupstore);
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2548,2553 **** static struct config_int ConfigureNamesInt[] =
--- 2548,2563 ----
          NULL, NULL, NULL
      },

+     {
+         {"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+             gettext_noop("Number of synchronous standbys."),
+             NULL
+         },
+         &synchronous_standby_num,
+         1, 1, INT_MAX,
+         check_synchronous_standby_num, NULL, NULL
+     },
+
      /* End-of-list marker */
      {
          {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ----
  #synchronous_standby_names = ''	# standby servers that provide sync rep
  				# comma-separated list of application_name
  				# from standby(s); '*' = all
+ #synchronous_standby_num = 1	# number of standby servers using sync rep
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed

  # - Standby Servers -
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 33,38 ****
--- 33,39 ----

  /* user-settable parameters for synchronous replication */
  extern char *SyncRepStandbyNames;
+ extern int  synchronous_standby_num;

  /* called by user backend */
  extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
***************
*** 49,56 **** extern void SyncRepUpdateSyncStandbysDefined(void);
--- 50,59 ----

  /* called by various procs */
  extern int  SyncRepWakeQueue(bool all, int mode);
+ extern int *SyncRepGetSynchronousNodes(int *num_sync);

  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+ extern bool check_synchronous_standby_num(int *newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);

  #endif   /* _SYNCREP_H */
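As a usage sketch (standby names again hypothetical), the election result
can be checked from the primary. With synchronous_standby_num = 2 and
three connected standbys, the first two matching entries report "sync"
in sync_state and the third reports "potential":

    SELECT application_name, state, sync_priority, sync_state
    FROM pg_stat_replication
    ORDER BY sync_priority;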