On Wed, Aug 27, 2014 at 2:46 PM, Rajeev rastogi
<rajeev.rast...@huawei.com> wrote:
> I have done some more review, below are my comments:
Thanks!

> 1. There are currently two loops on *num_sync, Can we simplify the function 
> SyncRepGetSynchronousNodes by moving the priority calculation inside the 
> upper loop
>                 if (*num_sync == allowed_sync_nodes)
>                 {
>                         for (j = 0; j < *num_sync; j++)
>                         {
>         Anyway we require priority only if *num_sync == allowed_sync_nodes 
> condition matches.
>         So in this loop itself, we can calculate the priority as well as 
> assigment of new standbys with lower priority.
>         Let me know if you see any issue with this.

OK, I see. Yes, this saves a bit of processing, so I refactored the
code by merging the second loop into the first. This required removing
the break, as we still need to find the highest priority value among
the nodes already determined as synchronous.
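
To illustrate the merged loop, here is a minimal standalone sketch (not
the code in the patch). It assumes a plain array of priorities in which
0 means asynchronous and a lower non-zero value means a higher priority;
the name pick_sync_nodes and its parameters are made up for this
example. A single pass keeps at most "allowed" entries and evicts the
entry with the highest priority value when a better candidate shows up:

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical, simplified model: prio[i] is the priority of WAL sender i,
 * 0 meaning asynchronous; a lower non-zero value means a higher priority.
 * Writes up to "allowed" chosen indexes into out[] and returns the count.
 */
static int
pick_sync_nodes(const int *prio, int n, int allowed, int *out)
{
	int		num_sync = 0;
	int		i;

	assert(prio != NULL && out != NULL && allowed >= 0);

	for (i = 0; i < n; i++)
	{
		int		j;
		int		worst = 0;

		/* Skip asynchronous nodes, or everything if nothing is allowed */
		if (prio[i] == 0 || allowed == 0)
			continue;

		/* Still room left: just append the candidate */
		if (num_sync < allowed)
		{
			out[num_sync++] = i;
			continue;
		}

		/* Array is full: find the slot with the highest priority value */
		for (j = 1; j < num_sync; j++)
		{
			if (prio[out[j]] > prio[out[worst]])
				worst = j;
		}

		/* Evict it only if the candidate is strictly better */
		if (prio[i] < prio[out[worst]])
			out[worst] = i;
	}

	return num_sync;
}
```

With priorities {3, 0, 1, 2, 1} and allowed = 2, the two entries kept
are the ones with priority 1. On a tie the node already in the array
keeps its slot.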

> 2.      Comment inside the function SyncRepReleaseWaiters,
>         /*
>          * We should have found ourselves at least, except if it is not 
> expected
>          * to find any synchronous nodes.
>          */
>         Assert(num_sync > 0);
>
>         I think "except if it is not expected to find any synchronous nodes" 
> is not required.
>         As if it has come till this point means at least this node is 
> synchronous.
Yes, removed.

> 3.      Document says that s_s_num should be lesser than max_wal_senders but 
> code wise there is no protection for the same.
>         IMHO, s_s_num should be lesser than or equal to max_wal_senders 
> otherwise COMMIT will never return back the console without
>         any knowledge of user.
>         I see that some discussion has happened regarding this but I think 
> just adding documentation for this is not enough.
>         I am not sure what issue is observed in adding check during GUC 
> initialization but if there is unavoidable issue during GUC initialization 
> then can't we try to add check at later points.

The trick here is that you cannot reliably report a warning to the
user at GUC loading time, as a warning could be spuriously triggered
if, for example, s_s_num appears before max_wal_senders in
postgresql.conf. I am open to any solutions if there are any (like an
error when initializing WAL senders?!). Documentation seems enough to
me to warn the user.
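
One way to picture the "check at a later point" idea is a cross-check
that only runs once every setting has been read, so the order of the
entries in postgresql.conf no longer matters. The following is only a
sketch under that assumption: ReplSettings and sync_num_is_consistent
are hypothetical names, and nothing like this exists in the attached
patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical snapshot of the two settings after the whole file is read */
typedef struct ReplSettings
{
	int		synchronous_standby_num;	/* -1, 0 or a positive count */
	int		max_wal_senders;
} ReplSettings;

/*
 * Returns true if the requested number of synchronous standbys can be
 * served by the configured number of WAL senders.
 */
static bool
sync_num_is_consistent(const ReplSettings *s)
{
	assert(s != NULL);

	/* -1 (default) and 0 (all asynchronous) are not checked against the limit */
	if (s->synchronous_standby_num <= 0)
		return true;

	return s->synchronous_standby_num <= s->max_wal_senders;
}
```

Because the check looks at a complete snapshot, it gives the same
answer whichever parameter comes first in the file.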

> 4.  Similary interaction between parameters s_s_names and s_s_num. I see some 
> discussion has happened regarding this and it is acceptable
>         to have s_s_num more than s_s_names. But I was thinking should not 
> give atleast some notice message to user for such case along with
>         some documentation.

Done. I added the following in the paragraph "Server will wait":
Hence it is recommended to not set <varname>synchronous_standby_num</>
to a value higher than the number of elements in
<varname>synchronous_standby_names</>.

> 5. "At any one time there will be at a number of active synchronous 
> standbys": this sentence is not proper.
What about this:
"At any one time there can be a number of active synchronous standbys
up to the number defined by <xref
linkend="guc-synchronous-standby-num">"

> 6.      When this parameter is set to <literal>0</>, all the standby
>         nodes will be considered as asynchronous.
>
>         Can we make this as
>         When this parameter is set to <literal>0</>, all the standby
>         nodes will be considered as asynchronous irrespective of value of 
> synchronous_standby_names.

Done. This seems proper for the user as we do not care at all about
s_s_names if s_s_num = 0.
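
For reference, the rule can be written as a small standalone function.
This is a sketch that mirrors the SyncStandbysDefined() macro of the
attached patch; the function name and its parameters are made up for
illustration:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * sync_num follows synchronous_standby_num: -1 is the default, 0 forces
 * all standbys to be asynchronous, a positive value is the number of
 * synchronous standbys wanted. names follows synchronous_standby_names.
 */
static bool
sync_standbys_defined(int sync_num, const char *names)
{
	assert(sync_num >= -1);		/* the GUC does not accept lower values */

	/* A positive count is enough, whatever the list contains */
	if (sync_num > 0)
		return true;

	/* Default case: fall back to checking that the list is not empty */
	if (sync_num == -1)
		return names != NULL && names[0] != '\0';

	/* sync_num == 0: the list is ignored entirely */
	return false;
}
```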

> 7.      Are considered as synchronous the first elements of
>         <xref linkend="guc-synchronous-standby-names"> in number of
>         <xref linkend="guc-synchronous-standby-num"> that are
>         connected.
>
>         Starting of this sentence looks to be incomplete.
OK, I reworked this part as well. I hope it is clearer.

> 8.  Standbys listed after this will take over the role
>     of synchronous standby if the first one should fail.
>
>                 Should not we make it as:
>
>         Standbys listed after this will take over the role
>     of synchronous standby if any of the first synchronous-standby-num 
> standby fails.
Fixed as proposed.

At the same time I found a bug in pg_stat_get_wal_senders caused by a
NULL pointer being freed when s_s_num = 0. An updated patch addressing
the comments is attached. On top of that, the documentation has been
reworked a bit by replacing the excessive number of <xref> blocks with
<varname>, so that a link to a given variable is specified only once.
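
The NULL pointer fix in pg_stat_get_wal_senders follows this pattern,
sketched standalone: the lookup returns NULL when no synchronous node
is allowed, and pfree(), unlike free(), must not be called with NULL,
so the caller checks the pointer first. strict_free, get_sync_nodes and
count_and_release are stand-ins invented for this example, not
PostgreSQL functions:

```c
#include <assert.h>
#include <stdlib.h>

/* Stand-in for pfree(): refuses NULL, unlike the standard free() */
static void
strict_free(void *p)
{
	assert(p != NULL);
	free(p);
}

/*
 * Hypothetical getter: returns NULL and sets *num to 0 when no
 * synchronous node is allowed, otherwise an allocated array.
 */
static int *
get_sync_nodes(int allowed, int *num)
{
	int	   *nodes;
	int		i;

	*num = 0;
	if (allowed <= 0)
		return NULL;

	nodes = malloc(allowed * sizeof(int));
	if (nodes == NULL)
		return NULL;

	for (i = 0; i < allowed; i++)
		nodes[i] = i;
	*num = allowed;
	return nodes;
}

/* Caller side: only release the array if one was actually returned */
static int
count_and_release(int allowed)
{
	int		num;
	int	   *nodes = get_sync_nodes(allowed, &num);

	if (nodes)
		strict_free(nodes);
	return num;
}
```
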
Regards,
-- 
Michael
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2585,2597 **** include_dir 'conf.d'
         <para>
          Specifies a comma-separated list of standby names that can support
          <firstterm>synchronous replication</>, as described in
!         <xref linkend="synchronous-replication">.
!         At any one time there will be at most one active synchronous standby;
!         transactions waiting for commit will be allowed to proceed after
!         this standby server confirms receipt of their data.
!         The synchronous standby will be the first standby named in this list
!         that is both currently connected and streaming data in real-time
!         (as shown by a state of <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
          Other standby servers appearing later in this list represent potential
--- 2585,2598 ----
         <para>
          Specifies a comma-separated list of standby names that can support
          <firstterm>synchronous replication</>, as described in
!         <xref linkend="synchronous-replication">. At any time there can be
!         a number of active synchronous standbys up to the number
!         defined by <xref linkend="guc-synchronous-standby-num">, transactions
!         waiting for commit will be allowed to proceed after those standby
!         servers confirm receipt of their data. The synchronous standbys will be
!         the first entries named in this list that are both currently connected
!         and streaming data in real-time (as shown by a state of
!         <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
          Other standby servers appearing later in this list represent potential
***************
*** 2627,2632 **** include_dir 'conf.d'
--- 2628,2688 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num">
+       <term><varname>synchronous_standby_num</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>synchronous_standby_num</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Specifies the number of standbys that support
+         <firstterm>synchronous replication</>.
+        </para>
+        <para>
+         Default value is <literal>-1</>. In this case, if
+         <xref linkend="guc-synchronous-standby-names"> is empty all the
+         standby nodes are considered asynchronous. If there is at least
+         one node name defined, process will wait for one synchronous
+         standby listed.
+        </para>
+        <para>
+         When this parameter is set to <literal>0</>, all the standby
+         nodes will be considered as asynchronous irrespective of value
+         of <varname>synchronous_standby_names</>.
+        </para>
+        <para>
+         This parameter value cannot be higher than
+         <xref linkend="guc-max-wal-senders">.
+        </para>
+        <para>
+         Up to the first <varname>synchronous_standby_num</>
+         standbys listed in <varname>synchronous_standby_names</>
+         that are connected to a root node at the same time can be
+         synchronous. If there are more elements than the number of standbys
+         required, all the additional standbys are potential synchronous
+         candidates. If <varname>synchronous_standby_names</> is
+         empty, all the standbys are asynchronous. If it is set to the
+         special entry <literal>*</>, a number of standbys up to
+         <varname>synchronous_standby_num</> with the highest
+         priority are elected as being synchronous.
+        </para>
+        <para>
+         Server will wait for commit confirmation from
+         <varname>synchronous_standby_num</> standbys, meaning that
+         if <varname>synchronous_standby_names</> has fewer elements
+         than the number of standbys required, server will wait indefinitely
+         for a commit confirmation. Hence it is recommended to not set
+         <varname>synchronous_standby_num</> to a value higher than the
+         number of elements in <varname>synchronous_standby_names</>.
+        </para>
+        <para>
+         This parameter can only be set in the <filename>postgresql.conf</>
+         file or on the server command line.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
        <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)
        <indexterm>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1081,1092 **** primary_slot_name = 'node_a_slot'
      WAL record is then sent to the standby. The standby sends reply
      messages each time a new batch of WAL data is written to disk, unless
      <varname>wal_receiver_status_interval</> is set to zero on the standby.
!     If the standby is the first matching standby, as specified in
!     <varname>synchronous_standby_names</> on the primary, the reply
!     messages from that standby will be used to wake users waiting for
!     confirmation that the commit record has been received. These parameters
!     allow the administrator to specify which standby servers should be
!     synchronous standbys. Note that the configuration of synchronous
      replication is mainly on the master. Named standbys must be directly
      connected to the master; the master knows nothing about downstream
      standby servers using cascaded replication.
--- 1081,1092 ----
      WAL record is then sent to the standby. The standby sends reply
      messages each time a new batch of WAL data is written to disk, unless
      <varname>wal_receiver_status_interval</> is set to zero on the standby.
!     If the standby is one of the first <xref linkend="guc-synchronous-standby-num">
!     matching standbys, as specified in <varname>synchronous_standby_names</>
!     on the primary, the reply messages from that standby will be used to wake
!     users waiting for confirmation that the commit record has been received.
!     These parameters allow the administrator to specify which standby servers
!     should be synchronous standbys. Note that the configuration of synchronous
      replication is mainly on the master. Named standbys must be directly
      connected to the master; the master knows nothing about downstream
      standby servers using cascaded replication.
***************
*** 1167,1177 **** primary_slot_name = 'node_a_slot'
  
     <para>
      The best solution for avoiding data loss is to ensure you don't lose
!     your last remaining synchronous standby. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
!     The first named standby will be used as the synchronous standby. Standbys
!     listed after this will take over the role of synchronous standby if the
!     first one should fail.
     </para>
  
     <para>
--- 1167,1178 ----
  
     <para>
      The best solution for avoiding data loss is to ensure you don't lose
!     your last remaining synchronous standbys. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
!     The first <varname>synchronous_standby_num</> named standbys will be used as
!     the synchronous standbys. Standbys listed after this will take over the role
!     of synchronous standby if any of the first <varname>synchronous_standby_num</>
!     standbys fail.
     </para>
  
     <para>
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 5,11 ****
   * Synchronous replication is new as of PostgreSQL 9.1.
   *
   * If requested, transaction commits wait until their commit LSN is
!  * acknowledged by the sync standby.
   *
   * This module contains the code for waiting and release of backends.
   * All code in this module executes on the primary. The core streaming
--- 5,11 ----
   * Synchronous replication is new as of PostgreSQL 9.1.
   *
   * If requested, transaction commits wait until their commit LSN is
!  * acknowledged by the synchronous standbys.
   *
   * This module contains the code for waiting and release of backends.
   * All code in this module executes on the primary. The core streaming
***************
*** 29,39 ****
   * single ordered queue of waiting backends, so that we can avoid
   * searching the through all waiters each time we receive a reply.
   *
!  * In 9.1 we support only a single synchronous standby, chosen from a
!  * priority list of synchronous_standby_names. Before it can become the
!  * synchronous standby it must have caught up with the primary; that may
!  * take some time. Once caught up, the current highest priority standby
!  * will release waiters from the queue.
   *
   * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
   *
--- 29,50 ----
   * single ordered queue of waiting backends, so that we can avoid
   * searching the through all waiters each time we receive a reply.
   *
!  * In 9.4 we support the possibility to have multiple synchronous standbys,
!  * whose number is defined by synchronous_standby_num, chosen from a
!  * priority list of synchronous_standby_names. Before one standby can
!  * become a synchronous standby it must have caught up with the primary;
!  * that may take some time.
!  *
!  * Waiters will be released from the queue once the number of standbys
!  * defined by synchronous_standby_num have caught up.
!  *
!  * There are special cases though. If synchronous_standby_num is set to 0,
!  * all the nodes are considered as asynchronous and a fast path is taken to
!  * leave this portion of the code as soon as possible. If it is set to
!  * -1, process will wait for one node to catch up with the primary only
!  * if synchronous_standby_names is non-empty. This is compatible with
!  * what has been defined in 9.1 as -1 is the default value of
!  * synchronous_standby_num.
   *
   * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
   *
***************
*** 59,67 ****
  
  /* User-settable parameters for sync rep */
  char	   *SyncRepStandbyNames;
  
  #define SyncStandbysDefined() \
! 	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
  
  static bool announce_next_takeover = true;
  
--- 70,87 ----
  
  /* User-settable parameters for sync rep */
  char	   *SyncRepStandbyNames;
+ int			synchronous_standby_num = -1;
  
+ /*
+  * Synchronous standbys are defined if at least one
+  * synchronous standby is wanted. In the default case (-1), the
+  * list of standbys defined must not be empty.
+  */
  #define SyncStandbysDefined() \
! 	(synchronous_standby_num > 0 || \
! 	 (synchronous_standby_num == -1 && \
! 	  SyncRepStandbyNames != NULL && \
! 	  SyncRepStandbyNames[0] != '\0'))
  
  static bool announce_next_takeover = true;
  
***************
*** 206,212 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  			ereport(WARNING,
  					(errcode(ERRCODE_ADMIN_SHUTDOWN),
  					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
  			whereToSendOutput = DestNone;
  			SyncRepCancelWait();
  			break;
--- 226,232 ----
  			ereport(WARNING,
  					(errcode(ERRCODE_ADMIN_SHUTDOWN),
  					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
  			whereToSendOutput = DestNone;
  			SyncRepCancelWait();
  			break;
***************
*** 223,229 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
  			QueryCancelPending = false;
  			ereport(WARNING,
  					(errmsg("canceling wait for synchronous replication due to user request"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
  			SyncRepCancelWait();
  			break;
  		}
--- 243,249 ----
  			QueryCancelPending = false;
  			ereport(WARNING,
  					(errmsg("canceling wait for synchronous replication due to user request"),
! 					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
  			SyncRepCancelWait();
  			break;
  		}
***************
*** 357,365 **** SyncRepInitConfig(void)
  	}
  }
  
  /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-standby-releases-waiter.
   *
   * Other policies are possible, which would change what we do here and what
   * perhaps also which information we store as well.
--- 377,493 ----
  	}
  }
  
+ 
+ /*
+  * Obtain a palloc'd array containing positions of standbys currently
+  * considered as synchronous. Caller is responsible for freeing the
+  * data obtained and should as well take a necessary lock on SyncRepLock.
+  */
+ int *
+ SyncRepGetSynchronousNodes(int *num_sync)
+ {
+ 	int	   *sync_nodes;
+ 	int		priority = 0;
+ 	int		i;
+ 	int		allowed_sync_nodes = synchronous_standby_num;
+ 
+ 	/* Initialize */
+ 	*num_sync = 0;
+ 
+ 	/* Leave if no synchronous nodes allowed */
+ 	if (synchronous_standby_num == 0)
+ 		return NULL;
+ 
+ 	/*
+ 	 * Determine the number of nodes that can be synchronized.
+ 	 * synchronous_standby_num can have the special value -1,
+ 	 * meaning that only one node with the highest non-null priority
+ 	 * can be considered as synchronous.
+ 	 */
+ 	if (synchronous_standby_num == -1)
+ 		allowed_sync_nodes = 1;
+ 
+ 	/*
+ 	 * Make enough room, there is a maximum of max_wal_senders synchronous
+ 	 * nodes as we scan through WAL senders here.
+ 	 */
+ 	sync_nodes = (int *) palloc(allowed_sync_nodes * sizeof(int));
+ 
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		/* Use volatile pointer to prevent code rearrangement */
+ 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ 		int j;
+ 
+ 		/* Proceed to next if not active */
+ 		if (walsnd->pid == 0)
+ 			continue;
+ 
+ 		/* Proceed to next if not streaming */
+ 		if (walsnd->state != WALSNDSTATE_STREAMING)
+ 			continue;
+ 
+ 		/* Proceed to next one if asynchronous */
+ 		if (walsnd->sync_standby_priority == 0)
+ 			continue;
+ 
+ 		/* Proceed to next one if priority conditions not satisfied */
+ 		if (priority != 0 &&
+ 			priority <= walsnd->sync_standby_priority &&
+ 			*num_sync == allowed_sync_nodes)
+ 			continue;
+ 
+ 		/* Proceed to next one if flush position is invalid */
+ 		if (XLogRecPtrIsInvalid(walsnd->flush))
+ 			continue;
+ 
+ 		/*
+ 		 * We have a potential synchronous candidate, add it to the
+ 		 * list of nodes already present or evict the node with highest
+ 		 * priority found until now. Track as well the highest priority
+ 		 * value in all the existing items, this helps in determining
+ 		 * what would be a standby to evict from the result array.
+ 		 */
+ 		if (*num_sync == allowed_sync_nodes)
+ 		{
+ 			int new_priority = 0;
+ 
+ 			for (j = 0; j < *num_sync; j++)
+ 			{
+ 				volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_nodes[j]];
+ 
+ 				/*
+ 				 * Note that we cannot leave now as we need to still
+ 				 * find what is the highest priority in the set of
+ 				 * synchronous standbys.
+ 				 */
+ 				if (walsndloc->sync_standby_priority == priority)
+ 					sync_nodes[j] = i;
+ 
+ 				/* Update priority to highest value available */
+ 				if (new_priority < walsndloc->sync_standby_priority)
+ 					new_priority = walsndloc->sync_standby_priority;
+ 			}
+ 			priority = new_priority;
+ 		}
+ 		else
+ 		{
+ 			volatile WalSnd *walsndloc = &WalSndCtl->walsnds[i];
+ 			sync_nodes[*num_sync] = i;
+ 			(*num_sync)++;
+ 
+ 			/* Update priority to highest value available */
+ 			if (priority < walsndloc->sync_standby_priority)
+ 				priority = walsndloc->sync_standby_priority;
+ 		}
+ 	}
+ 
+ 	return sync_nodes;
+ }
+ 
  /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-standbys-release-waiter.
   *
   * Other policies are possible, which would change what we do here and what
   * perhaps also which information we store as well.
***************
*** 368,378 **** void
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	volatile WalSnd *syncWalSnd = NULL;
  	int			numwrite = 0;
  	int			numflush = 0;
! 	int			priority = 0;
  	int			i;
  
  	/*
  	 * If this WALSender is serving a standby that is not on the list of
--- 496,509 ----
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	int		   *sync_standbys;
  	int			numwrite = 0;
  	int			numflush = 0;
! 	int			num_sync = 0;
  	int			i;
+ 	bool		found = false;
+ 	XLogRecPtr	min_write_pos;
+ 	XLogRecPtr	min_flush_pos;
  
  	/*
  	 * If this WALSender is serving a standby that is not on the list of
***************
*** 388,454 **** SyncRepReleaseWaiters(void)
  	/*
  	 * We're a potential sync standby. Release waiters if we are the highest
  	 * priority standby. If there are multiple standbys with same priorities
! 	 * then we use the first mentioned standby. If you change this, also
! 	 * change pg_stat_get_wal_senders().
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
  
! 	for (i = 0; i < max_wal_senders; i++)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
! 
! 		if (walsnd->pid != 0 &&
! 			walsnd->state == WALSNDSTATE_STREAMING &&
! 			walsnd->sync_standby_priority > 0 &&
! 			(priority == 0 ||
! 			 priority > walsnd->sync_standby_priority) &&
! 			!XLogRecPtrIsInvalid(walsnd->flush))
  		{
! 			priority = walsnd->sync_standby_priority;
! 			syncWalSnd = walsnd;
  		}
  	}
  
  	/*
! 	 * We should have found ourselves at least.
  	 */
! 	Assert(syncWalSnd);
  
  	/*
! 	 * If we aren't managing the highest priority standby then just leave.
  	 */
! 	if (syncWalSnd != MyWalSnd)
  	{
  		LWLockRelease(SyncRepLock);
! 		announce_next_takeover = true;
  		return;
  	}
  
  	/*
  	 * Set the lsn first so that when we wake backends they will release up to
! 	 * this location.
  	 */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
  
  	LWLockRelease(SyncRepLock);
  
  	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
! 	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
  
  	/*
  	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now the sync standby.
  	 */
  	if (announce_next_takeover)
  	{
--- 519,614 ----
  	/*
  	 * We're a potential sync standby. Release waiters if we are the highest
  	 * priority standby. If there are multiple standbys with same priorities
! 	 * then we use the first mentioned standbys.
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 	sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
  
! 	/* We should have found ourselves at least */
! 	Assert(num_sync > 0);
! 
! 	/*
! 	 * If we aren't managing one of the standbys with highest priority
! 	 * then just leave.
! 	 */
! 	for (i = 0; i < num_sync; i++)
  	{
! 		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
! 		if (walsndloc == MyWalSnd)
  		{
! 			found = true;
! 			break;
  		}
  	}
  
  	/*
! 	 * We are definitely not one of the chosen... But we could be
! 	 * after the next takeover.
  	 */
! 	if (!found)
! 	{
! 		LWLockRelease(SyncRepLock);
! 		pfree(sync_standbys);
! 		announce_next_takeover = true;
! 		return;
! 	}
  
  	/*
! 	 * Even if we are one of the chosen standbys, leave if there
! 	 * are fewer synchronous standbys in waiting state than what is
! 	 * expected by the user.
  	 */
! 	if (num_sync < synchronous_standby_num &&
! 		synchronous_standby_num != -1)
  	{
  		LWLockRelease(SyncRepLock);
! 		pfree(sync_standbys);
  		return;
  	}
  
  	/*
  	 * Set the lsn first so that when we wake backends they will release up to
! 	 * this location, of course only if all the standbys found as synchronous
! 	 * have already reached that point, so first find what are the oldest
! 	 * write and flush positions of all the standbys considered in sync...
  	 */
! 	min_write_pos = MyWalSnd->write;
! 	min_flush_pos = MyWalSnd->flush;
! 	for (i = 0; i < num_sync; i++)
! 	{
! 		volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]];
! 
! 		SpinLockAcquire(&walsndloc->mutex);
! 		if (min_write_pos > walsndloc->write)
! 			min_write_pos = walsndloc->write;
! 		if (min_flush_pos > walsndloc->flush)
! 			min_flush_pos = walsndloc->flush;
! 		SpinLockRelease(&walsndloc->mutex);
! 	}
! 
! 	/* ... And now update if necessary */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < min_write_pos)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = min_write_pos;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < min_flush_pos)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = min_flush_pos;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
  
  	LWLockRelease(SyncRepLock);
  
  	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! 		 numwrite, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_WRITE] >> 32),
! 		 (uint32) walsndctl->lsn[SYNC_REP_WAIT_WRITE],
! 		 numflush, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] >> 32),
! 		 (uint32) walsndctl->lsn[SYNC_REP_WAIT_FLUSH]);
  
  	/*
  	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now a sync standby.
  	 */
  	if (announce_next_takeover)
  	{
***************
*** 457,462 **** SyncRepReleaseWaiters(void)
--- 617,625 ----
  				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
  						application_name, MyWalSnd->sync_standby_priority)));
  	}
+ 
+ 	/* Clean up */
+ 	pfree(sync_standbys);
  }
  
  /*
***************
*** 483,488 **** SyncRepGetStandbyPriority(void)
--- 646,655 ----
  	if (am_cascading_walsender)
  		return 0;
  
+ 	/* If no synchronous nodes allowed, no cake for this WAL sender */
+ 	if (synchronous_standby_num == 0)
+ 		return 0;
+ 
  	/* Need a modifiable copy of string */
  	rawstring = pstrdup(SyncRepStandbyNames);
  
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2735,2742 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
  	int		   *sync_priority;
! 	int			priority = 0;
! 	int			sync_standby = -1;
  	int			i;
  
  	/* check to see if caller supports us returning a tuplestore */
--- 2735,2742 ----
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
  	int		   *sync_priority;
! 	int		   *sync_standbys;
! 	int			num_sync = 0;
  	int			i;
  
  	/* check to see if caller supports us returning a tuplestore */
***************
*** 2767,2802 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	/*
  	 * Get the priorities of sync standbys all in one go, to minimise lock
  	 * acquisitions and to allow us to evaluate who is the current sync
! 	 * standby. This code must match the code in SyncRepReleaseWaiters().
  	 */
  	sync_priority = palloc(sizeof(int) * max_wal_senders);
  	LWLockAcquire(SyncRepLock, LW_SHARED);
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
  
! 		if (walsnd->pid != 0)
! 		{
! 			/*
! 			 * Treat a standby such as a pg_basebackup background process
! 			 * which always returns an invalid flush location, as an
! 			 * asynchronous standby.
! 			 */
! 			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 				0 : walsnd->sync_standby_priority;
! 
! 			if (walsnd->state == WALSNDSTATE_STREAMING &&
! 				walsnd->sync_standby_priority > 0 &&
! 				(priority == 0 ||
! 				 priority > walsnd->sync_standby_priority) &&
! 				!XLogRecPtrIsInvalid(walsnd->flush))
! 			{
! 				priority = walsnd->sync_standby_priority;
! 				sync_standby = i;
! 			}
! 		}
  	}
  	LWLockRelease(SyncRepLock);
  
  	for (i = 0; i < max_wal_senders; i++)
--- 2767,2789 ----
  	/*
  	 * Get the priorities of sync standbys all in one go, to minimise lock
  	 * acquisitions and to allow us to evaluate who is the current sync
! 	 * standby.
  	 */
  	sync_priority = palloc(sizeof(int) * max_wal_senders);
  	LWLockAcquire(SyncRepLock, LW_SHARED);
+ 
+ 	/* Get first the priorities on each standby as long as we hold a lock */
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
  
! 		sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
! 			0 : walsnd->sync_standby_priority;
  	}
+ 
+ 	/* Obtain list of synchronous standbys */
+ 	sync_standbys = SyncRepGetSynchronousNodes(&num_sync);
  	LWLockRelease(SyncRepLock);
  
  	for (i = 0; i < max_wal_senders; i++)
***************
*** 2858,2872 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  			 */
  			if (sync_priority[i] == 0)
  				values[7] = CStringGetTextDatum("async");
- 			else if (i == sync_standby)
- 				values[7] = CStringGetTextDatum("sync");
  			else
! 				values[7] = CStringGetTextDatum("potential");
  		}
  
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
  	pfree(sync_priority);
  
  	/* clean up and return the tuplestore */
  	tuplestore_donestoring(tupstore);
--- 2845,2877 ----
  			 */
  			if (sync_priority[i] == 0)
  				values[7] = CStringGetTextDatum("async");
  			else
! 			{
! 				int j;
! 				bool found = false;
! 
! 				for (j = 0; j < num_sync; j++)
! 				{
! 					/* Found that this node is one in sync */
! 					if (i == sync_standbys[j])
! 					{
! 						values[7] = CStringGetTextDatum("sync");
! 						found = true;
! 						break;
! 					}
! 				}
! 				if (!found)
! 					values[7] = CStringGetTextDatum("potential");
! 			}
  		}
  
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
+ 
+ 	/* Cleanup */
  	pfree(sync_priority);
+ 	if (sync_standbys)
+ 		pfree(sync_standbys);
  
  	/* clean up and return the tuplestore */
  	tuplestore_donestoring(tupstore);
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2548,2553 **** static struct config_int ConfigureNamesInt[] =
--- 2548,2563 ----
  		NULL, NULL, NULL
  	},
  
+ 	{
+ 		{"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+ 			gettext_noop("Number of potential synchronous standbys."),
+ 			NULL
+ 		},
+ 		&synchronous_standby_num,
+ 		-1, -1, INT_MAX,
+ 		NULL, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ----
  #synchronous_standby_names = ''	# standby servers that provide sync rep
  				# comma-separated list of application_name
  				# from standby(s); '*' = all
+ #synchronous_standby_num = -1	# number of standby servers using sync rep
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
  
  # - Standby Servers -
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 33,38 ****
--- 33,39 ----
  
  /* user-settable parameters for synchronous replication */
  extern char *SyncRepStandbyNames;
+ extern int	synchronous_standby_num;
  
  /* called by user backend */
  extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
***************
*** 49,54 **** extern void SyncRepUpdateSyncStandbysDefined(void);
--- 50,56 ----
  
  /* called by various procs */
  extern int	SyncRepWakeQueue(bool all, int mode);
+ extern int *SyncRepGetSynchronousNodes(int *num_sync);
  
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);