On Fri, Dec 16, 2016 at 5:04 PM, Fujii Masao <masao.fu...@gmail.com> wrote: > On Fri, Dec 16, 2016 at 2:38 PM, Michael Paquier > <michael.paqu...@gmail.com> wrote: >> On Thu, Dec 15, 2016 at 6:08 PM, Masahiko Sawada <sawada.m...@gmail.com> >> wrote: >>> Attached latest v12 patch. >>> I changed behavior of "N (standby_list)" to use the priority method >>> and incorporated some review comments so far. Please review it. >> >> Some comments... >> >> + Another example of <varname>synchronous_standby_names</> for multiple >> + synchronous standby is: >> Here standby takes an 's'. >> >> + candidates. The master server will wait for at least 2 replies from >> them. >> + <literal>s4</> is an asynchronous standby since its name is not in the >> list. >> + </para> >> "will wait for replies from at least two of them". >> >> + * next-highest-priority standby. In quorum method, the all standbys >> + * appearing in the list are considered as a candidate for quorum commit. >> "the all" is incorrect. I think you mean "all the" instead. >> >> + * NIL if no sync standby is connected. In quorum method, all standby >> + * priorities are same, that is 1. So this function returns the list of >> This is not true. Standys have a priority number assigned. Though it does >> not matter much for quorum groups, it gives an indication of their position >> in the defined list. >> >> #synchronous_standby_names = '' # standby servers that provide sync rep >> - # number of sync standbys and comma-separated list of >> application_name >> + # synchronization method, number of sync standbys >> + # and comma-separated list of application_name >> # from standby(s); '*' = all >> The formulation is funny here: "sync rep synchronization method". >> >> I think that Fujii-san has also some doc changes in his box. For anybody >> picking up this patch next, it would be good to incorporate the things >> I have noticed here. > > Yes, I will. Thanks!
Attached is the modified version of the patch. Barring objections, I will commit this version. Even after committing the patch, there will be still many source comments and documentations that we need to update, for example, in high-availability.sgml. We need to check and update them throughly later. Regards, -- Fujii Masao
*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 3054,3094 **** include_dir 'conf.d' transactions waiting for commit will be allowed to proceed after these standby servers confirm receipt of their data. The synchronous standbys will be those whose names appear ! earlier in this list, and that are both currently connected and streaming data in real-time (as shown by a state of <literal>streaming</literal> in the <link linkend="monitoring-stats-views-table"> <literal>pg_stat_replication</></link> view). ! Other standby servers appearing later in this list represent potential ! synchronous standbys. If any of the current synchronous ! standbys disconnects for whatever reason, ! it will be replaced immediately with the next-highest-priority standby. ! Specifying more than one standby name can allow very high availability. </para> <para> This parameter specifies a list of standby servers using either of the following syntaxes: <synopsis> ! <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] ) <replaceable class="parameter">standby_name</replaceable> [, ...] </synopsis> where <replaceable class="parameter">num_sync</replaceable> is the number of synchronous standbys that transactions need to wait for replies from, and <replaceable class="parameter">standby_name</replaceable> ! is the name of a standby server. For example, a setting of ! <literal>3 (s1, s2, s3, s4)</> makes transaction commits wait ! until their WAL records are received by three higher-priority standbys ! chosen from standby servers <literal>s1</>, <literal>s2</>, ! <literal>s3</> and <literal>s4</>. ! </para> ! <para> ! The second syntax was used before <productname>PostgreSQL</> version 9.6 and is still supported. It's the same as the first syntax ! with <replaceable class="parameter">num_sync</replaceable> equal to 1. ! For example, <literal>1 (s1, s2)</> and ! <literal>s1, s2</> have the same meaning: either <literal>s1</> ! or <literal>s2</> is chosen as a synchronous standby. </para> <para> The name of a standby server for this purpose is the --- 3054,3124 ---- transactions waiting for commit will be allowed to proceed after these standby servers confirm receipt of their data. The synchronous standbys will be those whose names appear ! in this list, and that are both currently connected and streaming data in real-time (as shown by a state of <literal>streaming</literal> in the <link linkend="monitoring-stats-views-table"> <literal>pg_stat_replication</></link> view). ! Specifying more than one standby names can allow very high availability. </para> <para> This parameter specifies a list of standby servers using either of the following syntaxes: <synopsis> ! [FIRST] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] ) ! ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] ) <replaceable class="parameter">standby_name</replaceable> [, ...] </synopsis> where <replaceable class="parameter">num_sync</replaceable> is the number of synchronous standbys that transactions need to wait for replies from, and <replaceable class="parameter">standby_name</replaceable> ! is the name of a standby server. ! <literal>FIRST</> and <literal>ANY</> specify the method to choose ! synchronous standbys from the listed servers. ! </para> ! <para> ! The keyword <literal>FIRST</>, coupled with ! <replaceable class="parameter">num_sync</replaceable>, specifies a ! priority-based synchronous replication and makes transaction commits ! wait until their WAL records are replicated to ! <replaceable class="parameter">num_sync</replaceable> synchronous ! standbys chosen based on their priorities. For example, a setting of ! <literal>FIRST 3 (s1, s2, s3, s4)</> will cause each commit to wait for ! replies from three higher-priority standbys chosen from standby servers ! <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</>. ! The standbys whose names appear earlier in the list are given higher ! priority and will be considered as synchronous. Other standby servers ! appearing later in this list represent potential synchronous standbys. ! If any of the current synchronous standbys disconnects for whatever ! reason, it will be replaced immediately with the next-highest-priority ! standby. The keyword <literal>FIRST</> is optional. ! </para> ! <para> ! The keyword <literal>ANY</>, coupled with ! <replaceable class="parameter">num_sync</replaceable>, specifies a ! quorum-based synchronous replication and makes transaction commits ! wait until their WAL records are replicated to <emphasis>at least</> ! <replaceable class="parameter">num_sync</replaceable> listed standbys. ! For example, a setting of <literal>ANY 3 (s1, s2, s3, s4)</> will cause ! each commit to proceed as soon as at least any three standbys of ! <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</> ! reply. ! </para> ! <para> ! <literal>FIRST</> and <literal>ANY</> are case-insensitive. If these ! keywords are used as the name of a standby server, ! its <replaceable class="parameter">standby_name</replaceable> must ! be double-quoted. ! </para> ! <para> ! The third syntax was used before <productname>PostgreSQL</> version 9.6 and is still supported. It's the same as the first syntax ! with <literal>FIRST</> and ! <replaceable class="parameter">num_sync</replaceable> equal to 1. ! For example, <literal>FIRST 1 (s1, s2)</> and <literal>s1, s2</> have ! the same meaning: either <literal>s1</> or <literal>s2</> is chosen ! as a synchronous standby. </para> <para> The name of a standby server for this purpose is the *** a/doc/src/sgml/high-availability.sgml --- b/doc/src/sgml/high-availability.sgml *************** *** 1138,1156 **** primary_slot_name = 'node_a_slot' as synchronous confirm receipt of their data. The number of synchronous standbys that transactions must wait for replies from is specified in <varname>synchronous_standby_names</>. This parameter also specifies ! a list of standby names, which determines the priority of each standby ! for being chosen as a synchronous standby. The standbys whose names ! appear earlier in the list are given higher priority and will be considered ! as synchronous. Other standby servers appearing later in this list ! represent potential synchronous standbys. If any of the current ! synchronous standbys disconnects for whatever reason, it will be replaced ! immediately with the next-highest-priority standby. </para> <para> ! An example of <varname>synchronous_standby_names</> for multiple ! synchronous standbys is: <programlisting> ! synchronous_standby_names = '2 (s1, s2, s3)' </programlisting> In this example, if four standby servers <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</> are running, the two standbys --- 1138,1162 ---- as synchronous confirm receipt of their data. The number of synchronous standbys that transactions must wait for replies from is specified in <varname>synchronous_standby_names</>. This parameter also specifies ! a list of standby names and the method (<literal>FIRST</> and ! <literal>ANY</>) to choose synchronous standbys from the listed ones. </para> <para> ! The method <literal>FIRST</> specifies a priority-based synchronous ! replication and makes transaction commits wait until their WAL records are ! replicated to the requested number of synchronous standbys chosen based on ! their priorities. The standbys whose names appear earlier in the list are ! given higher priority and will be considered as synchronous. Other standby ! servers appearing later in this list represent potential synchronous ! standbys. If any of the current synchronous standbys disconnects for ! whatever reason, it will be replaced immediately with the ! next-highest-priority standby. ! </para> ! <para> ! An example of <varname>synchronous_standby_names</> for ! a priority-based multiple synchronous standbys is: <programlisting> ! synchronous_standby_names = 'FIRST 2 (s1, s2, s3)' </programlisting> In this example, if four standby servers <literal>s1</>, <literal>s2</>, <literal>s3</> and <literal>s4</> are running, the two standbys *************** *** 1162,1167 **** synchronous_standby_names = '2 (s1, s2, s3)' --- 1168,1191 ---- its name is not in the list. </para> <para> + The method <literal>ANY</> specifies a quorum-based synchronous + replication and makes transaction commits wait until their WAL records + are replicated to <emphasis>at least</> the requested number of + synchronous standbys in the list. + </para> + <para> + An example of <varname>synchronous_standby_names</> for + a quorum-based multiple synchronous standbys is: + <programlisting> + synchronous_standby_names = 'ANY 2 (s1, s2, s3)' + </programlisting> + In this example, if four standby servers <literal>s1</>, <literal>s2</>, + <literal>s3</> and <literal>s4</> are running, transaction commits will + wait for replies from at least any two standbys of <literal>s1</>, + <literal>s2</> and <literal>s3</>. <literal>s4</> is an asynchronous + standby since its name is not in the list. + </para> + <para> The synchronous states of standby servers can be viewed using the <structname>pg_stat_replication</structname> view. </para> *** a/doc/src/sgml/monitoring.sgml --- b/doc/src/sgml/monitoring.sgml *************** *** 1412,1418 **** SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i <entry><structfield>sync_priority</></entry> <entry><type>integer</></entry> <entry>Priority of this standby server for being chosen as the ! synchronous standby</entry> </row> <row> <entry><structfield>sync_state</></entry> --- 1412,1419 ---- <entry><structfield>sync_priority</></entry> <entry><type>integer</></entry> <entry>Priority of this standby server for being chosen as the ! synchronous standby in a priority-based synchronous replication. ! This has no effect in a quorum-based synchronous replication.</entry> </row> <row> <entry><structfield>sync_state</></entry> *************** *** 1437,1442 **** SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i --- 1438,1449 ---- <literal>sync</>: This standby server is synchronous. </para> </listitem> + <listitem> + <para> + <literal>quorum</>: This standby server is considered as a candidate + for quorum standbys. + </para> + </listitem> </itemizedlist> </entry> </row> *** a/src/backend/replication/Makefile --- b/src/backend/replication/Makefile *************** *** 26,32 **** repl_gram.o: repl_scanner.c # syncrep_scanner is complied as part of syncrep_gram syncrep_gram.o: syncrep_scanner.c ! syncrep_scanner.c: FLEXFLAGS = -CF -p syncrep_scanner.c: FLEX_NO_BACKUP=yes # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c --- 26,32 ---- # syncrep_scanner is complied as part of syncrep_gram syncrep_gram.o: syncrep_scanner.c ! syncrep_scanner.c: FLEXFLAGS = -CF -p -i syncrep_scanner.c: FLEX_NO_BACKUP=yes # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c *** a/src/backend/replication/syncrep.c --- b/src/backend/replication/syncrep.c *************** *** 30,52 **** * searching the through all waiters each time we receive a reply. * * In 9.5 or before only a single standby could be considered as ! * synchronous. In 9.6 we support multiple synchronous standbys. ! * The number of synchronous standbys that transactions must wait for ! * replies from is specified in synchronous_standby_names. ! * This parameter also specifies a list of standby names, ! * which determines the priority of each standby for being chosen as ! * a synchronous standby. The standbys whose names appear earlier ! * in the list are given higher priority and will be considered as ! * synchronous. Other standby servers appearing later in this list ! * represent potential synchronous standbys. If any of the current ! * synchronous standbys disconnects for whatever reason, it will be ! * replaced immediately with the next-highest-priority standby. * * Before the standbys chosen from synchronous_standby_names can * become the synchronous standbys they must have caught up with * the primary; that may take some time. Once caught up, ! * the current higher priority standbys which are considered as ! * synchronous at that moment will release waiters from the queue. * * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group * --- 30,63 ---- * searching the through all waiters each time we receive a reply. * * In 9.5 or before only a single standby could be considered as ! * synchronous. In 9.6 we support a priority-based multiple synchronous ! * standbys. In 10.0 a quorum-based multiple synchronous standbys is also ! * supported. The number of synchronous standbys that transactions ! * must wait for replies from is specified in synchronous_standby_names. ! * This parameter also specifies a list of standby names and the method ! * (FIRST and ANY) to choose synchronous standbys from the listed ones. ! * ! * The method FIRST specifies a priority-based synchronous replication ! * and makes transaction commits wait until their WAL records are ! * replicated to the requested number of synchronous standbys chosen based ! * on their priorities. The standbys whose names appear earlier in the list ! * are given higher priority and will be considered as synchronous. ! * Other standby servers appearing later in this list represent potential ! * synchronous standbys. If any of the current synchronous standbys ! * disconnects for whatever reason, it will be replaced immediately with ! * the next-highest-priority standby. ! * ! * The method ANY specifies a quorum-based synchronous replication ! * and makes transaction commits wait until their WAL records are ! * replicated to at least the requested number of synchronous standbys ! * in the list. All the standbys appearing in the list are considered as ! * candidates for quorum synchronous standbys. * * Before the standbys chosen from synchronous_standby_names can * become the synchronous standbys they must have caught up with * the primary; that may take some time. Once caught up, ! * the standbys which are considered as synchronous at that moment ! * will release waiters from the queue. * * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group * *************** *** 79,96 **** char *SyncRepStandbyNames; static bool announce_next_takeover = true; ! static SyncRepConfigData *SyncRepConfig = NULL; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepWakeQueue(bool all, int mode); ! static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, ! XLogRecPtr *flushPtr, ! XLogRecPtr *applyPtr, ! bool *am_sync); static int SyncRepGetStandbyPriority(void); #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode); --- 90,118 ---- static bool announce_next_takeover = true; ! SyncRepConfigData *SyncRepConfig = NULL; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); static int SyncRepWakeQueue(bool all, int mode); ! static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, ! XLogRecPtr *flushPtr, ! XLogRecPtr *applyPtr, ! bool *am_sync); ! static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, ! XLogRecPtr *flushPtr, ! XLogRecPtr *applyPtr, ! List *sync_standbys); ! static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, ! XLogRecPtr *flushPtr, ! XLogRecPtr *applyPtr, ! List *sync_standbys, uint8 nth); static int SyncRepGetStandbyPriority(void); + static List *SyncRepGetSyncStandbysPriority(bool *am_sync); + static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); + static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode); *************** *** 386,392 **** SyncRepReleaseWaiters(void) XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; ! bool got_oldest; bool am_sync; int numwrite = 0; int numflush = 0; --- 408,414 ---- XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; ! bool got_recptr; bool am_sync; int numwrite = 0; int numflush = 0; *************** *** 413,423 **** SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* ! * Check whether we are a sync standby or not, and calculate the oldest * positions among all sync standbys. */ ! got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr, ! &applyPtr, &am_sync); /* * If we are managing a sync standby, though we weren't prior to this, --- 435,444 ---- LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* ! * Check whether we are a sync standby or not, and calculate the synced * positions among all sync standbys. */ ! got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); /* * If we are managing a sync standby, though we weren't prior to this, *************** *** 426,441 **** SyncRepReleaseWaiters(void) if (announce_next_takeover && am_sync) { announce_next_takeover = false; ! ereport(LOG, ! (errmsg("standby \"%s\" is now a synchronous standby with priority %u", ! application_name, MyWalSnd->sync_standby_priority))); } /* * If the number of sync standbys is less than requested or we aren't * managing a sync standby then just leave. */ ! if (!got_oldest || !am_sync) { LWLockRelease(SyncRepLock); announce_next_takeover = !am_sync; --- 447,468 ---- if (announce_next_takeover && am_sync) { announce_next_takeover = false; ! ! if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ! ereport(LOG, ! (errmsg("standby \"%s\" is now a synchronous standby with priority %u", ! application_name, MyWalSnd->sync_standby_priority))); ! else ! ereport(LOG, ! (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby", ! application_name))); } /* * If the number of sync standbys is less than requested or we aren't * managing a sync standby then just leave. */ ! if (!got_recptr || !am_sync) { LWLockRelease(SyncRepLock); announce_next_takeover = !am_sync; *************** *** 471,491 **** SyncRepReleaseWaiters(void) } /* ! * Calculate the oldest Write, Flush and Apply positions among sync standbys. * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and ! * store the oldest positions into *writePtr, *flushPtr and *applyPtr. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ static bool ! SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { List *sync_standbys; - ListCell *cell; *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; --- 498,517 ---- } /* ! * Calculate the synced Write, Flush and Apply positions among sync standbys. * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and ! * store the positions into *writePtr, *flushPtr and *applyPtr. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ static bool ! SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { List *sync_standbys; *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *************** *** 508,519 **** SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, } /* ! * Scan through all sync standbys and calculate the oldest Write, Flush ! * and Apply positions. */ ! foreach(cell, sync_standbys) { ! WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; --- 534,582 ---- } /* ! * In a priority-based sync replication, the synced positions are the ! * oldest ones among sync standbys. In a quorum-based, they are the Nth ! * latest ones. ! * ! * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest positions. ! * But we use SyncRepGetOldestSyncRecPtr() for that calculation because ! * it's a bit more efficient. ! * ! * XXX If the numbers of current and requested sync standbys are the same, ! * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced ! * positions even in a quorum-based sync replication. ! */ ! if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ! { ! SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, ! sync_standbys); ! } ! else ! { ! SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, ! sync_standbys, SyncRepConfig->num_sync); ! } ! ! list_free(sync_standbys); ! return true; ! } ! ! /* ! * Calculate the oldest Write, Flush and Apply positions among sync standbys. ! */ ! static void ! SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, ! XLogRecPtr *applyPtr, List *sync_standbys) ! { ! ListCell *cell; ! ! /* ! * Scan through all sync standbys and calculate the oldest ! * Write, Flush and Apply positions. */ ! foreach (cell, sync_standbys) { ! WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; *************** *** 531,553 **** SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply) *applyPtr = apply; } ! list_free(sync_standbys); ! return true; } /* * Return the list of sync standbys, or NIL if no sync standby is connected. * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. * The caller must hold SyncRepLock. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ List * ! SyncRepGetSyncStandbys(bool *am_sync) { List *result = NIL; List *pending = NIL; --- 594,756 ---- if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply) *applyPtr = apply; } + } ! /* ! * Calculate the Nth latest Write, Flush and Apply positions among sync ! * standbys. ! */ ! static void ! SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, ! XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) ! { ! ListCell *cell; ! XLogRecPtr *write_array; ! XLogRecPtr *flush_array; ! XLogRecPtr *apply_array; ! int len; ! int i = 0; ! ! len = list_length(sync_standbys); ! write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); ! flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); ! apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); ! ! foreach (cell, sync_standbys) ! { ! WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; ! ! SpinLockAcquire(&walsnd->mutex); ! write_array[i] = walsnd->write; ! flush_array[i] = walsnd->flush; ! apply_array[i] = walsnd->apply; ! SpinLockRelease(&walsnd->mutex); ! ! i++; ! } ! ! qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); ! qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); ! qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); ! ! /* Get Nth latest Write, Flush, Apply positions */ ! *writePtr = write_array[nth - 1]; ! *flushPtr = flush_array[nth - 1]; ! *applyPtr = apply_array[nth - 1]; ! ! pfree(write_array); ! pfree(flush_array); ! pfree(apply_array); ! } ! ! /* ! * Compare lsn in order to sort array in descending order. ! */ ! static int ! cmp_lsn(const void *a, const void *b) ! { ! XLogRecPtr lsn1 = *((const XLogRecPtr *) a); ! XLogRecPtr lsn2 = *((const XLogRecPtr *) b); ! ! if (lsn1 > lsn2) ! return -1; ! else if (lsn1 == lsn2) ! return 0; ! else ! return 1; } /* * Return the list of sync standbys, or NIL if no sync standby is connected. * * The caller must hold SyncRepLock. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ List * ! SyncRepGetSyncStandbys(bool *am_sync) ! { ! /* Set default result */ ! if (am_sync != NULL) ! *am_sync = false; ! ! /* Quick exit if sync replication is not requested */ ! if (SyncRepConfig == NULL) ! return NIL; ! ! return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? ! SyncRepGetSyncStandbysPriority(am_sync) : ! SyncRepGetSyncStandbysQuorum(am_sync); ! } ! ! /* ! * Return the list of all the candidates for quorum sync standbys, ! * or NIL if no such standby is connected. ! * ! * The caller must hold SyncRepLock. This function must be called only in ! * a quorum-based sync replication. ! * ! * On return, *am_sync is set to true if this walsender is connecting to ! * sync standby. Otherwise it's set to false. ! */ ! static List * ! SyncRepGetSyncStandbysQuorum(bool *am_sync) ! { ! List *result = NIL; ! int i; ! volatile WalSnd *walsnd; /* Use volatile pointer to prevent code ! * rearrangement */ ! ! Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); ! ! for (i = 0; i < max_wal_senders; i++) ! { ! walsnd = &WalSndCtl->walsnds[i]; ! ! /* Must be active */ ! if (walsnd->pid == 0) ! continue; ! ! /* Must be streaming */ ! if (walsnd->state != WALSNDSTATE_STREAMING) ! continue; ! ! /* Must be synchronous */ ! if (walsnd->sync_standby_priority == 0) ! continue; ! ! /* Must have a valid flush position */ ! if (XLogRecPtrIsInvalid(walsnd->flush)) ! continue; ! ! /* ! * Consider this standby as a candidate for quorum sync standbys ! * and append it to the result. ! */ ! result = lappend_int(result, i); ! if (am_sync != NULL && walsnd == MyWalSnd) ! *am_sync = true; ! } ! ! return result; ! } ! ! /* ! * Return the list of sync standbys chosen based on their priorities, ! * or NIL if no sync standby is connected. ! * ! * If there are multiple standbys with the same priority, ! * the first one found is selected preferentially. ! * ! * The caller must hold SyncRepLock. This function must be called only in ! * a priority-based sync replication. ! * ! * On return, *am_sync is set to true if this walsender is connecting to ! * sync standby. Otherwise it's set to false. ! */ ! static List * ! SyncRepGetSyncStandbysPriority(bool *am_sync) { List *result = NIL; List *pending = NIL; *************** *** 560,572 **** SyncRepGetSyncStandbys(bool *am_sync) volatile WalSnd *walsnd; /* Use volatile pointer to prevent code * rearrangement */ ! /* Set default result */ ! if (am_sync != NULL) ! *am_sync = false; ! ! /* Quick exit if sync replication is not requested */ ! if (SyncRepConfig == NULL) ! return NIL; lowest_priority = SyncRepConfig->nmembers; next_highest_priority = lowest_priority + 1; --- 763,769 ---- volatile WalSnd *walsnd; /* Use volatile pointer to prevent code * rearrangement */ ! Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); lowest_priority = SyncRepConfig->nmembers; next_highest_priority = lowest_priority + 1; *** a/src/backend/replication/syncrep_gram.y --- b/src/backend/replication/syncrep_gram.y *************** *** 21,27 **** SyncRepConfigData *syncrep_parse_result; char *syncrep_parse_error_msg; static SyncRepConfigData *create_syncrep_config(const char *num_sync, ! List *members); /* * Bison doesn't allocate anything that needs to live across parser calls, --- 21,27 ---- char *syncrep_parse_error_msg; static SyncRepConfigData *create_syncrep_config(const char *num_sync, ! List *members, uint8 syncrep_method); /* * Bison doesn't allocate anything that needs to live across parser calls, *************** *** 46,52 **** static SyncRepConfigData *create_syncrep_config(const char *num_sync, SyncRepConfigData *config; } ! %token <str> NAME NUM JUNK %type <config> result standby_config %type <list> standby_list --- 46,52 ---- SyncRepConfigData *config; } ! %token <str> NAME NUM JUNK ANY FIRST %type <config> result standby_config %type <list> standby_list *************** *** 60,67 **** result: ; standby_config: ! standby_list { $$ = create_syncrep_config("1", $1); } ! | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3); } ; standby_list: --- 60,69 ---- ; standby_config: ! standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); } ! | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); } ! | ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); } ! | FIRST NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); } ; standby_list: *************** *** 75,83 **** standby_name: ; %% - static SyncRepConfigData * ! create_syncrep_config(const char *num_sync, List *members) { SyncRepConfigData *config; int size; --- 77,84 ---- ; %% static SyncRepConfigData * ! create_syncrep_config(const char *num_sync, List *members, uint8 syncrep_method) { SyncRepConfigData *config; int size; *************** *** 98,103 **** create_syncrep_config(const char *num_sync, List *members) --- 99,105 ---- config->config_size = size; config->num_sync = atoi(num_sync); + config->syncrep_method = syncrep_method; config->nmembers = list_length(members); ptr = config->member_names; foreach(lc, members) *** a/src/backend/replication/syncrep_scanner.l --- b/src/backend/replication/syncrep_scanner.l *************** *** 64,69 **** xdinside [^"]+ --- 64,72 ---- %% {space}+ { /* ignore */ } + ANY { return ANY; } + FIRST { return FIRST; } + {xdstart} { initStringInfo(&xdbuf); BEGIN(xd); *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 2868,2879 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS) /* * More easily understood version of standby state. This is purely ! * informational, not different from priority. */ if (priority == 0) values[7] = CStringGetTextDatum("async"); else if (list_member_int(sync_standbys, i)) ! values[7] = CStringGetTextDatum("sync"); else values[7] = CStringGetTextDatum("potential"); } --- 2868,2887 ---- /* * More easily understood version of standby state. This is purely ! * informational. ! * ! * In quorum-based sync replication, the role of each standby ! * listed in synchronous_standby_names can be changing very ! * frequently. Any standbys considered as "sync" at one moment can ! * be switched to "potential" ones at the next moment. So, it's ! * basically useless to report "sync" or "potential" as their sync ! * states. We report just "quorum" for them. */ if (priority == 0) values[7] = CStringGetTextDatum("async"); else if (list_member_int(sync_standbys, i)) ! values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? ! CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[7] = CStringGetTextDatum("potential"); } *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 245,251 **** # These settings are ignored on a standby server. #synchronous_standby_names = '' # standby servers that provide sync rep ! # number of sync standbys and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed --- 245,252 ---- # These settings are ignored on a standby server. #synchronous_standby_names = '' # standby servers that provide sync rep ! # method to choose sync standbys, number of sync standbys ! # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed *** a/src/include/replication/syncrep.h --- b/src/include/replication/syncrep.h *************** *** 32,37 **** --- 32,41 ---- #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 + /* syncrep_method of SyncRepConfigData */ + #define SYNC_REP_PRIORITY 0 + #define SYNC_REP_QUORUM 1 + /* * Struct for the configuration of synchronous replication. * *************** *** 44,54 **** typedef struct SyncRepConfigData --- 48,61 ---- int config_size; /* total size of this struct, in bytes */ int num_sync; /* number of sync standbys that we need to * wait for */ + uint8 syncrep_method; /* method to choose sync standbys */ int nmembers; /* number of members in the following list */ /* member_names contains nmembers consecutive nul-terminated C strings */ char member_names[FLEXIBLE_ARRAY_MEMBER]; } SyncRepConfigData; + extern SyncRepConfigData *SyncRepConfig; + /* communication variables for parsing synchronous_standby_names GUC */ extern SyncRepConfigData *syncrep_parse_result; extern char *syncrep_parse_error_msg; *** a/src/test/recovery/t/007_sync_rep.pl --- b/src/test/recovery/t/007_sync_rep.pl *************** *** 3,9 **** use strict; use warnings; use PostgresNode; use TestLib; ! use Test::More tests => 8; # Query checking sync_priority and sync_state of each standby my $check_sql = --- 3,9 ---- use warnings; use PostgresNode; use TestLib; ! use Test::More tests => 11; # Query checking sync_priority and sync_state of each standby my $check_sql = *************** *** 172,174 **** test_sync_state( --- 172,205 ---- standby2|1|sync standby4|1|potential), 'potential standby found earlier in array is promoted to sync'); + + # Check that standby1 and standby2 are chosen as sync standbys + # based on their priorities. + test_sync_state( + $node_master, qq(standby1|1|sync + standby2|2|sync + standby4|0|async), + 'priority-based sync replication specified by FIRST keyword', + 'FIRST 2(standby1, standby2)'); + + # Check that all the listed standbys are considered as candidates + # for sync standbys in a quorum-based sync replication. + test_sync_state( + $node_master, qq(standby1|1|quorum + standby2|2|quorum + standby4|0|async), + '2 quorum and 1 async', + 'ANY 2(standby1, standby2)'); + + # Start Standby3 which will be considered in 'quorum' state. + $node_standby_3->start; + + # Check that the setting of 'ANY 2(*)' chooses all standbys as + # candidates for quorum sync standbys. + test_sync_state( + $node_master, qq(standby1|1|quorum + standby2|1|quorum + standby3|1|quorum + standby4|1|quorum), + 'all standbys are considered as candidates for quorum sync standbys', + 'ANY 2(*)');
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers