On Thu, Aug 14, 2014 at 8:34 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> +        At any one time there will be at a number of active
> synchronous standbys
> +        defined by <varname>synchronous_standby_num</>; transactions waiting
>
> It's better to use <xref linkend="guc-synchronous-standby-num">, instead.
Fixed.

> +        for commit will be allowed to proceed after those standby servers
> +        confirms receipt of their data. The synchronous standbys will be
>
> Typo: confirms -> confirm

Fixed.

> +       <para>
> +        Specifies the number of standbys that support
> +        <firstterm>synchronous replication</>, as described in
> +        <xref linkend="synchronous-replication">, and listed as the first
> +        elements of <xref linkend="guc-synchronous-standby-names">.
> +       </para>
> +       <para>
> +        Default value is 1.
> +       </para>
>
> synchronous_standby_num is defined with PGC_SIGHUP. So the following
> should be added into the document.
>
>     This parameter can only be set in the postgresql.conf file or on
> the server command line.
Fixed.

> The name of the parameter "synchronous_standby_num" sounds to me that
> the transaction must wait for its WAL to be replicated to s_s_num standbys.
> But that's not true in your patch. If s_s_names is empty, replication works
> asynchronously whether the value of s_s_num is. I'm afraid that it's 
> confusing.
> The description of s_s_num is not sufficient. I'm afraid that users can easily
> misunderstand that they can use quorum commit feature by using s_s_names
> and s_s_num. That is, the transaction waits for its WAL to be replicated to
> any s_s_num standbys listed in s_s_names.

I reworked the docs to mention all that. Yes things are a bit
different than any quorum commit facility (how to parametrize that
simply without a parameter mapping one to one the items of
s_s_names?), as this facility relies on the order of the items of
s_s_names and the fact that stansbys are connected at a given time.

> When s_s_num is set to larger value than max_wal_senders, we should warn that?
Actually I have done a bit more than that by forbidding setting
s_s_num to a value higher than max_wal_senders. Thoughts?

Now that we discuss the interactions with other parameters. Another
thing that I am wondering about now is: what should we do if we
specify s_s_num to a number higher than the elements in s_s_names?
Currently, the patch gives the priority to s_s_num, in short if we set
s_s_num to 100, server will wait for 100 servers to confirm commit
even if there are less than 100 elements in s_s_names. I chose this
way because it looks saner particularly if s_s_names = '*'. Thoughts
once again?

> +    for (i = 0; i < num_sync; i++)
> +    {
> +        volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
> +
> +        if (min_write_pos > walsndloc->write)
> +            min_write_pos = walsndloc->write;
> +        if (min_flush_pos > walsndloc->flush)
> +            min_flush_pos = walsndloc->flush;
> +    }
>
> I don't think that it's safe to see those shared values without spinlock.
Looking at walsender.c you are right. I have updated the code to use
the mutex lock of the walsender whose values are being read from.

Regards,
-- 
Michael

On Thu, Aug 14, 2014 at 4:34 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Wed, Aug 13, 2014 at 4:10 PM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>> On Wed, Aug 13, 2014 at 2:10 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
>>> I sent the SIGSTOP signal to the walreceiver process in one of sync 
>>> standbys,
>>> and then ran write transactions again. In this case, they must not be 
>>> completed
>>> because their WAL cannot be replicated to the standby that its walreceiver
>>> was stopped. But they were successfully completed.
>>
>> At the end of SyncRepReleaseWaiters, SYNC_REP_WAIT_WRITE and
>> SYNC_REP_WAIT_FLUSH in walsndctl were able to update with only one wal
>> sender in sync, making backends wake up even if other standbys did not
>> catch up. But we need to scan all the synchronous wal senders and find
>> the minimum write and flush positions and update walsndctl with those
>> values. Well that's a code path I forgot to cover.
>>
>> Attached is an updated patch fixing the problem you reported.
>
> +        At any one time there will be at a number of active
> synchronous standbys
> +        defined by <varname>synchronous_standby_num</>; transactions waiting
>
> It's better to use <xref linkend="guc-synchronous-standby-num">, instead.
>
> +        for commit will be allowed to proceed after those standby servers
> +        confirms receipt of their data. The synchronous standbys will be
>
> Typo: confirms -> confirm
>
> +       <para>
> +        Specifies the number of standbys that support
> +        <firstterm>synchronous replication</>, as described in
> +        <xref linkend="synchronous-replication">, and listed as the first
> +        elements of <xref linkend="guc-synchronous-standby-names">.
> +       </para>
> +       <para>
> +        Default value is 1.
> +       </para>
>
> synchronous_standby_num is defined with PGC_SIGHUP. So the following
> should be added into the document.
>
>     This parameter can only be set in the postgresql.conf file or on
> the server command line.
>
> The name of the parameter "synchronous_standby_num" sounds to me that
> the transaction must wait for its WAL to be replicated to s_s_num standbys.
> But that's not true in your patch. If s_s_names is empty, replication works
> asynchronously whether the value of s_s_num is. I'm afraid that it's 
> confusing.
>
> The description of s_s_num is not sufficient. I'm afraid that users can easily
> misunderstand that they can use quorum commit feature by using s_s_names
> and s_s_num. That is, the transaction waits for its WAL to be replicated to
> any s_s_num standbys listed in s_s_names.
>
> When s_s_num is set to larger value than max_wal_senders, we should warn that?
>
> +    for (i = 0; i < num_sync; i++)
> +    {
> +        volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
> +
> +        if (min_write_pos > walsndloc->write)
> +            min_write_pos = walsndloc->write;
> +        if (min_flush_pos > walsndloc->flush)
> +            min_flush_pos = walsndloc->flush;
> +    }
>
> I don't think that it's safe to see those shared values without spinlock.
>
> Regards,
>
> --
> Fujii Masao



-- 
Michael
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2586,2597 **** include_dir 'conf.d'
          Specifies a comma-separated list of standby names that can support
          <firstterm>synchronous replication</>, as described in
          <xref linkend="synchronous-replication">.
!         At any one time there will be at most one active synchronous standby;
!         transactions waiting for commit will be allowed to proceed after
!         this standby server confirms receipt of their data.
!         The synchronous standby will be the first standby named in this list
!         that is both currently connected and streaming data in real-time
!         (as shown by a state of <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
          Other standby servers appearing later in this list represent potential
--- 2586,2598 ----
          Specifies a comma-separated list of standby names that can support
          <firstterm>synchronous replication</>, as described in
          <xref linkend="synchronous-replication">.
!         At any one time there will be at a number of active synchronous standbys
!         defined by <xref linkend="guc-synchronous-standby-num">, transactions
!         waiting for commit will be allowed to proceed after those standby
!         servers confirm receipt of their data. The synchronous standbys will be
!         the first entries named in this list that are both currently connected
!         and streaming data in real-time (as shown by a state of
!         <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
          Other standby servers appearing later in this list represent potential
***************
*** 2627,2632 **** include_dir 'conf.d'
--- 2628,2674 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num">
+       <term><varname>synchronous_standby_num</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>synchronous_standby_num</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Specifies the number of standbys that support
+         <firstterm>synchronous replication</>.
+        </para>
+        <para>
+         Default value is 1. This parameter value cannot be higher than
+         <xref linkend="guc-max-wal-senders">.
+        </para>
+        <para>
+         Are considered as synchronous the first elements of
+         <xref linkend="guc-synchronous-standby-names"> in number of
+         <xref linkend="guc-synchronous-standby-num"> that are
+         connected. If there are more elements than the number of stansbys
+         required, all the additional standbys are potential synchronous
+         candidates. If <xref linkend="guc-synchronous-standby-names"> is
+         empty, all the standbys are asynchronous. If it is set to the
+         special entry <literal>*</>, a number of standbys equal to
+         <xref linkend="guc-synchronous-standby-names"> with the highest
+         pritority are elected as being synchronous.
+        </para>
+        <para>
+         Server will wait for commit confirmation from
+         <xref linkend="guc-synchronous-standby-num"> standbys, meaning that
+         if <xref linkend="guc-synchronous-standby-names"> has less elements
+         than the number of standbys required, server will wait indefinitely
+         for a commit confirmation.
+        </para>
+        <para>
+         This parameter can only be set in the <filename>postgresql.conf</>
+         file or on the server command line.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
        <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)
        <indexterm>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1081,1092 **** primary_slot_name = 'node_a_slot'
      WAL record is then sent to the standby. The standby sends reply
      messages each time a new batch of WAL data is written to disk, unless
      <varname>wal_receiver_status_interval</> is set to zero on the standby.
!     If the standby is the first matching standby, as specified in
!     <varname>synchronous_standby_names</> on the primary, the reply
!     messages from that standby will be used to wake users waiting for
!     confirmation that the commit record has been received. These parameters
!     allow the administrator to specify which standby servers should be
!     synchronous standbys. Note that the configuration of synchronous
      replication is mainly on the master. Named standbys must be directly
      connected to the master; the master knows nothing about downstream
      standby servers using cascaded replication.
--- 1081,1092 ----
      WAL record is then sent to the standby. The standby sends reply
      messages each time a new batch of WAL data is written to disk, unless
      <varname>wal_receiver_status_interval</> is set to zero on the standby.
!     If the standby is the first <varname>synchronous_standby_num</> matching
!     standbys, as specified in <varname>synchronous_standby_names</> on the
!     primary, the reply messages from that standby will be used to wake users
!     waiting for confirmation that the commit record has been received. These
!     parameters allow the administrator to specify which standby servers should
!     be synchronous standbys. Note that the configuration of synchronous
      replication is mainly on the master. Named standbys must be directly
      connected to the master; the master knows nothing about downstream
      standby servers using cascaded replication.
***************
*** 1169,1177 **** primary_slot_name = 'node_a_slot'
      The best solution for avoiding data loss is to ensure you don't lose
      your last remaining synchronous standby. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
!     The first named standby will be used as the synchronous standby. Standbys
!     listed after this will take over the role of synchronous standby if the
!     first one should fail.
     </para>
  
     <para>
--- 1169,1177 ----
      The best solution for avoiding data loss is to ensure you don't lose
      your last remaining synchronous standby. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
!     The first <varname>synchronous_standby_num</> named standbys will be used as
!     the synchronous standbys. Standbys listed after this will take over the role
!     of synchronous standby if the first one should fail.
     </para>
  
     <para>
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 5,11 ****
   * Synchronous replication is new as of PostgreSQL 9.1.
   *
   * If requested, transaction commits wait until their commit LSN is
!  * acknowledged by the sync standby.
   *
   * This module contains the code for waiting and release of backends.
   * All code in this module executes on the primary. The core streaming
--- 5,11 ----
   * Synchronous replication is new as of PostgreSQL 9.1.
   *
   * If requested, transaction commits wait until their commit LSN is
!  * acknowledged by the synchronous standbys.
   *
   * This module contains the code for waiting and release of backends.
   * All code in this module executes on the primary. The core streaming
***************
*** 59,64 ****
--- 59,65 ----
  
  /* User-settable parameters for sync rep */
  char	   *SyncRepStandbyNames;
+ int			synchronous_standby_num = 1;
  
  #define SyncStandbysDefined() \
  	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
***************
*** 206,212 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  			ereport(WARNING,
  					(errcode(ERRCODE_ADMIN_SHUTDOWN),
  					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
  			whereToSendOutput = DestNone;
  			SyncRepCancelWait();
  			break;
--- 207,213 ----
  			ereport(WARNING,
  					(errcode(ERRCODE_ADMIN_SHUTDOWN),
  					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
  			whereToSendOutput = DestNone;
  			SyncRepCancelWait();
  			break;
***************
*** 223,229 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  			QueryCancelPending = false;
  			ereport(WARNING,
  					(errmsg("canceling wait for synchronous replication due to user request"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
  			SyncRepCancelWait();
  			break;
  		}
--- 224,230 ----
  			QueryCancelPending = false;
  			ereport(WARNING,
  					(errmsg("canceling wait for synchronous replication due to user request"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
  			SyncRepCancelWait();
  			break;
  		}
***************
*** 357,365 **** SyncRepInitConfig(void)
  	}
  }
  
  /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-standby-releases-waiter.
   *
   * Other policies are possible, which would change what we do here and what
   * perhaps also which information we store as well.
--- 358,442 ----
  	}
  }
  
+ 
+ /*
+  * Obtain a palloc'd array containing positions of stanbys currently
+  * considered as synchronous. Caller is responsible for freeing the
+  * data obtained.
+  * Callers of this function should as well take a necessary lock on
+  * SyncRepLock.
+  */
+ int *
+ SyncRepGetSynchronousNodes(int *num_sync)
+ {
+ 	int	   *sync_standbys;
+ 	int		priority = 0;
+ 	int		i;
+ 
+ 	/* Make enough room */
+ 	sync_standbys = (int *) palloc(synchronous_standby_num * sizeof(int));
+ 
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		/* Use volatile pointer to prevent code rearrangement */
+ 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ 
+ 		/* Process to next if not active */
+ 		if (walsnd->pid == 0)
+ 			continue;
+ 
+ 		/* Process to next if not streaming */
+ 		if (walsnd->state != WALSNDSTATE_STREAMING)
+ 			continue;
+ 
+ 		/* Process to next one if asynchronous */
+ 		if (walsnd->sync_standby_priority == 0)
+ 			continue;
+ 
+ 		/* Process to next one if priority conditions not satisfied */
+ 		if (priority != 0 &&
+ 			priority <= walsnd->sync_standby_priority &&
+ 			*num_sync == synchronous_standby_num)
+ 			continue;
+ 
+ 		/* Process to next one if flush position is invalid */
+ 		if (XLogRecPtrIsInvalid(walsnd->flush))
+ 			continue;
+ 
+ 		/*
+ 		 * We have a potential synchronous candidate, add it to the
+ 		 * list of nodes already present or evict the node with highest
+ 		 * priority found until now.
+ 		 */
+ 		if (*num_sync == synchronous_standby_num)
+ 		{
+ 			int j;
+ 			for (j = 0; j < *num_sync; j++)
+ 			{
+ 				volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[j]];
+ 				if (walsndloc->sync_standby_priority == priority)
+ 				{
+ 					sync_standbys[j] = i;
+ 					break;
+ 				}
+ 			}
+ 		}
+ 		else
+ 		{
+ 			sync_standbys[*num_sync] = i;
+ 			(*num_sync)++;
+ 		}
+ 
+ 		/* Update priority for next tracking */
+ 		priority = walsnd->sync_standby_priority;
+ 	}
+ 
+ 	return sync_standbys;
+ }
+ 
  /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-standbys-release-waiter.
   *
   * Other policies are possible, which would change what we do here and what
   * perhaps also which information we store as well.
***************
*** 368,378 **** void
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	volatile WalSnd *syncWalSnd = NULL;
  	int			numwrite = 0;
  	int			numflush = 0;
! 	int			priority = 0;
  	int			i;
  
  	/*
  	 * If this WALSender is serving a standby that is not on the list of
--- 445,458 ----
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	int		   *sync_standbys;
  	int			numwrite = 0;
  	int			numflush = 0;
! 	int			num_sync = 0;
  	int			i;
+ 	bool		found = false;
+ 	XLogRecPtr	min_write_pos;
+ 	XLogRecPtr	min_flush_pos;
  
  	/*
  	 * If this WALSender is serving a standby that is not on the list of
***************
*** 388,454 **** SyncRepReleaseWaiters(void)
  	/*
  	 * We're a potential sync standby. Release waiters if we are the highest
  	 * priority standby. If there are multiple standbys with same priorities
! 	 * then we use the first mentioned standby. If you change this, also
! 	 * change pg_stat_get_wal_senders().
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
  
! 	for (i = 0; i < max_wal_senders; i++)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
! 
! 		if (walsnd->pid != 0 &&
! 			walsnd->state == WALSNDSTATE_STREAMING &&
! 			walsnd->sync_standby_priority > 0 &&
! 			(priority == 0 ||
! 			 priority > walsnd->sync_standby_priority) &&
! 			!XLogRecPtrIsInvalid(walsnd->flush))
  		{
! 			priority = walsnd->sync_standby_priority;
! 			syncWalSnd = walsnd;
  		}
  	}
  
  	/*
! 	 * We should have found ourselves at least.
  	 */
! 	Assert(syncWalSnd);
  
  	/*
! 	 * If we aren't managing the highest priority standby then just leave.
  	 */
! 	if (syncWalSnd != MyWalSnd)
  	{
  		LWLockRelease(SyncRepLock);
! 		announce_next_takeover = true;
  		return;
  	}
  
  	/*
  	 * Set the lsn first so that when we wake backends they will release up to
! 	 * this location.
  	 */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
  
  	LWLockRelease(SyncRepLock);
  
  	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
! 	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
  
  	/*
  	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now the sync standby.
  	 */
  	if (announce_next_takeover)
  	{
--- 468,564 ----
  	/*
  	 * We're a potential sync standby. Release waiters if we are the highest
  	 * priority standby. If there are multiple standbys with same priorities
! 	 * then we use the first mentioned standbys.
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 	sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
  
! 	/*
! 	 * We should have found ourselves at least.
! 	 */
! 	Assert(num_sync > 0);
! 
! 	/*
! 	 * If we aren't managing one of the standbys with highest priority
! 	 * then just leave.
! 	 */
! 	for (i = 0; i < num_sync; i++)
  	{
! 		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
! 		if (walsndloc == MyWalSnd)
  		{
! 			found = true;
! 			break;
  		}
  	}
  
  	/*
! 	 * We are definitely not one of the chosen... But we could by
! 	 * taking the next takeover.
  	 */
! 	if (!found)
! 	{
! 		LWLockRelease(SyncRepLock);
! 		pfree(sync_standbys);
! 		announce_next_takeover = true;
! 		return;
! 	}
  
  	/*
! 	 * Even if we are one of the chosen standbys, leave if there
! 	 * are less synchronous standbys in waiting state than what is
! 	 * expected by the user.
  	 */
! 	if (num_sync < synchronous_standby_num)
  	{
  		LWLockRelease(SyncRepLock);
! 		pfree(sync_standbys);
  		return;
  	}
  
  	/*
  	 * Set the lsn first so that when we wake backends they will release up to
! 	 * this location, of course only if all the standbys found as synchronous
! 	 * have already reached that point, so first find what are the oldest
! 	 * write and flush positions of all the standbys considered in sync...
  	 */
! 	min_write_pos = MyWalSnd->write;
! 	min_flush_pos = MyWalSnd->flush;
! 	for (i = 0; i < num_sync; i++)
! 	{
! 		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
! 
! 		SpinLockAcquire(&walsndloc->mutex);
! 		if (min_write_pos > walsndloc->write)
! 			min_write_pos = walsndloc->write;
! 		if (min_flush_pos > walsndloc->flush)
! 			min_flush_pos = walsndloc->flush;
! 		SpinLockRelease(&walsndloc->mutex);
! 	}
! 
! 	/* ... And now update if necessary */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < min_write_pos)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = min_write_pos;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < min_flush_pos)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = min_flush_pos;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
  
  	LWLockRelease(SyncRepLock);
  
  	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! 		 numwrite, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_WRITE] >> 32),
! 		 (uint32) walsndctl->lsn[SYNC_REP_WAIT_WRITE],
! 		 numflush, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] >> 32),
! 		 (uint32) walsndctl->lsn[SYNC_REP_WAIT_FLUSH]);
  
  	/*
  	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now a sync standby.
  	 */
  	if (announce_next_takeover)
  	{
***************
*** 457,462 **** SyncRepReleaseWaiters(void)
--- 567,575 ----
  				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
  						application_name, MyWalSnd->sync_standby_priority)));
  	}
+ 
+ 	/* Clean up */
+ 	pfree(sync_standbys);
  }
  
  /*
***************
*** 694,699 **** check_synchronous_standby_names(char **newval, void **extra, GucSource source)
--- 807,836 ----
  	return true;
  }
  
+ bool
+ check_synchronous_standby_num(int *newval, void **extra, GucSource source)
+ {
+ 	/*
+ 	 * Default value is important for backward-compatibility, as well as
+ 	 * for initialization.
+ 	 */
+ 	if (*newval == 1)
+ 		return true;
+ 
+ 	/*
+ 	 * If new value is higher than max_wal_senders, enforce it to the value of
+ 	 * max_wal_senders.
+ 	 */
+ 	if (*newval > max_wal_senders)
+ 	{
+ 		GUC_check_errdetail("synchronous_standby_num cannot be higher than max_wal_senders.");
+ 		*newval = max_wal_senders;
+ 		return false;
+ 	}
+ 
+ 	return true;
+ }
+ 
  void
  assign_synchronous_commit(int newval, void *extra)
  {
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2735,2742 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
  	int		   *sync_priority;
! 	int			priority = 0;
! 	int			sync_standby = -1;
  	int			i;
  
  	/* check to see if caller supports us returning a tuplestore */
--- 2735,2742 ----
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
  	int		   *sync_priority;
! 	int		   *sync_standbys;
! 	int			num_sync = 0;
  	int			i;
  
  	/* check to see if caller supports us returning a tuplestore */
***************
*** 2767,2802 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	/*
  	 * Get the priorities of sync standbys all in one go, to minimise lock
  	 * acquisitions and to allow us to evaluate who is the current sync
! 	 * standby. This code must match the code in SyncRepReleaseWaiters().
  	 */
  	sync_priority = palloc(sizeof(int) * max_wal_senders);
  	LWLockAcquire(SyncRepLock, LW_SHARED);
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
  
! 		if (walsnd->pid != 0)
! 		{
! 			/*
! 			 * Treat a standby such as a pg_basebackup background process
! 			 * which always returns an invalid flush location, as an
! 			 * asynchronous standby.
! 			 */
! 			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 				0 : walsnd->sync_standby_priority;
! 
! 			if (walsnd->state == WALSNDSTATE_STREAMING &&
! 				walsnd->sync_standby_priority > 0 &&
! 				(priority == 0 ||
! 				 priority > walsnd->sync_standby_priority) &&
! 				!XLogRecPtrIsInvalid(walsnd->flush))
! 			{
! 				priority = walsnd->sync_standby_priority;
! 				sync_standby = i;
! 			}
! 		}
  	}
  	LWLockRelease(SyncRepLock);
  
  	for (i = 0; i < max_wal_senders; i++)
--- 2767,2789 ----
  	/*
  	 * Get the priorities of sync standbys all in one go, to minimise lock
  	 * acquisitions and to allow us to evaluate who is the current sync
! 	 * standby.
  	 */
  	sync_priority = palloc(sizeof(int) * max_wal_senders);
  	LWLockAcquire(SyncRepLock, LW_SHARED);
+ 
+ 	/* Get first the priorities on each standby as long as we hold a lock */
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
  
! 		sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 			0 : walsnd->sync_standby_priority;
  	}
+ 
+ 	/* Obtain list of synchronous standbys */
+ 	sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
  	LWLockRelease(SyncRepLock);
  
  	for (i = 0; i < max_wal_senders; i++)
***************
*** 2858,2872 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  			 */
  			if (sync_priority[i] == 0)
  				values[7] = CStringGetTextDatum("async");
- 			else if (i == sync_standby)
- 				values[7] = CStringGetTextDatum("sync");
  			else
! 				values[7] = CStringGetTextDatum("potential");
  		}
  
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
  	pfree(sync_priority);
  
  	/* clean up and return the tuplestore */
  	tuplestore_donestoring(tupstore);
--- 2845,2874 ----
  			 */
  			if (sync_priority[i] == 0)
  				values[7] = CStringGetTextDatum("async");
  			else
! 			{
! 				int j;
! 				bool found = false;
! 
! 				for (j = 0; j < num_sync; j++)
! 				{
! 					/* Found that this node is one in sync */
! 					if (i == sync_standbys[j])
! 					{
! 						values[7] = CStringGetTextDatum("sync");
! 						found = true;
! 						break;
! 					}
! 				}
! 				if (!found)
! 					values[7] = CStringGetTextDatum("potential");
! 			}
  		}
  
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
  	pfree(sync_priority);
+ 	pfree(sync_standbys);
  
  	/* clean up and return the tuplestore */
  	tuplestore_donestoring(tupstore);
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2548,2553 **** static struct config_int ConfigureNamesInt[] =
--- 2548,2563 ----
  		NULL, NULL, NULL
  	},
  
+ 	{
+ 		{"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+ 			gettext_noop("Number of potential synchronous standbys."),
+ 			NULL
+ 		},
+ 		&synchronous_standby_num,
+ 		1, 1, INT_MAX,
+ 		check_synchronous_standby_num, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ----
  #synchronous_standby_names = ''	# standby servers that provide sync rep
  				# comma-separated list of application_name
  				# from standby(s); '*' = all
+ #synchronous_standby_num = 1	# number of standbys servers using sync rep
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
  
  # - Standby Servers -
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 33,38 ****
--- 33,39 ----
  
  /* user-settable parameters for synchronous replication */
  extern char *SyncRepStandbyNames;
+ extern int	synchronous_standby_num;
  
  /* called by user backend */
  extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
***************
*** 49,56 **** extern void SyncRepUpdateSyncStandbysDefined(void);
--- 50,59 ----
  
  /* called by various procs */
  extern int	SyncRepWakeQueue(bool all, int mode);
+ extern int *SyncRepGetSynchronousNodes(int *num_sync);
  
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+ extern bool check_synchronous_standby_num(int *newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);
  
  #endif   /* _SYNCREP_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to