On Mon, Nov 14, 2016 at 5:39 PM, Masahiko Sawada <sawada.m...@gmail.com> wrote:
> On Tue, Nov 8, 2016 at 10:12 PM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>> On Tue, Nov 8, 2016 at 12:25 AM, Masahiko Sawada <sawada.m...@gmail.com> wrote:
>>> On Tue, Oct 25, 2016 at 10:35 PM, Michael Paquier
>>> <michael.paqu...@gmail.com> wrote:
>>>> +   if (SyncRepConfig->sync_method == SYNC_REP_PRIORITY)
>>>> +       return SyncRepGetSyncStandbysPriority(am_sync);
>>>> +   else /* SYNC_REP_QUORUM */
>>>> +       return SyncRepGetSyncStandbysQuorum(am_sync);
>>>> Both routines share the same logic to detect whether a WAL sender can
>>>> be selected as a candidate for sync evaluation; still, given the
>>>> different selections they make, I agree that it is better to keep
>>>> them separate.
>>>>
>>>> +   /* In quroum method, all sync standby priorities are always 1 */
>>>> +   if (found && SyncRepConfig->sync_method == SYNC_REP_QUORUM)
>>>> +       priority = 1;
>>>> Honestly I don't understand why you are enforcing that. Priority can
>>>> be important for users who want to switch from ANY to FIRST, as it
>>>> lets them see immediately which standbys would become sync or
>>>> potential.
>>>
>>> I thought that since all standbys appearing in the s_s_names list are
>>> treated equally in quorum method, these standbys should have the same
>>> priority.
>>> If these standbys had different sync_priority values, it would look as
>>> if the master server replicates to standby servers based on priority.
>>
>> No, actually, because we know that they are a quorum set, and that they
>> work in the same set. The concept of priorities has no real meaning
>> for quorum as there is no ordering of the elements. Another, perhaps
>> cleaner, idea may be to mark the field as NULL.
>
> We know that, but I'm concerned it might confuse users.
> If these priorities are the same, it clearly implies that all of
> the standbys listed in s_s_names are handled equally.
>
>>>> It is not possible to guess how many standbys this needs to wait
>>>> for. One idea would be to mark the sync_state not as "quorum", but
>>>> "quorum-N", or just add a new column to indicate how many in the set
>>>> need to give a commit confirmation.
>>>
>>> As Simon suggested before, we could support another feature that
>>> allows the client to control the quorum number.
>>> If we add that feature, I think it's better to hold and control
>>> that information as a GUC parameter.
>>> Thoughts?
>>
>> Similarly, that would be a SIGHUP parameter? Why not. Perhaps my worry
>> is not that legitimate; users could just look at s_s_names to
>> guess how many in the set a commit needs to wait for.
>
> It would be PGC_USERSET, similar to synchronous_commit, so the user
> can specify it at the statement level.
>
>> +        <para>
>> +        <literal>FIRST</> and <literal>ANY</> are case-insensitive word
>> +        and the standby name having these words are must be double-quoted.
>> +        </para>
>> s/word/words/.
>>
>> +        <literal>FIRST</> and <literal>ANY</> specify the method of
>> +        that how master server controls the standby servers.
>> A little bit hard to understand, I would suggest:
>> FIRST and ANY specify the method used by the master to control the
>> standby servers.
>>
>> +        The keyword <literal>FIRST</>, coupled with an integer
>> +        number N higher-priority standbys and makes transaction commit
>> +        when their WAL records are received.
>> This is unclear to me. Here is a correction:
>> The keyword FIRST, coupled with an integer N, makes transaction commit
>> wait until WAL records are received from the N standbys with higher
>> priority number.
>>
>> +        <varname>synchronous_standby_names</>. For example, a setting
>> +        of <literal>ANY 3 (s1, s2, s3, s4)</> makes transaction commits
>> +        wait until receiving receipts from at least any three standbys
>> +        of four listed servers <literal>s1</>, <literal>s2</>, <literal>s3</>,
>> This could just mention WAL records instead of "receipts".
>>
>> Instead of saying "an integer number N", we could use <literal>num_sync</>.
>>
>> +         If <literal>FIRST</> or <literal>ANY</> are not specified, this parameter
>> +         behaves as <literal>ANY</>. Note that this grammar is incompatible with
>> +         <productname>PostgresSQL</> 9.6, where no keyword specified is equivalent
>> +         as if <literal>FIRST</> was specified. In short, there is no real need to
>> +         specify <replaceable class="parameter">num_sync</replaceable> as this
>> +         behavior does not have changed, as well as it is not necessary to mention
>> +         pre-9.6 versions are the multi-sync grammar has been added in 9.6.
>> This paragraph could be reworked, say:
>> if FIRST or ANY are not specified this parameter behaves as if ANY is
>> used. Note that this grammar is incompatible with PostgreSQL 9.6 which
>> is the first version supporting multiple standbys with synchronous
>> replication, where no such keyword FIRST or ANY can be used. Note that
>> the grammar behaves as if FIRST is used, which is incompatible with
>> the post-9.6 behavior.
>>
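To make the defaults discussed above concrete, here is a small classifier that reports which method a `synchronous_standby_names` value would select under the proposed rules. It is hypothetical and standalone, not the bison grammar in `syncrep_gram.y`, and it only inspects the leading tokens. The rules it applies are: `FIRST` selects priority, `ANY` selects quorum, a bare `num_sync (...)` selects quorum, and a plain list selects priority with num_sync 1. Keywords match case-insensitively, as `flex -i` makes them, and a keyword not followed by a number is treated as a syntax error. That is why a standby literally named `first` or `any` would need double quotes.

```c
#include <ctype.h>
#include <stddef.h>

/* Hypothetical mirrors of SYNC_REP_PRIORITY / SYNC_REP_QUORUM. */
#define METHOD_PRIORITY 0
#define METHOD_QUORUM	1
#define METHOD_ERROR	(-1)

/* Characters that may continue an identifier in syncrep_scanner.l. */
static int
is_ident_cont(unsigned char c)
{
	return isalnum(c) || c == '_' || c == '$' || c >= 0x80;
}

/*
 * If s starts with the whole keyword kw (lowercase, matched
 * case-insensitively), return a pointer just past it; else NULL.
 */
static const char *
match_keyword(const char *s, const char *kw)
{
	size_t		i;

	for (i = 0; kw[i] != '\0'; i++)
		if (tolower((unsigned char) s[i]) != kw[i])
			return NULL;
	/* Reject longer identifiers such as "firstly". */
	if (is_ident_cont((unsigned char) s[i]))
		return NULL;
	return s + i;
}

/* Which synchronization method a setting selects (sketch only). */
static int
classify_sync_names(const char *s)
{
	const char *rest;
	int			method;

	while (isspace((unsigned char) *s))
		s++;

	if ((rest = match_keyword(s, "first")) != NULL)
		method = METHOD_PRIORITY;
	else if ((rest = match_keyword(s, "any")) != NULL)
		method = METHOD_QUORUM;
	else if (isdigit((unsigned char) *s))
		return METHOD_QUORUM;	/* num_sync (...) without a keyword */
	else
		return METHOD_PRIORITY;	/* bare list: same as FIRST 1 (...) */

	/* A keyword must be followed by num_sync. */
	while (isspace((unsigned char) *rest))
		rest++;
	return isdigit((unsigned char) *rest) ? method : METHOD_ERROR;
}
```

With these rules, `"2 (s1, s2)"` and `"ANY 2 (s1, s2)"` select quorum, while `"s1, s2"` and `"FIRST 1 (s1, s2)"` select priority.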
>> +     <entry>Synchronous state of this standby server. <literal>quorum-N</>
>> +     , where N is the number of synchronous standbys that transactions
>> +     need to wait for replies from, when standby is considered as a
>> +     candidate of quorum commit.</entry>
>> Nitpicking: I think that the comma goes to the previous line if it is
>> the first character of a line.
>>
>> +   if (SyncRepConfig->sync_method == SYNC_REP_PRIORITY)
>> +       return SyncRepGetSyncStandbysPriority(am_sync);
>> +   else /* SYNC_REP_QUORUM */
>> +       return SyncRepGetSyncStandbysQuorum(am_sync)
>> Or that?
>> if (PRIORITY)
>>     return StandbysPriority();
>> else if (QUORUM)
>>     return StandbysQuorum();
>> else
>>     elog(ERROR, "Boom");
>>
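Here is a minimal standalone sketch of the explicit-dispatch shape suggested above. It uses a hypothetical `SyncMethod` enum and stub selectors (`select_priority`, `select_quorum`) in place of the real `SyncRepGetSyncStandbysPriority()` and `SyncRepGetSyncStandbysQuorum()`. Every known method is tested explicitly, so an unexpected value is reported instead of silently falling through to quorum.

```c
#include <stdio.h>

/* Hypothetical stand-ins for SYNC_REP_PRIORITY / SYNC_REP_QUORUM. */
typedef enum
{
	SYNC_METHOD_PRIORITY = 0,
	SYNC_METHOD_QUORUM = 1
} SyncMethod;

/* Stub selectors; the real ones return a List of walsender indexes. */
static int
select_priority(void)
{
	return 1;
}

static int
select_quorum(void)
{
	return 2;
}

/*
 * Dispatch on the configured method. An unknown value yields -1 and a
 * message on stderr rather than being treated as quorum.
 */
static int
select_sync_standbys(int method)
{
	if (method == SYNC_METHOD_PRIORITY)
		return select_priority();
	else if (method == SYNC_METHOD_QUORUM)
		return select_quorum();

	fprintf(stderr, "invalid synchronization method: %d\n", method);
	return -1;
}
```

In the backend, the last branch would instead be an `elog(ERROR, ...)` or an `ereport(ERROR, ...)` with an `errmsg()`, which does not return.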
>> + * In priority method, we need the oldest these positions among sync
>> + * standbys. In quorum method, we need the newest these positions
>> + * specified by SyncRepConfig->num_sync.
>> The last sentence is grammatically incorrect, and it would be more
>> accurate to specify the Nth newest LSN positions needed to select k
>> standbys from a set of n.
>>
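The two rules can be sketched standalone as follows. The sketch assumes WAL positions are plain 64-bit integers (mirroring `XLogRecPtr`) and uses hypothetical helper names. The priority method needs the oldest position among the chosen sync standbys. The quorum method needs the num_sync-th newest position after a descending sort, which is what the patch computes with `cmp_lsn()` and `qsort()`. Write, flush, and apply would each go through this separately, which is why each array must be filled from its own field.

```c
#include <stdint.h>
#include <stdlib.h>

/* Assumption: WAL positions modeled as 64-bit integers, like XLogRecPtr. */
typedef uint64_t Lsn;

/* qsort comparator: descending order, newest position first. */
static int
cmp_lsn_desc(const void *a, const void *b)
{
	Lsn			lsn1 = *(const Lsn *) a;
	Lsn			lsn2 = *(const Lsn *) b;

	if (lsn1 > lsn2)
		return -1;
	if (lsn1 < lsn2)
		return 1;
	return 0;
}

/* Priority method: the position every chosen sync standby has reached. */
static Lsn
oldest_lsn(const Lsn *pos, int n)
{
	Lsn			oldest = pos[0];

	for (int i = 1; i < n; i++)
		if (pos[i] < oldest)
			oldest = pos[i];
	return oldest;
}

/*
 * Quorum method: after a descending sort, element num_sync - 1 is the
 * newest position that at least num_sync standbys have reached.
 * The caller guarantees 1 <= num_sync <= n; pos itself is not modified.
 */
static Lsn
nth_newest_lsn(const Lsn *pos, int n, int num_sync)
{
	Lsn		   *sorted = malloc(sizeof(Lsn) * n);
	Lsn			result;

	if (sorted == NULL)
		abort();
	for (int i = 0; i < n; i++)
		sorted[i] = pos[i];
	qsort(sorted, n, sizeof(Lsn), cmp_lsn_desc);
	result = sorted[num_sync - 1];
	free(sorted);
	return result;
}
```

Take flush positions {100, 300, 200} with num_sync = 2. The quorum rule yields 200, because two standbys (at 300 and 200) have flushed at least that far. The priority rule over all three yields 100.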
>> +           SpinLockAcquire(&walsnd->mutex);
>> +           write_array[i] = walsnd->write;
>> +           flush_array[i]= walsnd->flush;
>> +           apply_array[i] = walsnd->flush;
>> +           SpinLockRelease(&walsnd->mutex);
>> A nit: add a space on the left of the second = character. Also, you
>> need to save the apply position of the WAL sender, not the flush
>> position, in the array that is going to be ordered.
>>
>>             /*
>>              * More easily understood version of standby state. This is purely
>> -            * informational, not different from priority.
>> +            * informational. In quorum method, we add the number to indicate
>> +            * how many in the set need to give a commit confirmation.
>>              */
>>             if (priority == 0)
>>                 values[7] = CStringGetTextDatum("async");
>>             else if (list_member_int(sync_standbys, i))
>> -               values[7] = CStringGetTextDatum("sync");
>> +               values[7] = SyncRepConfig->sync_method == SYNC_REP_PRIORITY ?
>> +                   CStringGetTextDatum("sync") : CStringGetTextDatum("quorum")
>> This code block and its explanation comments tell two different
>> stories. The comment is saying that something like "quorum-N" is used
>> but the code always prints "quorum".
>>
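Here is a minimal sketch of the labeling as the quoted code block implements it, with hypothetical names and plain arguments in place of the walsender state. It makes clear that the label is a plain "quorum" with no count, so the comment should not promise "quorum-N".

```c
#include <stdbool.h>

/* Hypothetical mirrors of SYNC_REP_PRIORITY / SYNC_REP_QUORUM. */
#define METHOD_PRIORITY 0
#define METHOD_QUORUM	1

/*
 * Label for pg_stat_replication.sync_state, following the branch
 * structure of the patched pg_stat_get_wal_senders().
 */
static const char *
sync_state_label(int priority, bool in_sync_list, int method)
{
	if (priority == 0)
		return "async";			/* not listed in s_s_names */
	if (in_sync_list)
		return method == METHOD_PRIORITY ? "sync" : "quorum";
	return "potential";			/* listed but not currently chosen */
}
```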
>> It may be a good idea in the test to check that when no keyword is
>> specified the group of standbys is in quorum mode.
>
> Yeah, I will add some tests.
>
> I will post a new version of the patch incorporating the other comments.
>

Attached is the latest version of the patch, incorporating the review
comments. After more thought, I agree, and I changed the standby priority
in quorum method so that it is no longer forcibly set to 1. All standby
priorities are 1 if s_s_names = 'ANY(*)'.
Please review this patch.
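For reference, here is a hypothetical standalone sketch of how a priority follows from the list position. It assumes the rule used by `SyncRepGetStandbyPriority()`: a case-insensitive match against `application_name`, with `*` matching any standby. With a list consisting of just `*`, every standby gets priority 1. An explicit list gives distinct priorities even though quorum treats its members equally.

```c
#include <ctype.h>
#include <string.h>

/* ASCII case-insensitive equality, standing in for pg_strcasecmp(). */
static int
names_equal_ci(const char *a, const char *b)
{
	while (*a && *b)
	{
		if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
			return 0;
		a++;
		b++;
	}
	return *a == *b;
}

/*
 * 1-based position of the first list entry matching app_name, where "*"
 * matches any name; 0 means the standby is not listed (async).
 */
static int
standby_priority(const char *app_name, const char *const *members, int nmembers)
{
	for (int i = 0; i < nmembers; i++)
	{
		if (strcmp(members[i], "*") == 0 ||
			names_equal_ci(members[i], app_name))
			return i + 1;
	}
	return 0;
}
```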

Regards,

--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index adab2f8..e125dff 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3028,42 +3028,75 @@ include_dir 'conf.d'
         transactions waiting for commit will be allowed to proceed after
         these standby servers confirm receipt of their data.
         The synchronous standbys will be those whose names appear
-        earlier in this list, and
+        in this list, and
         that are both currently connected and streaming data in real-time
         (as shown by a state of <literal>streaming</literal> in the
         <link linkend="monitoring-stats-views-table">
-        <literal>pg_stat_replication</></link> view).
-        Other standby servers appearing later in this list represent potential
-        synchronous standbys. If any of the current synchronous
-        standbys disconnects for whatever reason,
-        it will be replaced immediately with the next-highest-priority standby.
-        Specifying more than one standby name can allow very high availability.
+        <literal>pg_stat_replication</></link> view). If the keyword
+        <literal>FIRST</> is specified, other standby servers appearing
+        later in this list represent potential synchronous standbys.
+        If any of the current synchronous standbys disconnects for
+        whatever reason, it will be replaced immediately with the
+        next-highest-priority standby. Specifying more than one standby
+        name can allow very high availability.
        </para>
        <para>
         This parameter specifies a list of standby servers using
         either of the following syntaxes:
 <synopsis>
-<replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
+[ANY] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
+FIRST <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
 <replaceable class="parameter">standby_name</replaceable> [, ...]
 </synopsis>
         where <replaceable class="parameter">num_sync</replaceable> is
         the number of synchronous standbys that transactions need to
         wait for replies from,
         and <replaceable class="parameter">standby_name</replaceable>
-        is the name of a standby server. For example, a setting of
-        <literal>3 (s1, s2, s3, s4)</> makes transaction commits wait
-        until their WAL records are received by three higher-priority standbys
-        chosen from standby servers <literal>s1</>, <literal>s2</>,
-        <literal>s3</> and <literal>s4</>.
+        is the name of a standby server.
+        <literal>FIRST</> and <literal>ANY</> specify the method used by
+        the master to control the standby servers.
         </para>
         <para>
-        The second syntax was used before <productname>PostgreSQL</>
+        The keyword <literal>FIRST</>, coupled with <literal>num_sync</>, makes
+        transaction commit wait until WAL records are received from the
+        <literal>num_sync</> highest-priority standbys in the list.
+        For example, a setting of <literal>FIRST 3 (s1, s2, s3, s4)</>
+        makes transaction commits wait until their WAL records are received
+        by three higher-priority standbys chosen from standby servers
+        <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</>.
+        </para>
+        <para>
+        The keyword <literal>ANY</>, coupled with <literal>num_sync</>,
+        makes transaction commits wait until WAL records are received
+        from at least <literal>num_sync</> connected standbys among those
+        defined in the list of <varname>synchronous_standby_names</>. For
+        example, a setting of <literal>ANY 3 (s1, s2, s3, s4)</> makes
+        transaction commits wait until their WAL records are received by at
+        least three of the four listed servers <literal>s1</>,
+        <literal>s2</>, <literal>s3</> and <literal>s4</>.
+        </para>
+        <para>
+        <literal>FIRST</> and <literal>ANY</> are case-insensitive keywords;
+        a standby name spelled like either of them must be double-quoted.
+        </para>
+        <para>
+        The third syntax was used before <productname>PostgreSQL</>
         version 9.6 and is still supported. It's the same as the first syntax
-        with <replaceable class="parameter">num_sync</replaceable> equal to 1.
-        For example, <literal>1 (s1, s2)</> and
-        <literal>s1, s2</> have the same meaning: either <literal>s1</>
-        or <literal>s2</> is chosen as a synchronous standby.
-       </para>
+        with <literal>FIRST</> and <replaceable class="parameter">num_sync</replaceable>
+        equal to 1. For example, <literal>FIRST 1 (s1, s2)</> and <literal>s1, s2</>
+        have the same meaning: either <literal>s1</> or <literal>s2</> is
+        chosen as a synchronous standby.
+        </para>
+        <note>
+         <para>
+         If neither <literal>FIRST</> nor <literal>ANY</> is given before
+         <replaceable class="parameter">num_sync</replaceable>, this parameter behaves as if
+         <literal>ANY</> is used. Note that this is incompatible with <productname>PostgreSQL</>
+         9.6, the first version supporting multiple synchronous standbys, which does not
+         accept the keywords <literal>FIRST</> or <literal>ANY</> and treats such a setting
+         as if <literal>FIRST</> were used.
+        </para>
+       </note>
        <para>
         The name of a standby server for this purpose is the
         <varname>application_name</> setting of the standby, as set in the
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 5bedaf2..7a0a22a 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -1150,7 +1150,7 @@ primary_slot_name = 'node_a_slot'
     An example of <varname>synchronous_standby_names</> for multiple
     synchronous standbys is:
 <programlisting>
-synchronous_standby_names = '2 (s1, s2, s3)'
+synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
 </programlisting>
     In this example, if four standby servers <literal>s1</>, <literal>s2</>,
     <literal>s3</> and <literal>s4</> are running, the two standbys
@@ -1161,6 +1161,18 @@ synchronous_standby_names = '2 (s1, s2, s3)'
     <literal>s2</> fails. <literal>s4</> is an asynchronous standby since
     its name is not in the list.
    </para>
+   <para>
+    Another example of <varname>synchronous_standby_names</> for multiple
+    synchronous standbys is:
+<programlisting>
+synchronous_standby_names = 'ANY 2 (s1, s2, s3)'
+</programlisting>
+    In this example, if four standby servers <literal>s1</>, <literal>s2</>,
+    <literal>s3</> and <literal>s4</> are running, the three standbys <literal>s1</>,
+    <literal>s2</> and <literal>s3</> will be considered as synchronous standby
+    candidates. The master server will wait for at least 2 replies from them.
+    <literal>s4</> is an asynchronous standby since its name is not in the list.
+   </para>
    </sect3>
 
    <sect3 id="synchronous-replication-performance">
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3de489e..2c5f3de 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1389,7 +1389,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
     <row>
      <entry><structfield>sync_state</></entry>
      <entry><type>text</></entry>
-     <entry>Synchronous state of this standby server</entry>
+     <entry>Synchronous state of this standby server. It is <literal>quorum</>
+     when the standby is a candidate for quorum commit.</entry>
     </row>
    </tbody>
    </tgroup>
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index c99717e..da8bcf0 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -26,7 +26,7 @@ repl_gram.o: repl_scanner.c
 
 # syncrep_scanner is complied as part of syncrep_gram
 syncrep_gram.o: syncrep_scanner.c
-syncrep_scanner.c: FLEXFLAGS = -CF -p
+syncrep_scanner.c: FLEXFLAGS = -CF -p -i
 syncrep_scanner.c: FLEX_NO_BACKUP=yes
 
 # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ac29f56..bcc1317 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -31,16 +31,19 @@
  *
  * In 9.5 or before only a single standby could be considered as
  * synchronous. In 9.6 we support multiple synchronous standbys.
- * The number of synchronous standbys that transactions must wait for
- * replies from is specified in synchronous_standby_names.
- * This parameter also specifies a list of standby names,
- * which determines the priority of each standby for being chosen as
- * a synchronous standby. The standbys whose names appear earlier
- * in the list are given higher priority and will be considered as
- * synchronous. Other standby servers appearing later in this list
- * represent potential synchronous standbys. If any of the current
- * synchronous standbys disconnects for whatever reason, it will be
- * replaced immediately with the next-highest-priority standby.
+ * In 10.0 we support two synchronization methods, priority and
+ * quorum. The number of synchronous standbys that transactions
+ * must wait for replies from and the synchronization method are specified
+ * in synchronous_standby_names. This parameter also specifies a list
+ * of standby names, which determines the priority of each standby for
+ * being chosen as a synchronous standby. In priority method, the standbys
+ * whose names appear earlier in the list are given higher priority
+ * and will be considered as synchronous. Other standby servers appearing
+ * later in this list represent potential synchronous standbys. If any of
+ * the current synchronous standbys disconnects for whatever reason,
+ * it will be replaced immediately with the next-highest-priority standby.
+ * In quorum method, all standbys appearing in the list are
+ * considered as candidates for quorum commit.
  *
  * Before the standbys chosen from synchronous_standby_names can
  * become the synchronous standbys they must have caught up with
@@ -73,24 +76,27 @@
 
 /* User-settable parameters for sync rep */
 char	   *SyncRepStandbyNames;
+SyncRepConfigData *SyncRepConfig = NULL;
 
 #define SyncStandbysDefined() \
 	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
 
 static bool announce_next_takeover = true;
 
-static SyncRepConfigData *SyncRepConfig = NULL;
 static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
 
 static void SyncRepQueueInsert(int mode);
 static void SyncRepCancelWait(void);
 static int	SyncRepWakeQueue(bool all, int mode);
 
-static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
-						   XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr,
-						   bool *am_sync);
+static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
+								 XLogRecPtr *flushPtr,
+								 XLogRecPtr *applyPtr,
+								 bool *am_sync);
 static int	SyncRepGetStandbyPriority(void);
+static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
+static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int	cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
 static bool SyncRepQueueIsOrderedByLSN(int mode);
@@ -386,7 +392,7 @@ SyncRepReleaseWaiters(void)
 	XLogRecPtr	writePtr;
 	XLogRecPtr	flushPtr;
 	XLogRecPtr	applyPtr;
-	bool		got_oldest;
+	bool		got_recptr;
 	bool		am_sync;
 	int			numwrite = 0;
 	int			numflush = 0;
@@ -413,11 +419,10 @@ SyncRepReleaseWaiters(void)
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
 	/*
-	 * Check whether we are a sync standby or not, and calculate the oldest
-	 * positions among all sync standbys.
+	 * Check whether we are a sync standby or not, and calculate the synced
+	 * positions among all sync standbys using the configured method.
 	 */
-	got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
-											&applyPtr, &am_sync);
+	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
 
 	/*
 	 * If we are managing a sync standby, though we weren't prior to this,
@@ -435,7 +440,7 @@ SyncRepReleaseWaiters(void)
 	 * If the number of sync standbys is less than requested or we aren't
 	 * managing a sync standby then just leave.
 	 */
-	if (!got_oldest || !am_sync)
+	if (!got_recptr || !am_sync)
 	{
 		LWLockRelease(SyncRepLock);
 		announce_next_takeover = !am_sync;
@@ -471,17 +476,50 @@ SyncRepReleaseWaiters(void)
 }
 
 /*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Return the list of sync standbys according to the synchronization method,
+ * or NIL if no sync standby is connected. The caller must hold SyncRepLock.
+ *
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+List *
+SyncRepGetSyncStandbys(bool *am_sync)
+{
+	/* Set default result */
+	if (am_sync != NULL)
+		*am_sync = false;
+
+	/* Quick exit if sync replication is not requested */
+	if (SyncRepConfig == NULL)
+		return NIL;
+
+	if (SyncRepConfig->sync_method == SYNC_REP_PRIORITY)
+		return SyncRepGetSyncStandbysPriority(am_sync);
+	else if (SyncRepConfig->sync_method == SYNC_REP_QUORUM)
+		return SyncRepGetSyncStandbysQuorum(am_sync);
+	else
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid synchronization method is specified \"%d\"",
+						SyncRepConfig->sync_method)));
+}
+
+/*
+ * Calculate the Write, Flush and Apply positions among sync standbys.
  *
  * Return false if the number of sync standbys is less than
  * synchronous_standby_names specifies. Otherwise return true and
- * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
+ * store the positions into *writePtr, *flushPtr and *applyPtr.
+ *
+ * In priority method, we need the oldest of these positions among sync
+ * standbys. In quorum method, we need the N-th newest of these positions,
+ * where N is SyncRepConfig->num_sync.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
  */
 static bool
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
+SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 						   XLogRecPtr *applyPtr, bool *am_sync)
 {
 	List	   *sync_standbys;
@@ -507,29 +545,74 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 		return false;
 	}
 
-	/*
-	 * Scan through all sync standbys and calculate the oldest Write, Flush
-	 * and Apply positions.
-	 */
-	foreach(cell, sync_standbys)
+	if (SyncRepConfig->sync_method == SYNC_REP_PRIORITY)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-		XLogRecPtr	write;
-		XLogRecPtr	flush;
-		XLogRecPtr	apply;
-
-		SpinLockAcquire(&walsnd->mutex);
-		write = walsnd->write;
-		flush = walsnd->flush;
-		apply = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
-
-		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
-			*writePtr = write;
-		if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
-			*flushPtr = flush;
-		if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
-			*applyPtr = apply;
+		/*
+		 * Scan through all sync standbys and calculate the oldest
+		 * Write, Flush and Apply positions.
+		 */
+		foreach (cell, sync_standbys)
+		{
+			WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+			XLogRecPtr	write;
+			XLogRecPtr	flush;
+			XLogRecPtr	apply;
+
+			SpinLockAcquire(&walsnd->mutex);
+			write = walsnd->write;
+			flush = walsnd->flush;
+			apply = walsnd->apply;
+			SpinLockRelease(&walsnd->mutex);
+
+			if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
+				*writePtr = write;
+			if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
+				*flushPtr = flush;
+			if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
+				*applyPtr = apply;
+		}
+	}
+	else /* SYNC_REP_QUORUM */
+	{
+		XLogRecPtr	*write_array;
+		XLogRecPtr	*flush_array;
+		XLogRecPtr	*apply_array;
+		int len;
+		int i = 0;
+
+		len = list_length(sync_standbys);
+		write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+		flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+		apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+
+		foreach (cell, sync_standbys)
+		{
+			WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+
+			SpinLockAcquire(&walsnd->mutex);
+			write_array[i] = walsnd->write;
+			flush_array[i] = walsnd->flush;
+			apply_array[i] = walsnd->apply;
+			SpinLockRelease(&walsnd->mutex);
+
+			i++;
+		}
+
+		qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
+		qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
+		qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+
+		/*
+		 * Get N-th newest Write, Flush, Apply positions
+		 * specified by SyncRepConfig->num_sync.
+		 */
+		*writePtr = write_array[SyncRepConfig->num_sync - 1];
+		*flushPtr = flush_array[SyncRepConfig->num_sync - 1];
+		*applyPtr = apply_array[SyncRepConfig->num_sync - 1];
+
+		pfree(write_array);
+		pfree(flush_array);
+		pfree(apply_array);
 	}
 
 	list_free(sync_standbys);
@@ -537,17 +620,66 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 }
 
 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
+ * Return the list of sync standbys using quorum method, or
+ * NIL if no sync standby is connected. In quorum method, all standbys
+ * listed in synchronous_standby_names are treated equally regardless of
+ * their priority, so this function returns every standby except those
+ * that are not active or are connected as async.
  *
- * If there are multiple standbys with the same priority,
+ * On return, *am_sync is set to true if this walsender is connecting to
+ * sync standby. Otherwise it's set to false.
+ */
+static List *
+SyncRepGetSyncStandbysQuorum(bool *am_sync)
+{
+	List	*result = NIL;
+	int i;
+
+	Assert(SyncRepConfig->sync_method == SYNC_REP_QUORUM);
+
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+		/* Must be active */
+		if (walsnd->pid == 0)
+			continue;
+
+		/* Must be streaming */
+		if (walsnd->state != WALSNDSTATE_STREAMING)
+			continue;
+
+		/* Must be synchronous */
+		if (walsnd->sync_standby_priority == 0)
+			continue;
+
+		/* Must have a valid flush position */
+		if (XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		/*
+		 * Consider this standby as candidate of sync and append
+		 * it to the result.
+		 */
+		result = lappend_int(result, i);
+		if (am_sync != NULL && walsnd == MyWalSnd)
+			*am_sync = true;
+	}
+
+	return result;
+}
+
+/*
+ * Return the list of sync standbys using priority method, or
+ * NIL if no sync standby is connected. In priority method,
+ * if there are multiple standbys with the same priority,
  * the first one found is selected preferentially.
- * The caller must hold SyncRepLock.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+static List *
+SyncRepGetSyncStandbysPriority(bool *am_sync)
 {
 	List	   *result = NIL;
 	List	   *pending = NIL;
@@ -560,13 +692,7 @@ SyncRepGetSyncStandbys(bool *am_sync)
 	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
 								 * rearrangement */
 
-	/* Set default result */
-	if (am_sync != NULL)
-		*am_sync = false;
-
-	/* Quick exit if sync replication is not requested */
-	if (SyncRepConfig == NULL)
-		return NIL;
+	Assert(SyncRepConfig->sync_method == SYNC_REP_PRIORITY);
 
 	lowest_priority = SyncRepConfig->nmembers;
 	next_highest_priority = lowest_priority + 1;
@@ -892,6 +1018,23 @@ SyncRepQueueIsOrderedByLSN(int mode)
 #endif
 
 /*
+ * Compare LSNs so that qsort() sorts the array in descending order.
+ */
+static int
+cmp_lsn(const void *a, const void *b)
+{
+	XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
+	XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
+
+	if (lsn1 > lsn2)
+		return -1;
+	else if (lsn1 == lsn2)
+		return 0;
+	else
+		return 1;
+}
+
+/*
  * ===========================================================
  * Synchronous Replication functions executed by any process
  * ===========================================================
diff --git a/src/backend/replication/syncrep_gram.y b/src/backend/replication/syncrep_gram.y
index 35c2776..e10be8b 100644
--- a/src/backend/replication/syncrep_gram.y
+++ b/src/backend/replication/syncrep_gram.y
@@ -21,7 +21,7 @@ SyncRepConfigData *syncrep_parse_result;
 char	   *syncrep_parse_error_msg;
 
 static SyncRepConfigData *create_syncrep_config(const char *num_sync,
-					  List *members);
+					List *members, int sync_method);
 
 /*
  * Bison doesn't allocate anything that needs to live across parser calls,
@@ -46,7 +46,7 @@ static SyncRepConfigData *create_syncrep_config(const char *num_sync,
 	SyncRepConfigData *config;
 }
 
-%token <str> NAME NUM JUNK
+%token <str> NAME NUM JUNK ANY FIRST
 
 %type <config> result standby_config
 %type <list> standby_list
@@ -60,8 +60,10 @@ result:
 	;
 
 standby_config:
-		standby_list				{ $$ = create_syncrep_config("1", $1); }
-		| NUM '(' standby_list ')'	{ $$ = create_syncrep_config($1, $3); }
+		standby_list						{ $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); }
+		| NUM '(' standby_list ')'			{ $$ = create_syncrep_config($1, $3, SYNC_REP_QUORUM); }
+		| ANY NUM '(' standby_list ')'		{ $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); }
+		| FIRST NUM '(' standby_list ')'	{ $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); }
 	;
 
 standby_list:
@@ -77,7 +79,7 @@ standby_name:
 
 
 static SyncRepConfigData *
-create_syncrep_config(const char *num_sync, List *members)
+create_syncrep_config(const char *num_sync, List *members, int sync_method)
 {
 	SyncRepConfigData *config;
 	int			size;
@@ -98,6 +100,7 @@ create_syncrep_config(const char *num_sync, List *members)
 
 	config->config_size = size;
 	config->num_sync = atoi(num_sync);
+	config->sync_method = sync_method;
 	config->nmembers = list_length(members);
 	ptr = config->member_names;
 	foreach(lc, members)
diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l
index d20662e..403fd7d 100644
--- a/src/backend/replication/syncrep_scanner.l
+++ b/src/backend/replication/syncrep_scanner.l
@@ -54,6 +54,8 @@ digit			[0-9]
 ident_start		[A-Za-z\200-\377_]
 ident_cont		[A-Za-z\200-\377_0-9\$]
 identifier		{ident_start}{ident_cont}*
+any_ident		any
+first_ident		first
 
 dquote			\"
 xdstart			{dquote}
@@ -64,6 +66,14 @@ xdinside		[^"]+
 %%
 {space}+	{ /* ignore */ }
 
+{any_ident}	{
+				yylval.str = pstrdup(yytext);
+				return ANY;
+		}
+{first_ident}	{
+				yylval.str = pstrdup(yytext);
+				return FIRST;
+		}
 {xdstart}	{
 				initStringInfo(&xdbuf);
 				BEGIN(xd);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5e508..04fe994 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2860,12 +2860,14 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
 			/*
 			 * More easily understood version of standby state. This is purely
-			 * informational, not different from priority.
+			 * informational. In quorum method, since all sync standbys are
+			 * candidates for quorum commit, their state is always 'quorum'.
 			 */
 			if (priority == 0)
 				values[7] = CStringGetTextDatum("async");
 			else if (list_member_int(sync_standbys, i))
-				values[7] = CStringGetTextDatum("sync");
+				values[7] = SyncRepConfig->sync_method == SYNC_REP_PRIORITY ?
+					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
 				values[7] = CStringGetTextDatum("potential");
 		}
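The sync_state branch above can be read as a small pure function. This sketch restates it outside the backend so that the four possible outcomes are easy to see. It uses hypothetical names and returns string literals in place of text Datums.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define DEMO_SYNC_REP_PRIORITY 0
#define DEMO_SYNC_REP_QUORUM   1

/* Hypothetical restatement of the patched sync_state logic. */
const char *
demo_sync_state(int priority, bool in_sync_list, int sync_method)
{
    if (priority == 0)
        return "async";        /* not listed in synchronous_standby_names */
    if (in_sync_list)
        return sync_method == DEMO_SYNC_REP_PRIORITY ? "sync" : "quorum";
    return "potential";        /* listed, but not currently chosen */
}
```

For 'ANY 2(standby1, standby2)', the TAP test expects standby4 to be reported as async, because it is not listed and therefore has priority 0. The two listed standbys are expected to be reported as quorum.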
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index e4e0e27..8dd74a3 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -32,6 +32,10 @@
 #define SYNC_REP_WAITING			1
 #define SYNC_REP_WAIT_COMPLETE		2
 
+/* sync_method of SyncRepConfigData */
+#define SYNC_REP_PRIORITY	0
+#define SYNC_REP_QUORUM		1
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -45,10 +49,13 @@ typedef struct SyncRepConfigData
 	int			num_sync;		/* number of sync standbys that we need to
 								 * wait for */
 	int			nmembers;		/* number of members in the following list */
+	int			sync_method;	/* synchronization method */
 	/* member_names contains nmembers consecutive nul-terminated C strings */
 	char		member_names[FLEXIBLE_ARRAY_MEMBER];
 } SyncRepConfigData;
 
+extern SyncRepConfigData *SyncRepConfig;
+
 /* communication variables for parsing synchronous_standby_names GUC */
 extern SyncRepConfigData *syncrep_parse_result;
 extern char *syncrep_parse_error_msg;
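The practical difference between the two sync_method values is when a waiting commit may be released. This excerpt does not show the code that computes that, so the following is only one way to express the rule. For FIRST (priority), the num_sync highest-priority standbys must all have flushed, so the release point is the minimum of their flush positions. For ANY (quorum), any num_sync candidates suffice, so the release point is the num_sync-th largest flush position. DemoXLogRecPtr and demo_release_lsn are hypothetical. In the priority case, the input is assumed to be already ordered by priority.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint64_t DemoXLogRecPtr;   /* stand-in for XLogRecPtr */

#define DEMO_SYNC_REP_PRIORITY 0
#define DEMO_SYNC_REP_QUORUM   1

/* qsort comparator: larger positions sort first. */
static int
cmp_desc(const void *a, const void *b)
{
    DemoXLogRecPtr x = *(const DemoXLogRecPtr *) a;
    DemoXLogRecPtr y = *(const DemoXLogRecPtr *) b;
    return (x < y) - (x > y);
}

/*
 * flush[] holds candidate flush positions (sorted in place in the quorum
 * case).  Returns 0 when fewer than num_sync candidates exist, meaning
 * nothing can be released yet.
 */
DemoXLogRecPtr
demo_release_lsn(DemoXLogRecPtr *flush, int ncandidates, int num_sync,
                 int sync_method)
{
    if (num_sync <= 0 || ncandidates < num_sync)
        return 0;

    if (sync_method == DEMO_SYNC_REP_PRIORITY)
    {
        /* flush[] is in priority order; all top num_sync must have flushed */
        DemoXLogRecPtr min = flush[0];
        for (int i = 1; i < num_sync; i++)
            if (flush[i] < min)
                min = flush[i];
        return min;
    }

    /* Quorum: any num_sync candidates suffice -> num_sync-th largest */
    qsort(flush, (size_t) ncandidates, sizeof(DemoXLogRecPtr), cmp_desc);
    return flush[num_sync - 1];
}
```

Take flush positions 100, 300 and 200 with num_sync = 2. The priority rule treats the first two as highest priority and gives 100. The quorum rule gives 200, since two standbys have reached at least 200.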
diff --git a/src/test/recovery/t/007_sync_rep.pl b/src/test/recovery/t/007_sync_rep.pl
index 0c87226..c502d20 100644
--- a/src/test/recovery/t/007_sync_rep.pl
+++ b/src/test/recovery/t/007_sync_rep.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 8;
+use Test::More tests => 11;
 
 # Query checking sync_priority and sync_state of each standby
 my $check_sql =
@@ -107,7 +107,7 @@ test_sync_state(
 	$node_master, qq(standby2|2|sync
 standby3|3|sync),
 	'2 synchronous standbys',
-	'2(standby1,standby2,standby3)');
+	'FIRST 2(standby1,standby2,standby3)');
 
 # Start standby1
 $node_standby_1->start;
@@ -138,7 +138,7 @@ standby2|4|sync
 standby3|3|sync
 standby4|1|sync),
 	'num_sync exceeds the num of potential sync standbys',
-	'6(standby4,standby0,standby3,standby2)');
+	'FIRST 6(standby4,standby0,standby3,standby2)');
 
 # The setting that * comes before another standby name is acceptable
 # but does not make sense in most cases. Check that sync_state is
@@ -150,7 +150,7 @@ standby2|2|sync
 standby3|2|potential
 standby4|2|potential),
 	'asterisk comes before another standby name',
-	'2(standby1,*,standby2)');
+	'FIRST 2(standby1,*,standby2)');
 
 # Check that the setting of '2(*)' chooses standby2 and standby3 that are stored
 # earlier in WalSnd array as sync standbys.
@@ -160,7 +160,7 @@ standby2|1|sync
 standby3|1|sync
 standby4|1|potential),
 	'multiple standbys having the same priority are chosen as sync',
-	'2(*)');
+	'FIRST 2(*)');
 
 # Stop Standby3 which is considered in 'sync' state.
 $node_standby_3->stop;
@@ -172,3 +172,34 @@ test_sync_state(
 standby2|1|sync
 standby4|1|potential),
 	'potential standby found earlier in array is promoted to sync');
+
+# Check that standbys listed in a quorum-method setting are reported
+# in 'quorum' state, while an unlisted standby remains async.
+test_sync_state(
+	$node_master, qq(standby1|1|quorum
+standby2|2|quorum
+standby4|0|async),
+	'2 quorum and 1 async',
+	'ANY 2(standby1, standby2)');
+
+# Check the state of standbys when no synchronization method is
+# specified, as opposed to when 'ANY' is specified.
+test_sync_state(
+	$node_master, qq(standby1|1|quorum
+standby2|2|quorum
+standby4|0|async),
+	'synchronization method not specified',
+	'2(standby1, standby2)');
+
+# Start standby3, which will be considered in 'quorum' state.
+$node_standby_3->start;
+
+# Check that the setting 'ANY 2(*)' chooses all standbys as
+# voters.
+test_sync_state(
+	$node_master, qq(standby1|1|quorum
+standby2|1|quorum
+standby3|1|quorum
+standby4|1|quorum),
+	'all standbys are considered as candidates for quorum commit',
+	'ANY 2(*)');
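The expected rows in these tests follow from how a standby's priority is derived from the member list. The priority is the 1-based position of the first entry that either is '*' or matches the standby's application_name, and 0 if nothing matches. This sketch of that lookup is modeled on SyncRepGetStandbyPriority() in syncrep.c, which compares names case-insensitively. The demo_* names are hypothetical, and the list is assumed to be already parsed into separate strings.

```c
#include <assert.h>
#include <ctype.h>

/* ASCII case-insensitive equality, standing in for pg_strcasecmp() == 0. */
static int
demo_name_eq(const char *a, const char *b)
{
    while (*a && *b)
    {
        if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
            return 0;
        a++;
        b++;
    }
    return *a == *b;
}

/*
 * 1-based position of the first entry in members[] that is "*" or names
 * app_name; 0 means the standby is not listed (async).
 */
int
demo_standby_priority(const char *const *members, int nmembers,
                      const char *app_name)
{
    for (int i = 0; i < nmembers; i++)
        if (demo_name_eq(members[i], "*") || demo_name_eq(members[i], app_name))
            return i + 1;
    return 0;
}
```

In this version of the patch, the listed quorum standbys keep their positional priorities (standby2|2|quorum). That is the behaviour debated earlier in the thread.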
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)