On Fri, Dec 16, 2016 at 5:04 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Fri, Dec 16, 2016 at 2:38 PM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>> On Thu, Dec 15, 2016 at 6:08 PM, Masahiko Sawada <sawada.m...@gmail.com> wrote:
>>> Attached latest v12 patch.
>>> I changed behavior of "N (standby_list)" to use the priority method
>>> and incorporated some review comments so far. Please review it.
>>
>> Some comments...
>>
>> +    Another example of <varname>synchronous_standby_names</> for multiple
>> +    synchronous standby is:
>> Here standby takes an 's'.
>>
>> +    candidates. The master server will wait for at least 2 replies from them.
>> +    <literal>s4</> is an asynchronous standby since its name is not in the list.
>> +   </para>
>> "will wait for replies from at least two of them".
>>
>> + * next-highest-priority standby. In quorum method, the all standbys
>> + * appearing in the list are considered as a candidate for quorum commit.
>> "the all" is incorrect. I think you mean "all the" instead.
>>
>> + * NIL if no sync standby is connected. In quorum method, all standby
>> + * priorities are same, that is 1. So this function returns the list of
>> This is not true. Standbys have a priority number assigned. Though it does
>> not matter much for quorum groups, it gives an indication of their position
>> in the defined list.
>>
>>  #synchronous_standby_names = ''    # standby servers that provide sync rep
>>  -               # number of sync standbys and comma-separated list of application_name
>>  +               # synchronization method, number of sync standbys
>>  +               # and comma-separated list of application_name
>>                  # from standby(s); '*' = all
>> The formulation is funny here: "sync rep synchronization method".
>>
>> I think that Fujii-san has also some doc changes in his box. For anybody
>> picking up this patch next, it would be good to incorporate the things
>> I have noticed here.
>
> Yes, I will. Thanks!

Attached is the modified version of the patch. Barring objections, I will
commit this version.

Even after committing the patch, there will still be many source comments
and pieces of documentation that we need to update, for example
in high-availability.sgml. We need to check and update them thoroughly later.
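
For anyone reviewing, here is a minimal standalone sketch of how the two
methods pick the position that releases waiters. It is not code from the
patch: XLogRecPtr is a plain uint64_t stand-in, and oldest_lsn(),
nth_latest_lsn() and cmp_lsn_desc() are hypothetical names that only mirror
what SyncRepGetOldestSyncRecPtr(), SyncRepGetNthLatestSyncRecPtr() and
cmp_lsn() do, without the walsender array or any locking.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for PostgreSQL's XLogRecPtr (a 64-bit WAL position). */
typedef uint64_t XLogRecPtr;

/* qsort comparator that orders LSNs from newest to oldest. */
static int
cmp_lsn_desc(const void *a, const void *b)
{
	XLogRecPtr	lsn1 = *(const XLogRecPtr *) a;
	XLogRecPtr	lsn2 = *(const XLogRecPtr *) b;

	if (lsn1 > lsn2)
		return -1;
	if (lsn1 < lsn2)
		return 1;
	return 0;
}

/*
 * FIRST (priority-based): lsns[] holds the positions of the standbys
 * already chosen by priority. Waiters are released up to the oldest one.
 */
static XLogRecPtr
oldest_lsn(const XLogRecPtr *lsns, size_t len)
{
	XLogRecPtr	oldest;
	size_t		i;

	assert(len > 0);
	oldest = lsns[0];
	for (i = 1; i < len; i++)
	{
		if (lsns[i] < oldest)
			oldest = lsns[i];
	}
	return oldest;
}

/*
 * ANY (quorum-based): lsns[] holds the positions of all listed standbys.
 * Waiters are released up to the Nth latest one, because at least nth
 * standbys have reached that position.
 */
static XLogRecPtr
nth_latest_lsn(const XLogRecPtr *lsns, size_t len, size_t nth)
{
	XLogRecPtr *sorted;
	XLogRecPtr	result;

	assert(nth >= 1 && nth <= len);

	/* Sort a copy so that the caller's array is left untouched. */
	sorted = malloc(len * sizeof(XLogRecPtr));
	if (sorted == NULL)
		abort();
	memcpy(sorted, lsns, len * sizeof(XLogRecPtr));
	qsort(sorted, len, sizeof(XLogRecPtr), cmp_lsn_desc);

	result = sorted[nth - 1];
	free(sorted);
	return result;
}
```

With made-up flush positions s1 = 300, s2 = 100 and s3 = 200, 'FIRST 2 (s1, s2, s3)'
considers only s1 and s2 and releases waiters up to 100, while
'ANY 2 (s1, s2, s3)' releases them up to 200, the second latest position.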

Regards,

-- 
Fujii Masao
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 3054,3094 **** include_dir 'conf.d'
          transactions waiting for commit will be allowed to proceed after
          these standby servers confirm receipt of their data.
          The synchronous standbys will be those whose names appear
!         earlier in this list, and
          that are both currently connected and streaming data in real-time
          (as shown by a state of <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
!         Other standby servers appearing later in this list represent potential
!         synchronous standbys. If any of the current synchronous
!         standbys disconnects for whatever reason,
!         it will be replaced immediately with the next-highest-priority standby.
!         Specifying more than one standby name can allow very high availability.
         </para>
         <para>
          This parameter specifies a list of standby servers using
          either of the following syntaxes:
  <synopsis>
! <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
  <replaceable class="parameter">standby_name</replaceable> [, ...]
  </synopsis>
          where <replaceable class="parameter">num_sync</replaceable> is
          the number of synchronous standbys that transactions need to
          wait for replies from,
          and <replaceable class="parameter">standby_name</replaceable>
!         is the name of a standby server. For example, a setting of
!         <literal>3 (s1, s2, s3, s4)</> makes transaction commits wait
!         until their WAL records are received by three higher-priority standbys
!         chosen from standby servers <literal>s1</>, <literal>s2</>,
!         <literal>s3</> and <literal>s4</>.
!         </para>
!         <para>
!         The second syntax was used before <productname>PostgreSQL</>
          version 9.6 and is still supported. It's the same as the first syntax
!         with <replaceable class="parameter">num_sync</replaceable> equal to 1.
!         For example, <literal>1 (s1, s2)</> and
!         <literal>s1, s2</> have the same meaning: either <literal>s1</>
!         or <literal>s2</> is chosen as a synchronous standby.
         </para>
         <para>
          The name of a standby server for this purpose is the
--- 3054,3124 ----
          transactions waiting for commit will be allowed to proceed after
          these standby servers confirm receipt of their data.
          The synchronous standbys will be those whose names appear
!         in this list, and
          that are both currently connected and streaming data in real-time
          (as shown by a state of <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
!         Specifying more than one standby name can allow very high availability.
         </para>
         <para>
          This parameter specifies a list of standby servers using
          either of the following syntaxes:
  <synopsis>
! [FIRST] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
! ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
  <replaceable class="parameter">standby_name</replaceable> [, ...]
  </synopsis>
          where <replaceable class="parameter">num_sync</replaceable> is
          the number of synchronous standbys that transactions need to
          wait for replies from,
          and <replaceable class="parameter">standby_name</replaceable>
!         is the name of a standby server.
!         <literal>FIRST</> and <literal>ANY</> specify the method to choose
!         synchronous standbys from the listed servers.
!        </para>
!        <para>
!         The keyword <literal>FIRST</>, coupled with
!         <replaceable class="parameter">num_sync</replaceable>, specifies a
!         priority-based synchronous replication and makes transaction commits
!         wait until their WAL records are replicated to
!         <replaceable class="parameter">num_sync</replaceable> synchronous
!         standbys chosen based on their priorities. For example, a setting of
!         <literal>FIRST 3 (s1, s2, s3, s4)</> will cause each commit to wait for
!         replies from three higher-priority standbys chosen from standby servers
!         <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</>.
!         The standbys whose names appear earlier in the list are given higher
!         priority and will be considered as synchronous. Other standby servers
!         appearing later in this list represent potential synchronous standbys.
!         If any of the current synchronous standbys disconnects for whatever
!         reason, it will be replaced immediately with the next-highest-priority
!         standby. The keyword <literal>FIRST</> is optional.
!        </para>
!        <para>
!         The keyword <literal>ANY</>, coupled with
!         <replaceable class="parameter">num_sync</replaceable>, specifies a
!         quorum-based synchronous replication and makes transaction commits
!         wait until their WAL records are replicated to <emphasis>at least</>
!         <replaceable class="parameter">num_sync</replaceable> listed standbys.
!         For example, a setting of <literal>ANY 3 (s1, s2, s3, s4)</> will cause
!         each commit to proceed as soon as at least any three standbys of
!         <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</>
!         reply.
!        </para>
!        <para>
!         <literal>FIRST</> and <literal>ANY</> are case-insensitive. If these
!         keywords are used as the name of a standby server,
!         its <replaceable class="parameter">standby_name</replaceable> must
!         be double-quoted.
!        </para>
!        <para>
!         The third syntax was used before <productname>PostgreSQL</>
          version 9.6 and is still supported. It's the same as the first syntax
!         with <literal>FIRST</> and
!         <replaceable class="parameter">num_sync</replaceable> equal to 1.
!         For example, <literal>FIRST 1 (s1, s2)</> and <literal>s1, s2</> have
!         the same meaning: either <literal>s1</> or <literal>s2</> is chosen
!         as a synchronous standby.
         </para>
         <para>
          The name of a standby server for this purpose is the
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1138,1156 **** primary_slot_name = 'node_a_slot'
      as synchronous confirm receipt of their data. The number of synchronous
      standbys that transactions must wait for replies from is specified in
      <varname>synchronous_standby_names</>. This parameter also specifies
!     a list of standby names, which determines the priority of each standby
!     for being chosen as a synchronous standby. The standbys whose names
!     appear earlier in the list are given higher priority and will be considered
!     as synchronous. Other standby servers appearing later in this list
!     represent potential synchronous standbys. If any of the current
!     synchronous standbys disconnects for whatever reason, it will be replaced
!     immediately with the next-highest-priority standby.
     </para>
     <para>
!     An example of <varname>synchronous_standby_names</> for multiple
!     synchronous standbys is:
  <programlisting>
! synchronous_standby_names = '2 (s1, s2, s3)'
  </programlisting>
      In this example, if four standby servers <literal>s1</>, <literal>s2</>,
      <literal>s3</> and <literal>s4</> are running, the two standbys
--- 1138,1162 ----
      as synchronous confirm receipt of their data. The number of synchronous
      standbys that transactions must wait for replies from is specified in
      <varname>synchronous_standby_names</>. This parameter also specifies
!     a list of standby names and the method (<literal>FIRST</> and
!     <literal>ANY</>) to choose synchronous standbys from the listed ones.
     </para>
     <para>
!     The method <literal>FIRST</> specifies a priority-based synchronous
!     replication and makes transaction commits wait until their WAL records are
!     replicated to the requested number of synchronous standbys chosen based on
!     their priorities. The standbys whose names appear earlier in the list are
!     given higher priority and will be considered as synchronous. Other standby
!     servers appearing later in this list represent potential synchronous
!     standbys. If any of the current synchronous standbys disconnects for
!     whatever reason, it will be replaced immediately with the
!     next-highest-priority standby.
!    </para>
!    <para>
!     An example of <varname>synchronous_standby_names</> for
!     a priority-based multiple synchronous standbys is:
  <programlisting>
! synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
  </programlisting>
      In this example, if four standby servers <literal>s1</>, <literal>s2</>,
      <literal>s3</> and <literal>s4</> are running, the two standbys
***************
*** 1162,1167 **** synchronous_standby_names = '2 (s1, s2, s3)'
--- 1168,1191 ----
      its name is not in the list.
     </para>
     <para>
+     The method <literal>ANY</> specifies a quorum-based synchronous
+     replication and makes transaction commits wait until their WAL records
+     are replicated to <emphasis>at least</> the requested number of
+     synchronous standbys in the list.
+    </para>
+    <para>
+     An example of <varname>synchronous_standby_names</> for
+     a quorum-based multiple synchronous standbys is:
+ <programlisting>
+  synchronous_standby_names = 'ANY 2 (s1, s2, s3)'
+ </programlisting>
+     In this example, if four standby servers <literal>s1</>, <literal>s2</>,
+     <literal>s3</> and <literal>s4</> are running, transaction commits will
+     wait for replies from at least any two standbys of <literal>s1</>,
+     <literal>s2</> and <literal>s3</>. <literal>s4</> is an asynchronous
+     standby since its name is not in the list.
+    </para>
+    <para>
      The synchronous states of standby servers can be viewed using
      the <structname>pg_stat_replication</structname> view.
     </para>
*** a/doc/src/sgml/monitoring.sgml
--- b/doc/src/sgml/monitoring.sgml
***************
*** 1412,1418 **** SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       <entry><structfield>sync_priority</></entry>
       <entry><type>integer</></entry>
       <entry>Priority of this standby server for being chosen as the
!       synchronous standby</entry>
      </row>
      <row>
       <entry><structfield>sync_state</></entry>
--- 1412,1419 ----
       <entry><structfield>sync_priority</></entry>
       <entry><type>integer</></entry>
       <entry>Priority of this standby server for being chosen as the
!       synchronous standby in a priority-based synchronous replication.
!       This has no effect in a quorum-based synchronous replication.</entry>
      </row>
      <row>
       <entry><structfield>sync_state</></entry>
***************
*** 1437,1442 **** SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
--- 1438,1449 ----
             <literal>sync</>: This standby server is synchronous.
            </para>
           </listitem>
+          <listitem>
+           <para>
+            <literal>quorum</>: This standby server is considered as a candidate
+            for quorum standbys.
+           </para>
+          </listitem>
         </itemizedlist>
       </entry>
      </row>
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 26,32 **** repl_gram.o: repl_scanner.c
  
  # syncrep_scanner is complied as part of syncrep_gram
  syncrep_gram.o: syncrep_scanner.c
! syncrep_scanner.c: FLEXFLAGS = -CF -p
  syncrep_scanner.c: FLEX_NO_BACKUP=yes
  
  # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
--- 26,32 ----
  
  # syncrep_scanner is complied as part of syncrep_gram
  syncrep_gram.o: syncrep_scanner.c
! syncrep_scanner.c: FLEXFLAGS = -CF -p -i
  syncrep_scanner.c: FLEX_NO_BACKUP=yes
  
  # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 30,52 ****
   * searching the through all waiters each time we receive a reply.
   *
   * In 9.5 or before only a single standby could be considered as
!  * synchronous. In 9.6 we support multiple synchronous standbys.
!  * The number of synchronous standbys that transactions must wait for
!  * replies from is specified in synchronous_standby_names.
!  * This parameter also specifies a list of standby names,
!  * which determines the priority of each standby for being chosen as
!  * a synchronous standby. The standbys whose names appear earlier
!  * in the list are given higher priority and will be considered as
!  * synchronous. Other standby servers appearing later in this list
!  * represent potential synchronous standbys. If any of the current
!  * synchronous standbys disconnects for whatever reason, it will be
!  * replaced immediately with the next-highest-priority standby.
   *
   * Before the standbys chosen from synchronous_standby_names can
   * become the synchronous standbys they must have caught up with
   * the primary; that may take some time. Once caught up,
!  * the current higher priority standbys which are considered as
!  * synchronous at that moment will release waiters from the queue.
   *
   * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
   *
--- 30,63 ----
   * searching the through all waiters each time we receive a reply.
   *
   * In 9.5 or before only a single standby could be considered as
!  * synchronous. In 9.6 we support a priority-based multiple synchronous
!  * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
!  * supported. The number of synchronous standbys that transactions
!  * must wait for replies from is specified in synchronous_standby_names.
!  * This parameter also specifies a list of standby names and the method
!  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
!  *
!  * The method FIRST specifies a priority-based synchronous replication
!  * and makes transaction commits wait until their WAL records are
!  * replicated to the requested number of synchronous standbys chosen based
!  * on their priorities. The standbys whose names appear earlier in the list
!  * are given higher priority and will be considered as synchronous.
!  * Other standby servers appearing later in this list represent potential
!  * synchronous standbys. If any of the current synchronous standbys
!  * disconnects for whatever reason, it will be replaced immediately with
!  * the next-highest-priority standby.
!  *
!  * The method ANY specifies a quorum-based synchronous replication
!  * and makes transaction commits wait until their WAL records are
!  * replicated to at least the requested number of synchronous standbys
!  * in the list. All the standbys appearing in the list are considered as
!  * candidates for quorum synchronous standbys.
   *
   * Before the standbys chosen from synchronous_standby_names can
   * become the synchronous standbys they must have caught up with
   * the primary; that may take some time. Once caught up,
!  * the standbys which are considered as synchronous at that moment
!  * will release waiters from the queue.
   *
   * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
   *
***************
*** 79,96 **** char	   *SyncRepStandbyNames;
  
  static bool announce_next_takeover = true;
  
! static SyncRepConfigData *SyncRepConfig = NULL;
  static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
  
  static void SyncRepQueueInsert(int mode);
  static void SyncRepCancelWait(void);
  static int	SyncRepWakeQueue(bool all, int mode);
  
! static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
! 						   XLogRecPtr *flushPtr,
! 						   XLogRecPtr *applyPtr,
! 						   bool *am_sync);
  static int	SyncRepGetStandbyPriority(void);
  
  #ifdef USE_ASSERT_CHECKING
  static bool SyncRepQueueIsOrderedByLSN(int mode);
--- 90,118 ----
  
  static bool announce_next_takeover = true;
  
! SyncRepConfigData *SyncRepConfig = NULL;
  static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
  
  static void SyncRepQueueInsert(int mode);
  static void SyncRepCancelWait(void);
  static int	SyncRepWakeQueue(bool all, int mode);
  
! static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
! 								 XLogRecPtr *flushPtr,
! 								 XLogRecPtr *applyPtr,
! 								 bool *am_sync);
! static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
! 									   XLogRecPtr *flushPtr,
! 									   XLogRecPtr *applyPtr,
! 									   List *sync_standbys);
! static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
! 										  XLogRecPtr *flushPtr,
! 										  XLogRecPtr *applyPtr,
! 										  List *sync_standbys, uint8 nth);
  static int	SyncRepGetStandbyPriority(void);
+ static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
+ static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+ static int	cmp_lsn(const void *a, const void *b);
  
  #ifdef USE_ASSERT_CHECKING
  static bool SyncRepQueueIsOrderedByLSN(int mode);
***************
*** 386,392 **** SyncRepReleaseWaiters(void)
  	XLogRecPtr	writePtr;
  	XLogRecPtr	flushPtr;
  	XLogRecPtr	applyPtr;
! 	bool		got_oldest;
  	bool		am_sync;
  	int			numwrite = 0;
  	int			numflush = 0;
--- 408,414 ----
  	XLogRecPtr	writePtr;
  	XLogRecPtr	flushPtr;
  	XLogRecPtr	applyPtr;
! 	bool		got_recptr;
  	bool		am_sync;
  	int			numwrite = 0;
  	int			numflush = 0;
***************
*** 413,423 **** SyncRepReleaseWaiters(void)
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
  
  	/*
! 	 * Check whether we are a sync standby or not, and calculate the oldest
  	 * positions among all sync standbys.
  	 */
! 	got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
! 											&applyPtr, &am_sync);
  
  	/*
  	 * If we are managing a sync standby, though we weren't prior to this,
--- 435,444 ----
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
  
  	/*
! 	 * Check whether we are a sync standby or not, and calculate the synced
  	 * positions among all sync standbys.
  	 */
! 	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
  
  	/*
  	 * If we are managing a sync standby, though we weren't prior to this,
***************
*** 426,441 **** SyncRepReleaseWaiters(void)
  	if (announce_next_takeover && am_sync)
  	{
  		announce_next_takeover = false;
! 		ereport(LOG,
! 				(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
! 						application_name, MyWalSnd->sync_standby_priority)));
  	}
  
  	/*
  	 * If the number of sync standbys is less than requested or we aren't
  	 * managing a sync standby then just leave.
  	 */
! 	if (!got_oldest || !am_sync)
  	{
  		LWLockRelease(SyncRepLock);
  		announce_next_takeover = !am_sync;
--- 447,468 ----
  	if (announce_next_takeover && am_sync)
  	{
  		announce_next_takeover = false;
! 
! 		if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
! 			ereport(LOG,
! 					(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
! 							application_name, MyWalSnd->sync_standby_priority)));
! 		else
! 			ereport(LOG,
! 					(errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
! 							application_name)));
  	}
  
  	/*
  	 * If the number of sync standbys is less than requested or we aren't
  	 * managing a sync standby then just leave.
  	 */
! 	if (!got_recptr || !am_sync)
  	{
  		LWLockRelease(SyncRepLock);
  		announce_next_takeover = !am_sync;
***************
*** 471,491 **** SyncRepReleaseWaiters(void)
  }
  
  /*
!  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
   *
   * Return false if the number of sync standbys is less than
   * synchronous_standby_names specifies. Otherwise return true and
!  * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
   *
   * On return, *am_sync is set to true if this walsender is connecting to
   * sync standby. Otherwise it's set to false.
   */
  static bool
! SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  						   XLogRecPtr *applyPtr, bool *am_sync)
  {
  	List	   *sync_standbys;
- 	ListCell   *cell;
  
  	*writePtr = InvalidXLogRecPtr;
  	*flushPtr = InvalidXLogRecPtr;
--- 498,517 ----
  }
  
  /*
!  * Calculate the synced Write, Flush and Apply positions among sync standbys.
   *
   * Return false if the number of sync standbys is less than
   * synchronous_standby_names specifies. Otherwise return true and
!  * store the positions into *writePtr, *flushPtr and *applyPtr.
   *
   * On return, *am_sync is set to true if this walsender is connecting to
   * sync standby. Otherwise it's set to false.
   */
  static bool
! SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  						   XLogRecPtr *applyPtr, bool *am_sync)
  {
  	List	   *sync_standbys;
  
  	*writePtr = InvalidXLogRecPtr;
  	*flushPtr = InvalidXLogRecPtr;
***************
*** 508,519 **** SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  	}
  
  	/*
! 	 * Scan through all sync standbys and calculate the oldest Write, Flush
! 	 * and Apply positions.
  	 */
! 	foreach(cell, sync_standbys)
  	{
! 		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
  		XLogRecPtr	write;
  		XLogRecPtr	flush;
  		XLogRecPtr	apply;
--- 534,582 ----
  	}
  
  	/*
! 	 * In a priority-based sync replication, the synced positions are the
! 	 * oldest ones among sync standbys. In a quorum-based, they are the Nth
! 	 * latest ones.
! 	 *
! 	 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest positions.
! 	 * But we use SyncRepGetOldestSyncRecPtr() for that calculation because
! 	 * it's a bit more efficient.
! 	 *
! 	 * XXX If the numbers of current and requested sync standbys are the same,
! 	 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
! 	 * positions even in a quorum-based sync replication.
! 	 */
! 	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
! 	{
! 		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
! 								   sync_standbys);
! 	}
! 	else
! 	{
! 		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
! 									  sync_standbys, SyncRepConfig->num_sync);
! 	}
! 
! 	list_free(sync_standbys);
! 	return true;
! }
! 
! /*
!  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
!  */
! static void
! SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
! 						   XLogRecPtr *applyPtr, List *sync_standbys)
! {
! 	ListCell	*cell;
! 
! 	/*
! 	 * Scan through all sync standbys and calculate the oldest
! 	 * Write, Flush and Apply positions.
  	 */
! 	foreach (cell, sync_standbys)
  	{
! 		WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
  		XLogRecPtr	write;
  		XLogRecPtr	flush;
  		XLogRecPtr	apply;
***************
*** 531,553 **** SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  		if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
  			*applyPtr = apply;
  	}
  
! 	list_free(sync_standbys);
! 	return true;
  }
  
  /*
   * Return the list of sync standbys, or NIL if no sync standby is connected.
   *
-  * If there are multiple standbys with the same priority,
-  * the first one found is selected preferentially.
   * The caller must hold SyncRepLock.
   *
   * On return, *am_sync is set to true if this walsender is connecting to
   * sync standby. Otherwise it's set to false.
   */
  List *
! SyncRepGetSyncStandbys(bool *am_sync)
  {
  	List	   *result = NIL;
  	List	   *pending = NIL;
--- 594,756 ----
  		if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
  			*applyPtr = apply;
  	}
+ }
  
! /*
!  * Calculate the Nth latest Write, Flush and Apply positions among sync
!  * standbys.
!  */
! static void
! SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
! 						  XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
! {
! 	ListCell	*cell;
! 	XLogRecPtr	*write_array;
! 	XLogRecPtr	*flush_array;
! 	XLogRecPtr	*apply_array;
! 	int	len;
! 	int	i = 0;
! 
! 	len = list_length(sync_standbys);
! 	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
! 	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
! 	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
! 
! 	foreach (cell, sync_standbys)
! 	{
! 		WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
! 
! 		SpinLockAcquire(&walsnd->mutex);
! 		write_array[i] = walsnd->write;
! 		flush_array[i] = walsnd->flush;
! 		apply_array[i] = walsnd->apply;
! 		SpinLockRelease(&walsnd->mutex);
! 
! 		i++;
! 	}
! 
! 	qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
! 	qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
! 	qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
! 
! 	/* Get Nth latest Write, Flush, Apply positions */
! 	*writePtr = write_array[nth - 1];
! 	*flushPtr = flush_array[nth - 1];
! 	*applyPtr = apply_array[nth - 1];
! 
! 	pfree(write_array);
! 	pfree(flush_array);
! 	pfree(apply_array);
! }
! 
! /*
!  * Compare lsn in order to sort array in descending order.
!  */
! static int
! cmp_lsn(const void *a, const void *b)
! {
! 	XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
! 	XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
! 
! 	if (lsn1 > lsn2)
! 		return -1;
! 	else if (lsn1 == lsn2)
! 		return 0;
! 	else
! 		return 1;
  }
  
  /*
   * Return the list of sync standbys, or NIL if no sync standby is connected.
   *
   * The caller must hold SyncRepLock.
   *
   * On return, *am_sync is set to true if this walsender is connecting to
   * sync standby. Otherwise it's set to false.
   */
  List *
! SyncRepGetSyncStandbys(bool *am_sync)
! {
! 	/* Set default result */
! 	if (am_sync != NULL)
! 		*am_sync = false;
! 
! 	/* Quick exit if sync replication is not requested */
! 	if (SyncRepConfig == NULL)
! 		return NIL;
! 
! 	return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
! 		SyncRepGetSyncStandbysPriority(am_sync) :
! 		SyncRepGetSyncStandbysQuorum(am_sync);
! }
! 
! /*
!  * Return the list of all the candidates for quorum sync standbys,
!  * or NIL if no such standby is connected.
!  *
!  * The caller must hold SyncRepLock. This function must be called only in
!  * a quorum-based sync replication.
!  *
!  * On return, *am_sync is set to true if this walsender is connecting to
!  * sync standby. Otherwise it's set to false.
!  */
! static List *
! SyncRepGetSyncStandbysQuorum(bool *am_sync)
! {
! 	List	*result = NIL;
! 	int i;
! 	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
! 								 * rearrangement */
! 
! 	Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
! 
! 	for (i = 0; i < max_wal_senders; i++)
! 	{
! 		walsnd = &WalSndCtl->walsnds[i];
! 
! 		/* Must be active */
! 		if (walsnd->pid == 0)
! 			continue;
! 
! 		/* Must be streaming */
! 		if (walsnd->state != WALSNDSTATE_STREAMING)
! 			continue;
! 
! 		/* Must be synchronous */
! 		if (walsnd->sync_standby_priority == 0)
! 			continue;
! 
! 		/* Must have a valid flush position */
! 		if (XLogRecPtrIsInvalid(walsnd->flush))
! 			continue;
! 
! 		/*
! 		 * Consider this standby as a candidate for quorum sync standbys
! 		 * and append it to the result.
! 		 */
! 		result = lappend_int(result, i);
! 		if (am_sync != NULL && walsnd == MyWalSnd)
! 			*am_sync = true;
! 	}
! 
! 	return result;
! }
! 
! /*
!  * Return the list of sync standbys chosen based on their priorities,
!  * or NIL if no sync standby is connected.
!  *
!  * If there are multiple standbys with the same priority,
!  * the first one found is selected preferentially.
!  *
!  * The caller must hold SyncRepLock. This function must be called only in
!  * a priority-based sync replication.
!  *
!  * On return, *am_sync is set to true if this walsender is connecting to
!  * sync standby. Otherwise it's set to false.
!  */
! static List *
! SyncRepGetSyncStandbysPriority(bool *am_sync)
  {
  	List	   *result = NIL;
  	List	   *pending = NIL;
***************
*** 560,572 **** SyncRepGetSyncStandbys(bool *am_sync)
  	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
  								 * rearrangement */
  
! 	/* Set default result */
! 	if (am_sync != NULL)
! 		*am_sync = false;
! 
! 	/* Quick exit if sync replication is not requested */
! 	if (SyncRepConfig == NULL)
! 		return NIL;
  
  	lowest_priority = SyncRepConfig->nmembers;
  	next_highest_priority = lowest_priority + 1;
--- 763,769 ----
  	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
  								 * rearrangement */
  
! 	Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
  
  	lowest_priority = SyncRepConfig->nmembers;
  	next_highest_priority = lowest_priority + 1;
*** a/src/backend/replication/syncrep_gram.y
--- b/src/backend/replication/syncrep_gram.y
***************
*** 21,27 **** SyncRepConfigData *syncrep_parse_result;
  char	   *syncrep_parse_error_msg;
  
  static SyncRepConfigData *create_syncrep_config(const char *num_sync,
! 					  List *members);
  
  /*
   * Bison doesn't allocate anything that needs to live across parser calls,
--- 21,27 ----
  char	   *syncrep_parse_error_msg;
  
  static SyncRepConfigData *create_syncrep_config(const char *num_sync,
! 					List *members, uint8 syncrep_method);
  
  /*
   * Bison doesn't allocate anything that needs to live across parser calls,
***************
*** 46,52 **** static SyncRepConfigData *create_syncrep_config(const char *num_sync,
  	SyncRepConfigData *config;
  }
  
! %token <str> NAME NUM JUNK
  
  %type <config> result standby_config
  %type <list> standby_list
--- 46,52 ----
  	SyncRepConfigData *config;
  }
  
! %token <str> NAME NUM JUNK ANY FIRST
  
  %type <config> result standby_config
  %type <list> standby_list
***************
*** 60,67 **** result:
  	;
  
  standby_config:
! 		standby_list				{ $$ = create_syncrep_config("1", $1); }
! 		| NUM '(' standby_list ')'	{ $$ = create_syncrep_config($1, $3); }
  	;
  
  standby_list:
--- 60,69 ----
  	;
  
  standby_config:
! 		standby_list				{ $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); }
! 		| NUM '(' standby_list ')'		{ $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); }
! 		| ANY NUM '(' standby_list ')'		{ $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); }
! 		| FIRST NUM '(' standby_list ')'		{ $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); }
  	;
  
  standby_list:
***************
*** 75,83 **** standby_name:
  	;
  %%
  
- 
  static SyncRepConfigData *
! create_syncrep_config(const char *num_sync, List *members)
  {
  	SyncRepConfigData *config;
  	int			size;
--- 77,84 ----
  	;
  %%
  
  static SyncRepConfigData *
! create_syncrep_config(const char *num_sync, List *members, uint8 syncrep_method)
  {
  	SyncRepConfigData *config;
  	int			size;
***************
*** 98,103 **** create_syncrep_config(const char *num_sync, List *members)
--- 99,105 ----
  
  	config->config_size = size;
  	config->num_sync = atoi(num_sync);
+ 	config->syncrep_method = syncrep_method;
  	config->nmembers = list_length(members);
  	ptr = config->member_names;
  	foreach(lc, members)
*** a/src/backend/replication/syncrep_scanner.l
--- b/src/backend/replication/syncrep_scanner.l
***************
*** 64,69 **** xdinside		[^"]+
--- 64,72 ----
  %%
  {space}+	{ /* ignore */ }
  
+ ANY		{ return ANY; }
+ FIRST		{ return FIRST; }
+ 
  {xdstart}	{
  				initStringInfo(&xdbuf);
  				BEGIN(xd);
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2868,2879 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  
  			/*
  			 * More easily understood version of standby state. This is purely
! 			 * informational, not different from priority.
  			 */
  			if (priority == 0)
  				values[7] = CStringGetTextDatum("async");
  			else if (list_member_int(sync_standbys, i))
! 				values[7] = CStringGetTextDatum("sync");
  			else
  				values[7] = CStringGetTextDatum("potential");
  		}
--- 2868,2887 ----
  
  			/*
  			 * More easily understood version of standby state. This is purely
! 			 * informational.
! 			 *
! 			 * In quorum-based sync replication, the role of each standby
! 			 * listed in synchronous_standby_names can change very
! 			 * frequently. A standby considered "sync" at one moment can
! 			 * become a "potential" one at the next. So it's basically
! 			 * useless to report "sync" or "potential" as their sync
! 			 * states. We just report "quorum" for them.
  			 */
  			if (priority == 0)
  				values[7] = CStringGetTextDatum("async");
  			else if (list_member_int(sync_standbys, i))
! 				values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
! 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
  			else
  				values[7] = CStringGetTextDatum("potential");
  		}
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 245,251 ****
  # These settings are ignored on a standby server.
  
  #synchronous_standby_names = ''	# standby servers that provide sync rep
! 				# number of sync standbys and comma-separated list of application_name
  				# from standby(s); '*' = all
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
  
--- 245,252 ----
  # These settings are ignored on a standby server.
  
  #synchronous_standby_names = ''	# standby servers that provide sync rep
! 				# method to choose sync standbys, number of sync standbys
! 				# and comma-separated list of application_name
  				# from standby(s); '*' = all
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
  
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 32,37 ****
--- 32,41 ----
  #define SYNC_REP_WAITING			1
  #define SYNC_REP_WAIT_COMPLETE		2
  
+ /* syncrep_method of SyncRepConfigData */
+ #define SYNC_REP_PRIORITY		0
+ #define SYNC_REP_QUORUM		1
+ 
  /*
   * Struct for the configuration of synchronous replication.
   *
***************
*** 44,54 **** typedef struct SyncRepConfigData
--- 48,61 ----
  	int			config_size;	/* total size of this struct, in bytes */
  	int			num_sync;		/* number of sync standbys that we need to
  								 * wait for */
+ 	uint8		syncrep_method;	/* method to choose sync standbys */
  	int			nmembers;		/* number of members in the following list */
  	/* member_names contains nmembers consecutive nul-terminated C strings */
  	char		member_names[FLEXIBLE_ARRAY_MEMBER];
  } SyncRepConfigData;
  
+ extern SyncRepConfigData *SyncRepConfig;
+ 
  /* communication variables for parsing synchronous_standby_names GUC */
  extern SyncRepConfigData *syncrep_parse_result;
  extern char *syncrep_parse_error_msg;
*** a/src/test/recovery/t/007_sync_rep.pl
--- b/src/test/recovery/t/007_sync_rep.pl
***************
*** 3,9 **** use strict;
  use warnings;
  use PostgresNode;
  use TestLib;
! use Test::More tests => 8;
  
  # Query checking sync_priority and sync_state of each standby
  my $check_sql =
--- 3,9 ----
  use warnings;
  use PostgresNode;
  use TestLib;
! use Test::More tests => 11;
  
  # Query checking sync_priority and sync_state of each standby
  my $check_sql =
***************
*** 172,174 **** test_sync_state(
--- 172,205 ----
  standby2|1|sync
  standby4|1|potential),
  	'potential standby found earlier in array is promoted to sync');
+ 
+ # Check that standby1 and standby2 are chosen as sync standbys
+ # based on their priorities.
+ test_sync_state(
+ $node_master, qq(standby1|1|sync
+ standby2|2|sync
+ standby4|0|async),
+ 'priority-based sync replication specified by FIRST keyword',
+ 'FIRST 2(standby1, standby2)');
+ 
+ # Check that all the listed standbys are considered as candidates
+ # for sync standbys in a quorum-based sync replication.
+ test_sync_state(
+ $node_master, qq(standby1|1|quorum
+ standby2|2|quorum
+ standby4|0|async),
+ '2 quorum and 1 async',
+ 'ANY 2(standby1, standby2)');
+ 
+ # Start standby3, which will be considered to be in 'quorum' state.
+ $node_standby_3->start;
+ 
+ # Check that the setting of 'ANY 2(*)' chooses all standbys as
+ # candidates for quorum sync standbys.
+ test_sync_state(
+ $node_master, qq(standby1|1|quorum
+ standby2|1|quorum
+ standby3|1|quorum
+ standby4|1|quorum),
+ 'all standbys are considered as candidates for quorum sync standbys',
+ 'ANY 2(*)');