On Thu, Mar 10, 2016 at 7:21 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Fri, Mar 4, 2016 at 3:40 AM, Masahiko Sawada <sawada.m...@gmail.com> wrote:
>> On Thu, Mar 3, 2016 at 11:30 PM, Masahiko Sawada <sawada.m...@gmail.com> wrote:
>>> Hi,
>>>
>>> Thank you so much for reviewing this patch!
>>>
>>> All review comments regarding document and comment are fixed.
>>> Attached latest v14 patch.
>>>
>>>> This accepts 'abc^Id' as a name, which is wrong behavior (but
>>>> such application names are not allowed anyway. If you assume so,
>>>> I'd like to see a comment for that.).
>>>
>>> 'abc^Id' is accepted as application_name, no?
>>> postgres(1)=# set application_name to 'abc^Id';
>>> SET
>>> postgres(1)=# show application_name ;
>>>  application_name
>>> ------------------
>>>  abc^Id
>>> (1 row)
>>>
>>>> addlit_xd_string(char *ytext) and addlitchar_xd_string(unsigned
>>>> char ychar) require different character types. Is there any reason
>>>> for that?
>>>
>>> Because addlit_xd_string() is for adding string(char *) to xd_string,
>>> OTOH addlitchar_xd_string() is for adding just one character to xd_string.
>>>
>>>> I personally don't like the addlit*string() things for such a simple
>>>> syntax, but it is acceptable enough for me. However, it uses
>>>> StringInfo to hold double-quoted names, which pallocs a 1024-byte
>>>> memory chunk for every double-quoted name. The chunks pile up
>>>> uncollected until the current memory context is deleted or
>>>> reset (it is deleted just after config file processing
>>>> finishes). In addition to that, setting s_s_names runs the
>>>> parser twice. That seems too greedy to me, and a static
>>>> char [NAMEDATALEN] seems to be enough, using the v12 way
>>>> without palloc/repalloc.
>>>
>>> I thought that the length of a group name could exceed NAMEDATALEN, so
>>> I used StringInfo.
>>> Is it not necessary?
>>>
>>>> I found that the name SyncGroupName.wait_num is not
>>>> intuitive. How about sync_num, sync_member_num or
>>>> sync_standby_num? If the last is preferable, .members also should
>>>> be .standbys .
>>>
>>> Thanks, sync_num is preferable to me.
>>>
>>> ===
>>>> I am quite uncomfortable with the existence of
>>>> WalSnd.sync_standby_priority. It represented the priority in the
>>>> old linear s_s_names format, but nested groups or even a
>>>> single-level quorum list obviously don't fit it. Can we get rid
>>>> of sync_standby_priority, even though we implement at most
>>>> n-priority for now?
>>>
>>> We could get rid of sync_standby_priority.
>>> But if so, we will not be able to see the next sync standby in
>>> pg_stat_replication system view.
>>> Regarding each node priority, I was thinking that standbys in quorum
>>> list have same priority, and in nested group each standbys are given
>>> the priority starting from 1.
>>>
>>> ===
>>>> The function SyncRepGetSyncedLsnsUsingPriority doesn't seem to
>>>> have specific code for every prioritizing method (which are
>>>> priority, quorum, nested and so on). Is there any reason to use it as
>>>> a callback of SyncGroupNode?
>>>
>>> The current code is written this way because it supports only the
>>> priority method.
>>> For the first version of this feature, I'd like to keep the implementation simple.
>>>
>>> Aside from this, of course I'm planning to have specific code for the nested design.
>>> - The group can have some name nodes or group nodes.
>>> - The group can use either of two methods: priority or quorum.
>>> - The group has SyncRepGetSyncedLsnsFn() and SyncRepGetStandbysFn()
>>>   - SyncRepGetSyncedLsnsFn() recursively determines the synced LSNs
>>> at that moment using the group's method.
>>>   - SyncRepGetStandbysFn() returns the standbys of its group
>>> that are considered sync using the group's method.
>>>
>>> For example, with s_s_names = '3(a, b, 2[c,d]::group1)', the SyncRepStandbys
>>> memory structure will be,
>>>
>>> "main(quorum)" --- "a"
>>>                         |
>>>                         -- "b"
>>>                         |
>>>                         -- "group1(priority)" --- "c"
>>>                                                      |
>>>                                                      -- "d"
>>>
>>> When determining synced LSNs, we first need to determine group1's LSNs
>>> using the priority method, and then we can determine main's LSNs using
>>> the quorum method with the "a" LSNs, "b" LSNs and "group1" LSNs.
>>> So the SyncRepGetSyncedLsnsUsingPriority() function would be,
>>>
>>> bool
>>> SyncRepGetSyncedLsnsUsingPriority(*group, *write_lsn, *flush_lsn)
>>> {
>>>     sync_num = group->SyncRepGetStandbysFn(group, sync_list);
>>>
>>>     if (sync_num < group->sync_num)
>>>         return false;
>>>
>>>     for (each member of sync_list)
>>>     {
>>>         if (member->type == group node)
>>>             call SyncRepGetSyncedLsnsFn(member, w, f) and store w and f into lsn_list.
>>>         else
>>>             Store name node LSNs into lsn_list.
>>>     }
>>>
>>>     Determine synced LSNs of this group using lsn_list and priority method.
>>>     Store synced LSNs into write_lsn and flush_lsn.
>>>     return true;
>>> }
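
To make the recursion in the pseudocode above concrete, here is a rough standalone sketch in C. It is only an illustration under assumptions, not code from any version of the patch: the Node struct, the Method enum, MAX_MEMBERS and synced_flush_lsn() are all hypothetical names, and it tracks only the flush LSN. It also assumes that a priority group takes its first sync_num connected members in list order, while a quorum group takes the sync_num most advanced members.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_MEMBERS 8           /* hypothetical limit, for this sketch only */

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */
typedef enum { NODE_NAME, NODE_GROUP } NodeType;
typedef enum { METHOD_PRIORITY, METHOD_QUORUM } Method;

typedef struct Node
{
    NodeType     type;
    bool         connected;     /* name nodes only */
    Lsn          flush;         /* name nodes only */
    Method       method;        /* group nodes only */
    int          sync_num;      /* group nodes only */
    int          nmembers;      /* group nodes only */
    struct Node **members;      /* group nodes only */
} Node;

/* qsort comparator: larger LSNs sort first */
static int
cmp_lsn_desc(const void *a, const void *b)
{
    Lsn x = *(const Lsn *) a;
    Lsn y = *(const Lsn *) b;

    return (x < y) - (x > y);
}

/*
 * Store the synced flush LSN of 'node' into *flush and return true,
 * or return false if the node cannot currently provide one.
 */
bool
synced_flush_lsn(const Node *node, Lsn *flush)
{
    Lsn lsns[MAX_MEMBERS];
    int n = 0;

    if (node->type == NODE_NAME)
    {
        if (!node->connected)
            return false;
        *flush = node->flush;
        return true;
    }

    assert(node->sync_num >= 1 && node->nmembers <= MAX_MEMBERS);

    /* Collect member LSNs, recursing into nested groups */
    for (int i = 0; i < node->nmembers; i++)
    {
        Lsn member_lsn;

        if (!synced_flush_lsn(node->members[i], &member_lsn))
            continue;
        lsns[n++] = member_lsn;

        /* Priority method: only the first sync_num usable members count */
        if (node->method == METHOD_PRIORITY && n == node->sync_num)
            break;
    }

    if (n < node->sync_num)
        return false;

    /* The sync_num-th largest LSN is confirmed by sync_num members */
    qsort(lsns, n, sizeof(Lsn), cmp_lsn_desc);
    *flush = lsns[node->sync_num - 1];
    return true;
}
```

With the example above and flush LSNs of a=100, b=80, c=90, d=70, group1 yields 70 under the priority method, and main yields 70 when it needs all three members.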
>>>
>>>> SyncRepClearStandbyGroupList is defined in syncrep.c but the
>>>> other related functions are defined in syncgroup_gram.y. It would
>>>> be better to place them together.
>>>
>>> SyncRepClearStandbyGroupList() is used by
>>> check_synchronous_standby_names(), so I put this function syncrep.c.
>>>
>>>> SyncRepStandbys are to be in multilevel and the struct is
>>>> naturally allowed to be so but SyncRepClearStandbyGroupList
>>>> assumes it in single level.
>>>
>>> Because I think that we don't need to fully support the
>>> nested style in the first version.
>>> We have to design this feature carefully with extensibility in mind,
>>> but an overly ambitious implementation could cause crashes.
>>> Considering the remaining time for 9.6, I feel we could implement the
>>> quorum method at best.
>>>
>>>> This is a comment from the aspect of abstractness of objects.
>>>> The callers of SyncRepGetSyncStandbysUsingPriority() need to care
>>>> about the internals of SyncGroupNode, but what the function should
>>>> return seems to be just the list of walsnds elements. The element
>>>> number is useless when the SyncGroupNode nests.
>>>> > int
>>>> > SyncRepGetSyncStandbysUsingPriority(SyncGroupNode *group, volatile 
>>>> > WalSnd **sync_list)
>>>> This might need to expose 'volatile WalSnd*' (only pointer type)
>>>> outside of walsender.
>>>> Or it should return the list of index number of
>>>> *WalSndCtl->walsnds*.
>>>
>>> SyncRepGetSyncStandbysUsingPriority() already returns the list of
>>> index numbers of "WalSndCtl->walsnds" as sync_list, no?
>>> As I mentioned above, SyncRepGetSyncStandbysFn() doesn't need to care
>>> about the internals of SyncGroupNode in my design.
>>> Selecting sync nodes from its group doesn't depend on the type of node.
>>> What SyncRepGetSyncStandbyFn() should do is to select sync nodes from
>>> *its* group.
>>>
>>
>> The previous patch has a bug in GUC parameter handling.
>> Attached updated version.
>
> Thanks for updating the patch!
>
> Now I'm fixing some problems (e.g., current patch doesn't work
> with EXEC_BACKEND environment) and revising the patch.

Sorry for the delay... Here is the revised version of the patch.
Please review and test this version!
BTW, I've not revised the documentation and regression test yet.
I will do that during the review and test of the patch.

Regards,

-- 
Fujii Masao
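
As a reading aid for the patch below, here is a small standalone sketch of the rule that SyncRepGetSyncStandbys() and SyncRepGetOldestSyncRecPtr() are meant to implement: choose up to num_sync streaming standbys in priority order, then release waiters up to the oldest write and flush positions among them. This is not the patch's code. The Standby struct, MAX_STANDBYS, pick_sync_standbys() and oldest_sync_lsn() are hypothetical names, and the sketch uses a plain repeated scan instead of the pending list used in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_STANDBYS 16         /* hypothetical limit, for this sketch only */

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr; 0 means invalid */

typedef struct Standby
{
    bool streaming;             /* connected and in streaming state */
    int  priority;              /* 0 = not a sync candidate, 1 = highest */
    Lsn  write;
    Lsn  flush;
} Standby;

/*
 * Choose up to num_sync standbys in ascending priority value; ties go to
 * the lower array index. Stores the chosen indexes into chosen[] and
 * returns how many were chosen.
 */
int
pick_sync_standbys(const Standby *s, int n, int num_sync, int *chosen)
{
    bool taken[MAX_STANDBYS] = {false};
    int  count = 0;

    assert(n <= MAX_STANDBYS);

    while (count < num_sync)
    {
        int best = -1;

        for (int i = 0; i < n; i++)
        {
            /* Must be unused, streaming, a candidate, and have a valid flush */
            if (taken[i] || !s[i].streaming ||
                s[i].priority == 0 || s[i].flush == 0)
                continue;
            if (best < 0 || s[i].priority < s[best].priority)
                best = i;
        }
        if (best < 0)
            break;              /* no more candidates */
        taken[best] = true;
        chosen[count++] = best;
    }
    return count;
}

/*
 * Return false if fewer than num_sync sync standbys are available.
 * Otherwise store the oldest write and flush positions among them.
 */
bool
oldest_sync_lsn(const Standby *s, int n, int num_sync, Lsn *write, Lsn *flush)
{
    int chosen[MAX_STANDBYS];
    int count;

    assert(num_sync >= 1 && num_sync <= MAX_STANDBYS);

    count = pick_sync_standbys(s, n, num_sync, chosen);
    if (count < num_sync)
        return false;

    *write = s[chosen[0]].write;
    *flush = s[chosen[0]].flush;
    for (int i = 1; i < count; i++)
    {
        if (s[chosen[i]].write < *write)
            *write = s[chosen[i]].write;
        if (s[chosen[i]].flush < *flush)
            *flush = s[chosen[i]].flush;
    }
    return true;
}
```

Taking the minimum is what makes a commit wait for every chosen standby: a position is only considered replicated once the slowest sync standby has reached it.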
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2902,2921 **** include_dir 'conf.d'
        </term>
        <listitem>
         <para>
!         Specifies a comma-separated list of standby names that can support
!         <firstterm>synchronous replication</>, as described in
!         <xref linkend="synchronous-replication">.
!         At any one time there will be at most one active synchronous standby;
!         transactions waiting for commit will be allowed to proceed after
!         this standby server confirms receipt of their data.
!         The synchronous standby will be the first standby named in this list
!         that is both currently connected and streaming data in real-time
!         (as shown by a state of <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
!         Other standby servers appearing later in this list represent potential
!         synchronous standbys.
!         If the current synchronous standby disconnects for whatever reason,
          it will be replaced immediately with the next-highest-priority standby.
          Specifying more than one standby name can allow very high availability.
         </para>
--- 2902,2919 ----
        </term>
        <listitem>
         <para>
!         Specifies the standby names that can support <firstterm>synchronous replication</>
!         using either of two syntaxes: a comma-separated list, or a more flexible syntax
!         described in <xref linkend="synchronous-replication">.
!         Transactions waiting for commit will be allowed to proceed after a
!         configurable subset of standby servers confirms receipt of their data.
!         For the simple comma-separated list syntax, it is one server.
!         The synchronous standbys will be those named in this parameter that are both
!         currently connected and streaming data in real-time (as shown by a state
!         of <literal>streaming</> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
!         If any of the current synchronous standbys disconnects for whatever reason,
          it will be replaced immediately with the next-highest-priority standby.
          Specifying more than one standby name can allow very high availability.
         </para>
***************
*** 2923,2931 **** include_dir 'conf.d'
          The name of a standby server for this purpose is the
          <varname>application_name</> setting of the standby, as set in the
          <varname>primary_conninfo</> of the standby's WAL receiver.  There is
!         no mechanism to enforce uniqueness. In case of duplicates one of the
!         matching standbys will be chosen to be the synchronous standby, though
!         exactly which one is indeterminate.
          The special entry <literal>*</> matches any
          <varname>application_name</>, including the default application name
          of <literal>walreceiver</>.
--- 2921,2930 ----
          The name of a standby server for this purpose is the
          <varname>application_name</> setting of the standby, as set in the
          <varname>primary_conninfo</> of the standby's WAL receiver.  There is
!         no mechanism to enforce uniqueness. For each specified standby name,
!         only the specified count of standbys will be chosen to be synchronous
!         standbys, though exactly which ones is indeterminate.  The rest will
!         represent potential synchronous standbys.
          The special entry <literal>*</> matches any
          <varname>application_name</>, including the default application name
          of <literal>walreceiver</>.
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1027,1050 **** primary_slot_name = 'node_a_slot'
  
     <para>
      Synchronous replication offers the ability to confirm that all changes
!     made by a transaction have been transferred to one synchronous standby
!     server. This extends the standard level of durability
      offered by a transaction commit. This level of protection is referred
!     to as 2-safe replication in computer science theory.
     </para>
  
     <para>
      When requesting synchronous replication, each commit of a
      write transaction will wait until confirmation is
      received that the commit has been written to the transaction log on disk
!     of both the primary and standby server. The only possibility that data
!     can be lost is if both the primary and the standby suffer crashes at the
      same time. This can provide a much higher level of durability, though only
!     if the sysadmin is cautious about the placement and management of the two
      servers.  Waiting for confirmation increases the user's confidence that the
      changes will not be lost in the event of server crashes but it also
      necessarily increases the response time for the requesting transaction.
!     The minimum wait time is the roundtrip time between primary to standby.
     </para>
  
     <para>
--- 1027,1051 ----
  
     <para>
      Synchronous replication offers the ability to confirm that all changes
!     made by a transaction have been transferred to one or more synchronous standby
!     servers. This extends the standard level of durability
      offered by a transaction commit. This level of protection is referred
!     to as group-safe replication in computer science theory.
     </para>
  
     <para>
      When requesting synchronous replication, each commit of a
      write transaction will wait until confirmation is
      received that the commit has been written to the transaction log on disk
!     of both the primary and standby servers. The only possibility that data
!     can be lost is if both the primary and the standbys suffer crashes at the
      same time. This can provide a much higher level of durability, though only
!     if the sysadmin is cautious about the placement and management of these
      servers.  Waiting for confirmation increases the user's confidence that the
      changes will not be lost in the event of server crashes but it also
      necessarily increases the response time for the requesting transaction.
!     The minimum wait time is the roundtrip time between the primary and the
!     slowest synchronous standby.
     </para>
  
     <para>
***************
*** 1115,1120 **** primary_slot_name = 'node_a_slot'
--- 1116,1157 ----
  
     </sect3>
  
+    <sect3 id="synchronous-replication-multiple-synchronization">
+     <title>Multiple Synchronous Replication</title>
+ 
+    <para>
+     Setting up synchronous standbys at multiple locations provides high availability.
+     It ensures that modified data will be replicated to multiple synchronous standbys.
+    </para>
+ 
+    <para>
+     Multiple synchronous replication is set up by setting <xref linkend="guc-synchronous-standby-names">
+     using the following syntax.
+    </para>
+     
+     <synopsis>
+         synchronous_standby_names = '<replaceable class="PARAMETER">N</replaceable> [ <replaceable class="PARAMETER">node</replaceable> [, ...] ]'
+     </synopsis>
+ 
+    <para>
+     This syntax allows us to define a synchronous group that will wait for at
+     least N standbys, and a comma-separated list of group members.  The special
+     value <literal>*</> matches any standby. The number N must not be greater
+     than the number of members listed in the group, unless <literal>*</> is
+     used.  Priority is given to servers in the order that they appear in the list.
+     The first named server has the highest priority.
+    </para>
+ 
+    <note>
+     <para>
+     All ASCII characters except for special characters (',', '&quot;', '[', ']', ' ') are
+     allowed in unquoted standby names.  To use these special characters, the standby
+     name should be enclosed in double quotes.
+     </para>
+    </note>
+ 
+    </sect3>
+ 
     <sect3 id="synchronous-replication-performance">
      <title>Planning for Performance</title>
  
*** a/src/backend/Makefile
--- b/src/backend/Makefile
***************
*** 203,209 **** distprep:
  	$(MAKE) -C parser	gram.c gram.h scan.c
  	$(MAKE) -C bootstrap	bootparse.c bootscanner.c
  	$(MAKE) -C catalog	schemapg.h postgres.bki postgres.description postgres.shdescription
! 	$(MAKE) -C replication	repl_gram.c repl_scanner.c
  	$(MAKE) -C storage/lmgr	lwlocknames.h
  	$(MAKE) -C utils	fmgrtab.c fmgroids.h errcodes.h
  	$(MAKE) -C utils/misc	guc-file.c
--- 203,209 ----
  	$(MAKE) -C parser	gram.c gram.h scan.c
  	$(MAKE) -C bootstrap	bootparse.c bootscanner.c
  	$(MAKE) -C catalog	schemapg.h postgres.bki postgres.description postgres.shdescription
! 	$(MAKE) -C replication	repl_gram.c repl_scanner.c syncrep_gram.c syncrep_scanner.c
  	$(MAKE) -C storage/lmgr	lwlocknames.h
  	$(MAKE) -C utils	fmgrtab.c fmgroids.h errcodes.h
  	$(MAKE) -C utils/misc	guc-file.c
***************
*** 320,325 **** maintainer-clean: distclean
--- 320,327 ----
  	      catalog/postgres.shdescription \
  	      replication/repl_gram.c \
  	      replication/repl_scanner.c \
+ 	      replication/syncrep_gram.c \
+ 	      replication/syncrep_scanner.c \
  	      storage/lmgr/lwlocknames.c \
  	      storage/lmgr/lwlocknames.h \
  	      utils/fmgroids.h \
*** a/src/backend/replication/.gitignore
--- b/src/backend/replication/.gitignore
***************
*** 1,2 ****
--- 1,4 ----
  /repl_gram.c
  /repl_scanner.c
+ /syncrep_gram.c
+ /syncrep_scanner.c
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 15,21 **** include $(top_builddir)/src/Makefile.global
  override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
  
  OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! 	repl_gram.o slot.o slotfuncs.o syncrep.o
  
  SUBDIRS = logical
  
--- 15,21 ----
  override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
  
  OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! 	repl_gram.o slot.o slotfuncs.o syncrep.o syncrep_gram.o
  
  SUBDIRS = logical
  
***************
*** 24,28 **** include $(top_srcdir)/src/backend/common.mk
  # repl_scanner is compiled as part of repl_gram
  repl_gram.o: repl_scanner.c
  
! # repl_gram.c and repl_scanner.c are in the distribution tarball, so
! # they are not cleaned here.
--- 24,33 ----
  # repl_scanner is compiled as part of repl_gram
  repl_gram.o: repl_scanner.c
  
! # syncrep_scanner is compiled as part of syncrep_gram
! syncrep_gram.o: syncrep_scanner.c
! syncrep_scanner.c: FLEXFLAGS = -CF -p
! syncrep_scanner.c: FLEX_NO_BACKUP=yes
! 
! # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
! # are in the distribution tarball, so they are not cleaned here.
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 29,39 ****
   * single ordered queue of waiting backends, so that we can avoid
   * searching the through all waiters each time we receive a reply.
   *
!  * In 9.1 we support only a single synchronous standby, chosen from a
!  * priority list of synchronous_standby_names. Before it can become the
!  * synchronous standby it must have caught up with the primary; that may
!  * take some time. Once caught up, the current highest priority standby
!  * will release waiters from the queue.
   *
   * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
   *
--- 29,40 ----
   * single ordered queue of waiting backends, so that we can avoid
   * searching the through all waiters each time we receive a reply.
   *
!  * In 9.6 we support multiple synchronous standbys, chosen from a
!  * priority list of synchronous_standby_names. Before they can become the
!  * synchronous standbys they must have caught up with the primary; that may
!  * take some time. Once caught up, the current higher priority standbys
!  * which are considered as synchronous at that moment will release
!  * waiters from the queue.
   *
   * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
   *
***************
*** 65,76 **** char	   *SyncRepStandbyNames;
--- 66,80 ----
  
  static bool announce_next_takeover = true;
  
+ SyncRepConfigData *SyncRepConfig;
  static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
  
  static void SyncRepQueueInsert(int mode);
  static void SyncRepCancelWait(void);
  static int	SyncRepWakeQueue(bool all, int mode);
  
+ static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+ 									   XLogRecPtr *flushPtr, bool *am_sync);
  static int	SyncRepGetStandbyPriority(void);
  
  #ifdef USE_ASSERT_CHECKING
***************
*** 332,337 **** SyncRepInitConfig(void)
--- 336,345 ----
  {
  	int			priority;
  
+ 	/* Update the config data of synchronous replication */
+ 	SyncRepFreeConfig(SyncRepConfig);
+ 	SyncRepUpdateConfig();
+ 
  	/*
  	 * Determine if we are a potential sync standby and remember the result
  	 * for handling replies from standby.
***************
*** 349,410 **** SyncRepInitConfig(void)
  }
  
  /*
-  * Find the WAL sender servicing the synchronous standby with the lowest
-  * priority value, or NULL if no synchronous standby is connected. If there
-  * are multiple standbys with the same lowest priority value, the first one
-  * found is selected. The caller must hold SyncRepLock.
-  */
- WalSnd *
- SyncRepGetSynchronousStandby(void)
- {
- 	WalSnd	   *result = NULL;
- 	int			result_priority = 0;
- 	int			i;
- 
- 	for (i = 0; i < max_wal_senders; i++)
- 	{
- 		/* Use volatile pointer to prevent code rearrangement */
- 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
- 		int			this_priority;
- 
- 		/* Must be active */
- 		if (walsnd->pid == 0)
- 			continue;
- 
- 		/* Must be streaming */
- 		if (walsnd->state != WALSNDSTATE_STREAMING)
- 			continue;
- 
- 		/* Must be synchronous */
- 		this_priority = walsnd->sync_standby_priority;
- 		if (this_priority == 0)
- 			continue;
- 
- 		/* Must have a lower priority value than any previous ones */
- 		if (result != NULL && result_priority <= this_priority)
- 			continue;
- 
- 		/* Must have a valid flush position */
- 		if (XLogRecPtrIsInvalid(walsnd->flush))
- 			continue;
- 
- 		result = (WalSnd *) walsnd;
- 		result_priority = this_priority;
- 
- 		/*
- 		 * If priority is equal to 1, there cannot be any other WAL senders
- 		 * with a lower priority, so we're done.
- 		 */
- 		if (this_priority == 1)
- 			return result;
- 	}
- 
- 	return result;
- }
- 
- /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-standby-releases-waiter.
   *
   * Other policies are possible, which would change what we do here and
   * perhaps also which information we store as well.
--- 357,364 ----
  }
  
  /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-sync-standby-releases-waiter.
   *
   * Other policies are possible, which would change what we do here and
   * perhaps also which information we store as well.
***************
*** 413,419 **** void
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	WalSnd	   *syncWalSnd;
  	int			numwrite = 0;
  	int			numflush = 0;
  
--- 367,376 ----
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	XLogRecPtr	writePtr;
! 	XLogRecPtr	flushPtr;
! 	bool		got_oldest;
! 	bool		am_sync;
  	int			numwrite = 0;
  	int			numflush = 0;
  
***************
*** 429,450 **** SyncRepReleaseWaiters(void)
  		return;
  
  	/*
! 	 * We're a potential sync standby. Release waiters if we are the highest
! 	 * priority standby.
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
- 	syncWalSnd = SyncRepGetSynchronousStandby();
  
! 	/* We should have found ourselves at least */
! 	Assert(syncWalSnd != NULL);
  
  	/*
! 	 * If we aren't managing the highest priority standby then just leave.
  	 */
! 	if (syncWalSnd != MyWalSnd)
  	{
  		LWLockRelease(SyncRepLock);
! 		announce_next_takeover = true;
  		return;
  	}
  
--- 386,422 ----
  		return;
  
  	/*
! 	 * We're a potential sync standby. Release waiters if there are
! 	 * enough sync standbys and we are considered as sync.
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
  
! 	/*
! 	 * Check whether we are a sync standby or not, and calculate
! 	 * the oldest positions among all sync standbys.
! 	 */
! 	got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr, &am_sync);
! 
! 	/*
! 	 * If we are managing the sync standby, though we weren't
! 	 * prior to this, then announce we are now the sync standby.
! 	 */
! 	if (announce_next_takeover && am_sync)
! 	{
! 		announce_next_takeover = false;
! 		ereport(LOG,
! 				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
! 						application_name, MyWalSnd->sync_standby_priority)));
! 	}
  
  	/*
! 	 * If the number of sync standbys is less than requested or we aren't
! 	 * managing the sync standby then just leave.
  	 */
! 	if (!got_oldest || !am_sync)
  	{
  		LWLockRelease(SyncRepLock);
! 		announce_next_takeover = !am_sync;
  		return;
  	}
  
***************
*** 452,485 **** SyncRepReleaseWaiters(void)
  	 * Set the lsn first so that when we wake backends they will release up to
  	 * this location.
  	 */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
  
  	LWLockRelease(SyncRepLock);
  
  	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
! 	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
  
  	/*
! 	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now the sync standby.
  	 */
! 	if (announce_next_takeover)
  	{
! 		announce_next_takeover = false;
! 		ereport(LOG,
! 				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
! 						application_name, MyWalSnd->sync_standby_priority)));
  	}
  }
  
  /*
--- 424,643 ----
  	 * Set the lsn first so that when we wake backends they will release up to
  	 * this location.
  	 */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
  
  	LWLockRelease(SyncRepLock);
  
  	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
! 		 numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
! 		 numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr);
! }
! 
! /*
!  * Calculate the oldest Write and Flush positions among sync standbys.
!  *
!  * Return false if the number of sync standbys is less than
!  * synchronous_standby_names specifies. Otherwise return true and
!  * store the oldest positions into *writePtr and *flushPtr.
!  *
!  * On return, *am_sync is set to true if this walsender is connected to
!  * a sync standby. Otherwise it's set to false.
!  */
! static bool
! SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
! 						   bool *am_sync)
! {
! 	List		*sync_standbys;
! 	ListCell	*cell;
! 
! 	*writePtr = InvalidXLogRecPtr;
! 	*flushPtr = InvalidXLogRecPtr;
! 	*am_sync = false;
! 
! 	/* Get standbys that are considered as synchronous at this moment */
! 	sync_standbys = SyncRepGetSyncStandbys();
! 
! 	/* Quick exit if there are not enough synchronous standbys */
! 	if (list_length(sync_standbys) < SyncRepConfig->num_sync)
! 	{
! 		*am_sync = list_member_int(sync_standbys, MyWalSnd->slotno);
! 		list_free(sync_standbys);
! 		return false;
! 	}
  
  	/*
! 	 * Scan through all sync standbys and calculate the oldest
! 	 * Write and Flush positions.
  	 */
! 	foreach (cell, sync_standbys)
  	{
! 		WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
! 		XLogRecPtr	write;
! 		XLogRecPtr	flush;
! 
! 		SpinLockAcquire(&walsnd->mutex);
! 		write = walsnd->write;
! 		flush = walsnd->flush;
! 		SpinLockRelease(&walsnd->mutex);
! 
! 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
! 			*writePtr = write;
! 		if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
! 			*flushPtr = flush;
! 		if (walsnd == MyWalSnd)
! 			*am_sync = true;
  	}
+ 
+ 	list_free(sync_standbys);
+ 	return true;
+ }
+ 
+ /*
+  * Return the list of sync standbys, or NIL if no sync standby is connected.
+  *
+  * If there are multiple standbys with the same priority,
+  * the first one found is considered as higher priority.
+  * The caller must hold SyncRepLock.
+  */
+ List *
+ SyncRepGetSyncStandbys(void)
+ {
+ 	List	*result = NIL;
+ 	List	*pending = NIL;
+ 	int	lowest_priority;
+ 	int	next_highest_priority;
+ 	int	this_priority;
+ 	int	priority;
+ 	int	i;
+ 	WalSnd	*walsnd;
+ 
+ 	/* Quick exit if sync replication is not requested */
+ 	if (SyncRepConfig == NULL)
+ 		return NIL;
+ 
+ 	lowest_priority = list_length(SyncRepConfig->members);
+ 	next_highest_priority = lowest_priority;
+ 
+ 	/*
+ 	 * Find the sync standbys which have the highest priority (i.e, 1).
+ 	 * Also store all the other potential sync standbys into the pending list,
+ 	 * in order to scan it later and find other sync standbys from it quickly.
+ 	 */
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		walsnd = &WalSndCtl->walsnds[i];
+ 
+ 		/* Must be active */
+ 		if (walsnd->pid == 0)
+ 			continue;
+ 
+ 		/* Must be streaming */
+ 		if (walsnd->state != WALSNDSTATE_STREAMING)
+ 			continue;
+ 
+ 		/* Must be synchronous */
+ 		this_priority = walsnd->sync_standby_priority;
+ 		if (this_priority == 0)
+ 			continue;
+ 
+ 		/* Must have a valid flush position */
+ 		if (XLogRecPtrIsInvalid(walsnd->flush))
+ 			continue;
+ 
+ 		/*
+ 		 * If the priority is equal to 1, consider this standby as sync
+ 		 * and append it to the result. Otherwise append this standby
+ 		 * to the pending list to check if it's actually sync or not later.
+ 		 */
+ 		if (this_priority == 1)
+ 		{
+ 			result = lappend_int(result, i);
+ 			if (list_length(result) == SyncRepConfig->num_sync)
+ 			{
+ 				list_free(pending);
+ 				return result;		/* Exit if got enough sync standbys */
+ 			}
+ 		}
+ 		else
+ 		{
+ 			pending = lappend_int(pending, i);
+ 
+ 			/*
+ 			 * Track the highest priority among the standbys in the pending
+ 			 * list, in order to use it as the starting priority for later scan
+ 			 * of the list. This is useful to find quickly the sync standbys
+ 			 * from the pending list later because we can skip unnecessary
+ 			 * scans for the unused priorities.
+ 			 */
+ 			if (this_priority < next_highest_priority)
+ 				next_highest_priority = this_priority;
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Consider all pending standbys as sync if the number of them plus
+ 	 * already-found sync ones does not exceed what the configuration requests.
+ 	 */
+ 	if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
+ 		return list_concat(result, pending);
+ 
+ 	/*
+ 	 * Find the sync standbys from the pending list.
+ 	 */
+ 	priority = next_highest_priority;
+ 	while (priority <= lowest_priority)
+ 	{
+ 		ListCell	*cell;
+ 		ListCell	*prev = NULL;
+ 		ListCell	*next;
+ 
+ 		next_highest_priority = lowest_priority;
+ 
+ 		for (cell = list_head(pending); cell; cell = next)
+ 		{
+ 			i = lfirst_int(cell);
+ 			walsnd = &WalSndCtl->walsnds[i];
+ 
+ 			next = lnext(cell);
+ 
+ 			this_priority = walsnd->sync_standby_priority;
+ 			if (this_priority == priority)
+ 			{
+ 				result = lappend_int(result, i);
+ 				if (list_length(result) == SyncRepConfig->num_sync)
+ 				{
+ 					list_free(pending);
+ 					return result;		/* Exit if got enough sync standbys */
+ 				}
+ 
+ 				/*
+ 				 * Remove the entry for this sync standby from the list
+ 				 * to prevent us from looking at the same entry again.
+ 				 */
+ 				pending = list_delete_cell(pending, cell, prev);
+ 
+ 				continue;
+ 			}
+ 
+ 			if (this_priority < next_highest_priority)
+ 				next_highest_priority = this_priority;
+ 
+ 			prev = cell;
+ 		}
+ 
+ 		priority = next_highest_priority;
+ 	}
+ 
+ 	return result;
  }
  
  /*
***************
*** 493,500 **** SyncRepReleaseWaiters(void)
  static int
  SyncRepGetStandbyPriority(void)
  {
! 	char	   *rawstring;
! 	List	   *elemlist;
  	ListCell   *l;
  	int			priority = 0;
  	bool		found = false;
--- 651,657 ----
  static int
  SyncRepGetStandbyPriority(void)
  {
! 	List	   *members;
  	ListCell   *l;
  	int			priority = 0;
  	bool		found = false;
***************
*** 506,525 **** SyncRepGetStandbyPriority(void)
  	if (am_cascading_walsender)
  		return 0;
  
! 	/* Need a modifiable copy of string */
! 	rawstring = pstrdup(SyncRepStandbyNames);
! 
! 	/* Parse string into list of identifiers */
! 	if (!SplitIdentifierString(rawstring, ',', &elemlist))
! 	{
! 		/* syntax error in list */
! 		pfree(rawstring);
! 		list_free(elemlist);
! 		/* GUC machinery will have already complained - no need to do again */
  		return 0;
- 	}
  
! 	foreach(l, elemlist)
  	{
  		char	   *standby_name = (char *) lfirst(l);
  
--- 663,673 ----
  	if (am_cascading_walsender)
  		return 0;
  
! 	if (!SyncStandbysDefined())
  		return 0;
  
! 	members = SyncRepConfig->members;
! 	foreach(l, members)
  	{
  		char	   *standby_name = (char *) lfirst(l);
  
***************
*** 533,541 **** SyncRepGetStandbyPriority(void)
  		}
  	}
  
- 	pfree(rawstring);
- 	list_free(elemlist);
- 
  	return (found ? priority : 0);
  }
  
--- 681,686 ----
***************
*** 643,648 **** SyncRepUpdateSyncStandbysDefined(void)
--- 788,829 ----
  	}
  }
  
+ /*
+  * Parse synchronous_standby_names and update the config data
+  * of synchronous standbys.
+  */
+ void
+ SyncRepUpdateConfig(void)
+ {
+ 	int	parse_rc;
+ 
+ 	/*
+ 	 * check_synchronous_standby_names() verifies the setting value of
+ 	 * synchronous_standby_names before this function is called. So
+ 	 * syncrep_yyparse() must not cause an error here.
+ 	 */
+ 	syncrep_scanner_init(SyncRepStandbyNames);
+ 	parse_rc = syncrep_yyparse();
+ 	Assert(parse_rc == 0);
+ 	syncrep_scanner_finish();
+ 
+ 	SyncRepConfig = syncrep_parse_result;
+ 	syncrep_parse_result = NULL;
+ }
+ 
+ /*
+  * Free previously-allocated config data of synchronous replication.
+  */
+ void
+ SyncRepFreeConfig(SyncRepConfigData *config)
+ {
+ 	if (!config)
+ 		return;
+ 
+ 	list_free(config->members);
+ 	pfree(config);
+ }
+ 
  #ifdef USE_ASSERT_CHECKING
  static bool
  SyncRepQueueIsOrderedByLSN(int mode)
***************
*** 687,718 **** SyncRepQueueIsOrderedByLSN(int mode)
  bool
  check_synchronous_standby_names(char **newval, void **extra, GucSource source)
  {
! 	char	   *rawstring;
! 	List	   *elemlist;
! 
! 	/* Need a modifiable copy of string */
! 	rawstring = pstrdup(*newval);
  
! 	/* Parse string into list of identifiers */
! 	if (!SplitIdentifierString(rawstring, ',', &elemlist))
  	{
! 		/* syntax error in list */
! 		GUC_check_errdetail("List syntax is invalid.");
! 		pfree(rawstring);
! 		list_free(elemlist);
! 		return false;
! 	}
  
! 	/*
! 	 * Any additional validation of standby names should go here.
! 	 *
! 	 * Don't attempt to set WALSender priority because this is executed by
! 	 * postmaster at startup, not WALSender, so the application_name is not
! 	 * yet correctly set.
! 	 */
  
! 	pfree(rawstring);
! 	list_free(elemlist);
  
  	return true;
  }
--- 868,896 ----
  bool
  check_synchronous_standby_names(char **newval, void **extra, GucSource source)
  {
! 	int	parse_rc;
  
! 	if (*newval != NULL && (*newval)[0] != '\0')
  	{
! 		syncrep_scanner_init(*newval);
! 		parse_rc = syncrep_yyparse();
! 		syncrep_scanner_finish();
  
! 		if (parse_rc != 0)
! 		{
! 			GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
! 			GUC_check_errdetail("synchronous_standby_names parser returned %d",
! 								parse_rc);
! 			return false;
! 		}
  
! 		/*
! 		 * syncrep_yyparse() sets the global syncrep_parse_result as a side
! 		 * effect. This function only needs to validate the value, so free
! 		 * the result once parsing is done.
! 		 */
! 		SyncRepFreeConfig(syncrep_parse_result);
! 	}
  
  	return true;
  }
*** /dev/null
--- b/src/backend/replication/syncrep_gram.y
***************
*** 0 ****
--- 1,86 ----
+ %{
+ /*-------------------------------------------------------------------------
+  *
+  * syncrep_gram.y				- Parser for synchronous_standby_names
+  *
+  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  src/backend/replication/syncrep_gram.y
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "postgres.h"
+ 
+ #include "replication/syncrep.h"
+ #include "utils/formatting.h"
+ 
+ /* Result of the parsing is returned here */
+ SyncRepConfigData	*syncrep_parse_result;
+ 
+ static SyncRepConfigData *create_syncrep_config(char *num_sync, List *members);
+ 
+ /*
+  * Bison doesn't allocate anything that needs to live across parser calls,
+  * so we can easily have it use palloc instead of malloc.  This prevents
+  * memory leaks if we error out during parsing.  Note this only works with
+  * bison >= 2.0.  However, in bison 1.875 the default is to use alloca()
+  * if possible, so there's not really much problem anyhow, at least if
+  * you're building with gcc.
+  */
+ #define YYMALLOC palloc
+ #define YYFREE   pfree
+ 
+ %}
+ 
+ %expect 0
+ %name-prefix="syncrep_yy"
+ 
+ %union
+ {
+ 	char	   *str;
+ 	List	   *list;
+ 	SyncRepConfigData  *config;
+ }
+ 
+ %token <str> NAME NUM
+ 
+ %type <config> result standby_config
+ %type <list> standby_list
+ %type <str> standby_name
+ 
+ %start result
+ 
+ %%
+ result:
+ 		standby_config				{ syncrep_parse_result = $1; }
+ ;
+ standby_config:
+ 		standby_list				{ $$ = create_syncrep_config("1", $1); }
+ 		| NUM '[' standby_list ']'		{ $$ = create_syncrep_config($1, $3); }
+ ;
+ standby_list:
+ 		standby_name				{ $$ = list_make1($1);}
+ 		| standby_list ',' standby_name		{ $$ = lappend($1, $3);}
+ ;
+ standby_name:
+ 		NAME					{ $$ = $1; }
+ 		| NUM					{ $$ = $1; }
+ ;
+ %%
+ 
+ static SyncRepConfigData *
+ create_syncrep_config(char *num_sync, List *members)
+ {
+ 	SyncRepConfigData *config =
+ 		(SyncRepConfigData *) palloc(sizeof(SyncRepConfigData));
+ 
+ 	config->num_sync = atoi(num_sync);
+ 	config->members = members;
+ 	return config;
+ }
+ 
+ #include "syncrep_scanner.c"
*** /dev/null
--- b/src/backend/replication/syncrep_scanner.l
***************
*** 0 ****
--- 1,119 ----
+ %{
+ /*-------------------------------------------------------------------------
+  *
+  * syncrep_scanner.l
+  *	  a lexical scanner for synchronous_standby_names
+  *
+  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  src/backend/replication/syncrep_scanner.l
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "miscadmin.h"
+ #include "lib/stringinfo.h"
+ 
+ /* Handles to the buffer that the lexer uses internally */
+ static YY_BUFFER_STATE scanbufhandle;
+ 
+ static StringInfoData xdbuf;
+ 
+ %}
+ 
+ %option 8bit
+ %option never-interactive
+ %option nounput
+ %option noinput
+ %option noyywrap
+ %option warn
+ %option prefix="syncrep_yy"
+ 
+ /*
+  * <xd> delimited identifiers (double-quoted identifiers)
+  */
+ %x xd
+ 
+ space		[ \t\n\r\f]
+ 
+ undquoted_start	[^ ,\[\]\"]
+ undquoted_cont		[^ ,\[\]]
+ undquoted_name    {undquoted_start}{undquoted_cont}*
+ dquoted_name		[^\"]+
+ 
+ /* Double-quoted string */
+ dquote		\"
+ xdstart		{dquote}
+ xddouble		{dquote}{dquote}
+ xdstop		{dquote}
+ xdinside		{dquoted_name}
+ 
+ %%
+ {space}+		{ /* ignore */ }
+ {xdstart}	{
+ 				initStringInfo(&xdbuf);
+ 				BEGIN(xd);
+ 		}
+ <xd>{xddouble} {
+ 				appendStringInfoChar(&xdbuf, '\"');
+ 		}
+ <xd>{xdinside} {
+ 				appendStringInfoString(&xdbuf, yytext);
+ 		}
+ <xd>{xdstop} {
+ 				yylval.str = xdbuf.data;
+ 				BEGIN(INITIAL);
+ 				return NAME;
+ 		}
+ ","			{ return ','; }
+ "["			{ return '['; }
+ "]"			{ return ']'; }
+ [1-9][0-9]*	{
+ 				yylval.str = pstrdup(yytext);
+ 				return NUM;
+ 		}
+ {undquoted_name} {
+ 				yylval.str = pstrdup(yytext);
+ 				return NAME;
+ 		}
+ %%
+ 
+ void
+ yyerror(const char *message)
+ {
+ 	ereport(IsUnderPostmaster ? DEBUG2 : LOG,
+ 			(errcode(ERRCODE_SYNTAX_ERROR),
+ 			 errmsg("%s at or near \"%s\"", message, yytext)));
+ }
+ 
+ void
+ syncrep_scanner_init(const char *str)
+ {
+ 	Size		slen = strlen(str);
+ 	char	   *scanbuf;
+ 
+ 	/*
+ 	 * Might be left over after ereport()
+ 	 */
+ 	if (YY_CURRENT_BUFFER)
+ 		yy_delete_buffer(YY_CURRENT_BUFFER);
+ 
+ 	/*
+ 	 * Make a scan buffer with special termination needed by flex.
+ 	 */
+ 	scanbuf = (char *) palloc(slen + 2);
+ 	memcpy(scanbuf, str, slen);
+ 	scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
+ 	scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
+ }
+ 
+ void
+ syncrep_scanner_finish(void)
+ {
+ 	yy_delete_buffer(scanbufhandle);
+ 	scanbufhandle = NULL;
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2666,2671 **** WalSndShmemInit(void)
--- 2666,2672 ----
  		{
  			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
  
+ 			walsnd->slotno = i;
  			SpinLockInit(&walsnd->mutex);
  		}
  	}
***************
*** 2751,2757 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	Tuplestorestate *tupstore;
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
! 	WalSnd	   *sync_standby;
  	int			i;
  
  	/* check to see if caller supports us returning a tuplestore */
--- 2752,2758 ----
  	Tuplestorestate *tupstore;
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
! 	List	   *sync_standbys;
  	int			i;
  
  	/* check to see if caller supports us returning a tuplestore */
***************
*** 2780,2791 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	MemoryContextSwitchTo(oldcontext);
  
  	/*
! 	 * Get the currently active synchronous standby.
  	 */
  	LWLockAcquire(SyncRepLock, LW_SHARED);
! 	sync_standby = SyncRepGetSynchronousStandby();
  	LWLockRelease(SyncRepLock);
  
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		WalSnd *walsnd = &WalSndCtl->walsnds[i];
--- 2781,2802 ----
  	MemoryContextSwitchTo(oldcontext);
  
  	/*
! 	 * Allocate and update the config data of synchronous replication,
! 	 * and then get the currently active synchronous standbys.
  	 */
+ 	SyncRepUpdateConfig();
  	LWLockAcquire(SyncRepLock, LW_SHARED);
! 	sync_standbys = SyncRepGetSyncStandbys();
  	LWLockRelease(SyncRepLock);
  
+ 	/*
+ 	 * Free the previously-allocated config data because this backend
+ 	 * no longer needs it. The next call of this function must allocate
+ 	 * and update the config data afresh, because the setting of sync
+ 	 * replication might have changed between the calls.
+ 	 */
+ 	SyncRepFreeConfig(SyncRepConfig);
+ 
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		WalSnd *walsnd = &WalSndCtl->walsnds[i];
***************
*** 2856,2862 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  			 */
  			if (priority == 0)
  				values[7] = CStringGetTextDatum("async");
! 			else if (walsnd == sync_standby)
  				values[7] = CStringGetTextDatum("sync");
  			else
  				values[7] = CStringGetTextDatum("potential");
--- 2867,2873 ----
  			 */
  			if (priority == 0)
  				values[7] = CStringGetTextDatum("async");
! 			else if (list_member_int(sync_standbys, i))
  				values[7] = CStringGetTextDatum("sync");
  			else
  				values[7] = CStringGetTextDatum("potential");
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 31,36 ****
--- 31,48 ----
  #define SYNC_REP_WAITING			1
  #define SYNC_REP_WAIT_COMPLETE		2
  
+ /*
+  * Struct for the configuration of synchronous replication.
+  */
+ typedef struct SyncRepConfigData
+ {
+ 	int	num_sync;	/* number of sync standbys that we need to wait for */
+ 	List	*members;	/* list of names of potential sync standbys */
+ } SyncRepConfigData;
+ 
+ extern SyncRepConfigData *syncrep_parse_result;
+ extern SyncRepConfigData *SyncRepConfig;
+ 
  /* user-settable parameters for synchronous replication */
  extern char *SyncRepStandbyNames;
  
***************
*** 44,57 **** extern void SyncRepCleanupAtProcExit(void);
  extern void SyncRepInitConfig(void);
  extern void SyncRepReleaseWaiters(void);
  
  /* called by checkpointer */
  extern void SyncRepUpdateSyncStandbysDefined(void);
  
- /* forward declaration to avoid pulling in walsender_private.h */
- struct WalSnd;
- extern struct WalSnd *SyncRepGetSynchronousStandby(void);
- 
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);
  
  #endif   /* _SYNCREP_H */
--- 56,80 ----
  extern void SyncRepInitConfig(void);
  extern void SyncRepReleaseWaiters(void);
  
+ /* called by wal sender and user backend */
+ extern List *SyncRepGetSyncStandbys(void);
+ extern void SyncRepUpdateConfig(void);
+ extern void SyncRepFreeConfig(SyncRepConfigData *config);
+ 
  /* called by checkpointer */
  extern void SyncRepUpdateSyncStandbysDefined(void);
  
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);
  
+ /*
+  * Internal functions for parsing synchronous_standby_names grammar,
+  * in syncrep_gram.y and syncrep_scanner.l
+  */
+ extern int  syncrep_yyparse(void);
+ extern int  syncrep_yylex(void);
+ extern void syncrep_yyerror(const char *str);
+ extern void syncrep_scanner_init(const char *query_string);
+ extern void syncrep_scanner_finish(void);
+ 
  #endif   /* _SYNCREP_H */
*** a/src/include/replication/walsender_private.h
--- b/src/include/replication/walsender_private.h
***************
*** 32,37 **** typedef enum WalSndState
--- 32,38 ----
   */
  typedef struct WalSnd
  {
+ 	int		slotno;			/* index of this slot in WalSnd array */
  	pid_t		pid;			/* this walsender's process id, or 0 */
  	WalSndState state;			/* this walsender's state */
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
*** a/src/test/perl/PostgresNode.pm
--- b/src/test/perl/PostgresNode.pm
***************
*** 389,394 **** sub init
--- 389,395 ----
  	  unless defined $params{hba_permit_replication};
  	$params{allows_streaming} = 0 unless defined $params{allows_streaming};
  	$params{has_archiving}    = 0 unless defined $params{has_archiving};
+ 	$params{allows_sync_rep} = 0 unless defined $params{allows_sync_rep};
  
  	mkdir $self->backup_dir;
  	mkdir $self->archive_dir;
***************
*** 413,418 **** sub init
--- 414,423 ----
  		print $conf "hot_standby = on\n";
  		print $conf "max_connections = 10\n";
  	}
+ 	if ($params{allows_sync_rep})
+ 	{
+ 		print $conf "synchronous_standby_names = 'standby1,standby2'\n";
+ 	}
  
  	if ($TestLib::windows_os)
  	{
*** /dev/null
--- b/src/test/recovery/t/006_multisync_rep.pl
***************
*** 0 ****
--- 1,106 ----
+ use strict;
+ use warnings;
+ 
+ use PostgresNode;
+ use TestLib;
+ use Test::More tests => 8;
+ 
+ 
+ # Initialize master node with synchronous_standby_names = 'standby1,standby2'
+ my $node_master = get_new_node('master');
+ $node_master->init(allows_streaming => 1,allows_sync_rep => 1);
+ $node_master->start;
+ my $backup_name = 'my_backup';
+ 
+ # Take backup
+ $node_master->backup($backup_name);
+ 
+ # Create standby1 linking to master
+ my $node_standby_1 = get_new_node('standby1');
+ $node_standby_1->init_from_backup($node_master, $backup_name,
+ 								  has_streaming => 1);
+ $node_standby_1->start;
+ 
+ 
+ # Create standby2 linking to master
+ my $node_standby_2 = get_new_node('standby2');
+ $node_standby_2->init_from_backup($node_master, $backup_name,
+ 								  has_streaming => 1);
+ $node_standby_2->start;
+ 
+ # Create standby3 linking to master
+ my $node_standby_3 = get_new_node('standby3');
+ $node_standby_3->init_from_backup($node_master, $backup_name,
+ 								  has_streaming => 1);
+ $node_standby_3->start;
+ 
+ # Create standby4 linking to master
+ my $node_standby_4 = get_new_node('standby4');
+ $node_standby_4->init_from_backup($node_master, $backup_name,
+ 								  has_streaming => 1);
+ 
+ # Check application sync_state on master initially
+ my $result = $node_master->psql('postgres', "SELECT application_name, sync_priority, sync_state FROM pg_stat_replication;");
+ print "$result \n";
+ is($result, "standby1|1|sync\nstandby2|2|potential\nstandby3|0|async", 'checked for standbys state for backward compatibility');
+ 
+ 
+ # Change the s_s_names = '*' and check sync state
+ $node_master->psql('postgres', "ALTER SYSTEM SET synchronous_standby_names = '*';");
+ $node_master->psql('postgres', "SELECT pg_reload_conf();");
+ 
+ $result = $node_master->psql('postgres', "SELECT application_name, sync_priority, sync_state FROM pg_stat_replication;");
+ print "$result \n";
+ is($result, "standby1|1|sync\nstandby2|1|potential\nstandby3|1|potential", 'checked for standbys state for backward compatibility with asterisk');
+ 
+ # Stop all standbys
+ $node_standby_1->stop;
+ $node_standby_2->stop;
+ $node_standby_3->stop;
+ 
+ # Change the s_s_names = '2[standby1,standby2,standby3]' and check sync state
+ $node_master->psql('postgres', "ALTER SYSTEM SET synchronous_standby_names = '2[standby1,standby2,standby3]';");
+ $node_master->psql('postgres', "SELECT pg_reload_conf();");
+ 
+ # Standby2 and standby3 should be 'sync'
+ $node_standby_2->start;
+ $node_standby_3->start;
+ $result = $node_master->psql('postgres', "SELECT application_name, sync_priority, sync_state FROM pg_stat_replication;");
+ print "$result \n";
+ is($result, "standby2|2|sync\nstandby3|3|sync", 'checked for sync standbys state transition 1');
+ 
+ # Standby1 should be 'sync' instead of standby3, and standby3 should turn to 'potential'
+ $node_standby_1->start;
+ $node_standby_4->start;
+ $result = $node_master->psql('postgres', "SELECT application_name, sync_priority, sync_state FROM pg_stat_replication;");
+ print "$result \n";
+ is($result, "standby2|2|sync\nstandby3|3|potential\nstandby1|1|sync\nstandby4|0|async", 'checked for sync standbys state transition 2');
+ 
+ # Change the s_s_names = '2[standby1,*,standby2]' and check sync state
+ $node_master->psql('postgres', "ALTER SYSTEM SET synchronous_standby_names = '2[standby1,*,standby2]';");
+ $node_master->psql('postgres', "SELECT pg_reload_conf();");
+ 
+ $result = $node_master->psql('postgres', "SELECT application_name, sync_priority, sync_state FROM pg_stat_replication;");
+ print "$result \n";
+ is($result, "standby2|2|sync\nstandby3|2|potential\nstandby1|1|sync\nstandby4|2|potential", 'checked for sync standbys state with asterisk 1');
+ 
+ $node_standby_4->stop;
+ 
+ # Change the s_s_names = '2[*]' and check sync state
+ $node_master->psql('postgres', "ALTER SYSTEM SET synchronous_standby_names = '2[*]';");
+ $node_master->psql('postgres', "SELECT pg_reload_conf();");
+ 
+ $result = $node_master->psql('postgres', "SELECT application_name, sync_priority, sync_state FROM pg_stat_replication;");
+ print "$result \n";
+ is($result, "standby2|1|sync\nstandby3|1|sync\nstandby1|1|potential", 'checked for sync standbys state with asterisk 2');
+ 
+ # Create some content on master and check its presence on standby 1 and standby 2
+ $node_master->psql('postgres', "CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
+ 
+ $result =  $node_standby_1->psql('postgres', "SELECT count(*) FROM tab_int");
+ print "standby 1: $result\n";
+ is($result, qq(1002), 'check synced content on standby 1');
+ 
+ $result =  $node_standby_2->psql('postgres', "SELECT count(*) FROM tab_int");
+ print "standby 2: $result\n";
+ is($result, qq(1002), 'check synced content on standby 2');
*** a/src/tools/msvc/Mkvcbuild.pm
--- b/src/tools/msvc/Mkvcbuild.pm
***************
*** 149,154 **** sub mkvcbuild
--- 149,156 ----
  	$postgres->AddFiles('src/backend/utils/misc', 'guc-file.l');
  	$postgres->AddFiles('src/backend/replication', 'repl_scanner.l',
  		'repl_gram.y');
+ 	$postgres->AddFiles('src/backend/replication', 'syncrep_scanner.l',
+ 		'syncrep_gram.y');
  	$postgres->AddDefine('BUILDING_DLL');
  	$postgres->AddLibrary('secur32.lib');
  	$postgres->AddLibrary('ws2_32.lib');
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
