On Fri, Jan 13, 2012 at 9:27 PM, Fujii Masao <[email protected]> wrote:
> On Fri, Jan 13, 2012 at 7:30 PM, Simon Riggs <[email protected]> wrote:
>> On Fri, Jan 13, 2012 at 9:15 AM, Simon Riggs <[email protected]> wrote:
>>> On Fri, Jan 13, 2012 at 7:41 AM, Fujii Masao <[email protected]> wrote:
>>>
>>>> Thought? Comments?
>>>
>>> This is almost exactly the same as my patch series
>>> "syncrep_queues.v[1,2].patch" earlier this year. Which I know because
>>> I was updating that patch myself last night for 9.2. I'm about half
>>> way through doing that, since you and I agreed in Ottawa I would do
>>> this. Perhaps it is better if we work together?
>>
>> I think this comment is mostly pointless. We don't have time to work
>> together and there's no real reason to. You know what you're doing, so
>> I'll leave you to do it.
>>
>> Please add the Apply mode.
>
> OK, will do.
Done. Attached is the updated version of the patch.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1559,1567 **** SET ENABLE_SEQSCAN TO OFF;
<para>
Specifies whether transaction commit will wait for WAL records
to be written to disk before the command returns a <quote>success</>
! indication to the client. Valid values are <literal>on</>,
! <literal>local</>, and <literal>off</>. The default, and safe, value
! is <literal>on</>. When <literal>off</>, there can be a delay between
when success is reported to the client and when the transaction is
really guaranteed to be safe against a server crash. (The maximum
delay is three times <xref linkend="guc-wal-writer-delay">.) Unlike
--- 1559,1567 ----
<para>
Specifies whether transaction commit will wait for WAL records
to be written to disk before the command returns a <quote>success</>
! indication to the client. Valid values are <literal>on</>, <literal>write</>,
! <literal>apply</>, <literal>local</>, and <literal>off</>. The default, and safe,
! value is <literal>on</>. When <literal>off</>, there can be a delay between
when success is reported to the client and when the transaction is
really guaranteed to be safe against a server crash. (The maximum
delay is three times <xref linkend="guc-wal-writer-delay">.) Unlike
***************
*** 1579,1589 **** SET ENABLE_SEQSCAN TO OFF;
If <xref linkend="guc-synchronous-standby-names"> is set, this
parameter also controls whether or not transaction commit will wait
for the transaction's WAL records to be flushed to disk and replicated
! to the standby server. The commit wait will last until a reply from
! the current synchronous standby indicates it has written the commit
! record of the transaction to durable storage. If synchronous
replication is in use, it will normally be sensible either to wait
! both for WAL records to reach both the local and remote disks, or
to allow the transaction to commit asynchronously. However, the
special value <literal>local</> is available for transactions that
wish to wait for local flush to disk, but not synchronous replication.
--- 1579,1600 ----
If <xref linkend="guc-synchronous-standby-names"> is set, this
parameter also controls whether or not transaction commit will wait
for the transaction's WAL records to be flushed to disk and replicated
! to the standby server. When <literal>on</>, the commit wait will last
! until a reply from the current synchronous standby indicates it has flushed
! the commit record of the transaction to durable storage. This will
! avoids any data loss unless the database cluster of both primary and
! standby gets corrupted simultaneously. When <literal>write</>,
! the commit wait will last until a reply from the current synchronous
! standby indicates it has received the commit record of the transaction
! to memory. Normally this causes no data loss at the time of failover.
! However, if both primary and standby crash, and the database cluster of
! the primary gets corrupted, recent committed transactions might
! be lost. When <literal>apply</>, the commit will wait until the current
! synchronous standby has replayed the committed changes successfully.
! This guarantees that any transactions are visible on the synchronous
! standby when they are committed. If synchronous
replication is in use, it will normally be sensible either to wait
! for both local flush and replication of WAL records, or
to allow the transaction to commit asynchronously. However, the
special value <literal>local</> is available for transactions that
wish to wait for local flush to disk, but not synchronous replication.
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1011,1016 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
--- 1011,1039 ----
</para>
<para>
+ Setting <varname>synchronous_commit</> to <literal>write</> will
+ cause each commit to wait for confirmation that the standby has received
+ the commit record to memory. This provides lower level of durability than
+ that <literal>on</> does. However, it's practically useful setting because
+ it can decrease the response time for the transaction, and causes
+ no data loss unless both the primary and the standby crashes and
+ the database of the primary gets corrupted at the same time.
+ </para>
+
+ <para>
+ Setting <varname>synchronous_commit</> to <literal>apply</> will
+ cause each commit to wait for confirmation that the standby has flushed
+ the commit record to durable storage and replayed the committed changes
+ successfully. This provides the same level of durability as <literal>on</>
+ does. This guarantees that any transactions are visible on the standby
+ when they are committed. Note that this makes the transaction commit
+ wait longer time for replication than <literal>on</> or <literal>write</>
+ does because the confirmation about the apply position from the standby
+ is sent less frequently. To decrease the wait time, set
+ <varname>max_standby_streaming_delay</> to a low value.
+ </para>
+
+ <para>
Users will stop waiting if a fast shutdown is requested. However, as
when using asynchronous replication, the server will does not fully
shutdown until all outstanding WAL records are transferred to the currently
***************
*** 1064,1077 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
<title>Planning for High Availability</title>
<para>
! Commits made when <varname>synchronous_commit</> is set to <literal>on</>
! will wait until the sync standby responds. The response may never occur
! if the last, or only, standby should crash.
</para>
<para>
The best solution for avoiding data loss is to ensure you don't lose
! your last remaining sync standby. This can be achieved by naming multiple
potential synchronous standbys using <varname>synchronous_standby_names</>.
The first named standby will be used as the synchronous standby. Standbys
listed after this will take over the role of synchronous standby if the
--- 1087,1100 ----
<title>Planning for High Availability</title>
<para>
! Commits made when <varname>synchronous_commit</> is set to <literal>on</>,
! <literal>write</> or <literal>apply</> will wait until the synchronous standby responds.
! The response may never occur if the last, or only, standby should crash.
</para>
<para>
The best solution for avoiding data loss is to ensure you don't lose
! your last remaining synchronous standby. This can be achieved by naming multiple
potential synchronous standbys using <varname>synchronous_standby_names</>.
The first named standby will be used as the synchronous standby. Standbys
listed after this will take over the role of synchronous standby if the
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 20,28 ****
* per-transaction state information.
*
* Replication is either synchronous or not synchronous (async). If it is
! * async, we just fastpath out of here. If it is sync, then in 9.1 we wait
! * for the flush location on the standby before releasing the waiting backend.
! * Further complexity in that interaction is expected in later releases.
*
* The best performing way to manage the waiting backends is to have a
* single ordered queue of waiting backends, so that we can avoid
--- 20,29 ----
* per-transaction state information.
*
* Replication is either synchronous or not synchronous (async). If it is
! * async, we just fastpath out of here. If it is sync, then we wait for
! * the write, flush or apply location on the standby before releasing
! * the waiting backend. Further complexity in that interaction is expected
! * in later releases.
*
* The best performing way to manage the waiting backends is to have a
* single ordered queue of waiting backends, so that we can avoid
***************
*** 67,79 **** char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
! static void SyncRepQueueInsert(void);
static void SyncRepCancelWait(void);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
! static bool SyncRepQueueIsOrderedByLSN(void);
#endif
/*
--- 68,82 ----
static bool announce_next_takeover = true;
! static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
!
! static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int SyncRepGetStandbyPriority(void);
#ifdef USE_ASSERT_CHECKING
! static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
/*
***************
*** 120,126 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
! XLByteLE(XactCommitLSN, WalSndCtl->lsn))
{
LWLockRelease(SyncRepLock);
return;
--- 123,129 ----
* be a low cost check.
*/
if (!WalSndCtl->sync_standbys_defined ||
! XLByteLE(XactCommitLSN, WalSndCtl->lsn[SyncRepWaitMode]))
{
LWLockRelease(SyncRepLock);
return;
***************
*** 132,139 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
*/
MyProc->waitLSN = XactCommitLSN;
MyProc->syncRepState = SYNC_REP_WAITING;
! SyncRepQueueInsert();
! Assert(SyncRepQueueIsOrderedByLSN());
LWLockRelease(SyncRepLock);
/* Alter ps display to show waiting for sync rep. */
--- 135,142 ----
*/
MyProc->waitLSN = XactCommitLSN;
MyProc->syncRepState = SYNC_REP_WAITING;
! SyncRepQueueInsert(SyncRepWaitMode);
! Assert(SyncRepQueueIsOrderedByLSN(SyncRepWaitMode));
LWLockRelease(SyncRepLock);
/* Alter ps display to show waiting for sync rep. */
***************
*** 267,284 **** SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
}
/*
! * Insert MyProc into SyncRepQueue, maintaining sorted invariant.
*
* Usually we will go at tail of queue, though it's possible that we arrive
* here out of order, so start at tail and work back to insertion point.
*/
static void
! SyncRepQueueInsert(void)
{
PGPROC *proc;
! proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
! &(WalSndCtl->SyncRepQueue),
offsetof(PGPROC, syncRepLinks));
while (proc)
--- 270,288 ----
}
/*
! * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
*
* Usually we will go at tail of queue, though it's possible that we arrive
* here out of order, so start at tail and work back to insertion point.
*/
static void
! SyncRepQueueInsert(int mode)
{
PGPROC *proc;
! Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
! proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
! &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
***************
*** 290,296 **** SyncRepQueueInsert(void)
if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
break;
! proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
--- 294,300 ----
if (XLByteLT(proc->waitLSN, MyProc->waitLSN))
break;
! proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
***************
*** 298,304 **** SyncRepQueueInsert(void)
if (proc)
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
else
! SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue), &(MyProc->syncRepLinks));
}
/*
--- 302,308 ----
if (proc)
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
else
! SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
}
/*
***************
*** 368,374 **** SyncRepReleaseWaiters(void)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
volatile WalSnd *syncWalSnd = NULL;
! int numprocs = 0;
int priority = 0;
int i;
--- 372,380 ----
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
volatile WalSnd *syncWalSnd = NULL;
! int numwrite = 0;
! int numflush = 0;
! int numapply = 0;
int priority = 0;
int i;
***************
*** 419,440 **** SyncRepReleaseWaiters(void)
return;
}
! if (XLByteLT(walsndctl->lsn, MyWalSnd->flush))
{
! /*
! * Set the lsn first so that when we wake backends they will release
! * up to this location.
! */
! walsndctl->lsn = MyWalSnd->flush;
! numprocs = SyncRepWakeQueue(false);
}
LWLockRelease(SyncRepLock);
! elog(DEBUG3, "released %d procs up to %X/%X",
! numprocs,
MyWalSnd->flush.xlogid,
! MyWalSnd->flush.xrecoff);
/*
* If we are managing the highest priority standby, though we weren't
--- 425,463 ----
return;
}
! /*
! * Set the lsn first so that when we wake backends they will release
! * up to this location.
! */
! if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write))
{
! walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
! numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
! }
! if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush))
! {
! walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
! numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
! }
! if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_APPLY], MyWalSnd->apply))
! {
! walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
! numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
}
LWLockRelease(SyncRepLock);
! elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, "
! "%d procs up to apply %X/%X",
! numwrite,
! MyWalSnd->write.xlogid,
! MyWalSnd->write.xrecoff,
! numflush,
MyWalSnd->flush.xlogid,
! MyWalSnd->flush.xrecoff,
! numapply,
! MyWalSnd->apply.xlogid,
! MyWalSnd->apply.xrecoff);
/*
* If we are managing the highest priority standby, though we weren't
***************
*** 507,530 **** SyncRepGetStandbyPriority(void)
}
/*
! * Walk queue from head. Set the state of any backends that need to be woken,
! * remove them from the queue, and then wake them. Pass all = true to wake
! * whole queue; otherwise, just wake up to the walsender's LSN.
*
* Must hold SyncRepLock.
*/
int
! SyncRepWakeQueue(bool all)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
PGPROC *proc = NULL;
PGPROC *thisproc = NULL;
int numprocs = 0;
! Assert(SyncRepQueueIsOrderedByLSN());
! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
! &(WalSndCtl->SyncRepQueue),
offsetof(PGPROC, syncRepLinks));
while (proc)
--- 530,555 ----
}
/*
! * Walk the specified queue from head. Set the state of any backends that
! * need to be woken, remove them from the queue, and then wake them.
! * Pass all = true to wake whole queue; otherwise, just wake up to
! * the walsender's LSN.
*
* Must hold SyncRepLock.
*/
int
! SyncRepWakeQueue(bool all, int mode)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
PGPROC *proc = NULL;
PGPROC *thisproc = NULL;
int numprocs = 0;
! Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
! Assert(SyncRepQueueIsOrderedByLSN(mode));
! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
! &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
***************
*** 532,538 **** SyncRepWakeQueue(bool all)
/*
* Assume the queue is ordered by LSN
*/
! if (!all && XLByteLT(walsndctl->lsn, proc->waitLSN))
return numprocs;
/*
--- 557,563 ----
/*
* Assume the queue is ordered by LSN
*/
! if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN))
return numprocs;
/*
***************
*** 540,546 **** SyncRepWakeQueue(bool all)
* thisproc is valid, proc may be NULL after this.
*/
thisproc = proc;
! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
--- 565,571 ----
* thisproc is valid, proc may be NULL after this.
*/
thisproc = proc;
! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
***************
*** 588,594 **** SyncRepUpdateSyncStandbysDefined(void)
* wants synchronous replication, we'd better wake them up.
*/
if (!sync_standbys_defined)
! SyncRepWakeQueue(true);
/*
* Only allow people to join the queue when there are synchronous
--- 613,624 ----
* wants synchronous replication, we'd better wake them up.
*/
if (!sync_standbys_defined)
! {
! int i;
!
! for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
! SyncRepWakeQueue(true, i);
! }
/*
* Only allow people to join the queue when there are synchronous
***************
*** 605,620 **** SyncRepUpdateSyncStandbysDefined(void)
#ifdef USE_ASSERT_CHECKING
static bool
! SyncRepQueueIsOrderedByLSN(void)
{
PGPROC *proc = NULL;
XLogRecPtr lastLSN;
lastLSN.xlogid = 0;
lastLSN.xrecoff = 0;
! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
! &(WalSndCtl->SyncRepQueue),
offsetof(PGPROC, syncRepLinks));
while (proc)
--- 635,652 ----
#ifdef USE_ASSERT_CHECKING
static bool
! SyncRepQueueIsOrderedByLSN(int mode)
{
PGPROC *proc = NULL;
XLogRecPtr lastLSN;
+ Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
+
lastLSN.xlogid = 0;
lastLSN.xrecoff = 0;
! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
! &(WalSndCtl->SyncRepQueue[mode]),
offsetof(PGPROC, syncRepLinks));
while (proc)
***************
*** 628,634 **** SyncRepQueueIsOrderedByLSN(void)
lastLSN = proc->waitLSN;
! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
--- 660,666 ----
lastLSN = proc->waitLSN;
! proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(proc->syncRepLinks),
offsetof(PGPROC, syncRepLinks));
}
***************
*** 675,677 **** check_synchronous_standby_names(char **newval, void **extra, GucSource source)
--- 707,729 ----
return true;
}
+
+ void
+ assign_synchronous_commit(int newval, void *extra)
+ {
+ switch (newval)
+ {
+ case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
+ SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
+ break;
+ case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
+ SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
+ break;
+ case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+ SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+ break;
+ default:
+ SyncRepWaitMode = SYNC_REP_NO_WAIT;
+ break;
+ }
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 1410,1416 **** WalSndShmemInit(void)
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
! SHMQueueInit(&(WalSndCtl->SyncRepQueue));
for (i = 0; i < max_wal_senders; i++)
{
--- 1410,1417 ----
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
! for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
! SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
for (i = 0; i < max_wal_senders; i++)
{
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 370,380 **** static const struct config_enum_entry constraint_exclusion_options[] = {
};
/*
! * Although only "on", "off", and "local" are documented, we
* accept all the likely variants of "on" and "off".
*/
static const struct config_enum_entry synchronous_commit_options[] = {
{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
{"on", SYNCHRONOUS_COMMIT_ON, false},
{"off", SYNCHRONOUS_COMMIT_OFF, false},
{"true", SYNCHRONOUS_COMMIT_ON, true},
--- 370,382 ----
};
/*
! * Although only "on", "off", "write", "apply" and "local" are documented, we
* accept all the likely variants of "on" and "off".
*/
static const struct config_enum_entry synchronous_commit_options[] = {
{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
+ {"write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
+ {"apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
{"on", SYNCHRONOUS_COMMIT_ON, false},
{"off", SYNCHRONOUS_COMMIT_OFF, false},
{"true", SYNCHRONOUS_COMMIT_ON, true},
***************
*** 3164,3170 **** static struct config_enum ConfigureNamesEnum[] =
},
&synchronous_commit,
SYNCHRONOUS_COMMIT_ON, synchronous_commit_options,
! NULL, NULL, NULL
},
{
--- 3166,3172 ----
},
&synchronous_commit,
SYNCHRONOUS_COMMIT_ON, synchronous_commit_options,
! NULL, assign_synchronous_commit, NULL
},
{
*** a/src/include/access/xact.h
--- b/src/include/access/xact.h
***************
*** 55,61 **** typedef enum
{
SYNCHRONOUS_COMMIT_OFF, /* asynchronous commit */
SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */
! SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */
} SyncCommitLevel;
/* Define the default setting for synchonous_commit */
--- 55,63 ----
{
SYNCHRONOUS_COMMIT_OFF, /* asynchronous commit */
SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */
! SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote write */
! SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */
! SYNCHRONOUS_COMMIT_REMOTE_APPLY /* wait for local flush and remote apply */
} SyncCommitLevel;
/* Define the default setting for synchonous_commit */
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 15,20 ****
--- 15,31 ----
#include "utils/guc.h"
+ #define SyncRepRequested() \
+ (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
+
+ /* SyncRepWaitMode */
+ #define SYNC_REP_NO_WAIT -1
+ #define SYNC_REP_WAIT_WRITE 0
+ #define SYNC_REP_WAIT_FLUSH 1
+ #define SYNC_REP_WAIT_APPLY 2
+
+ #define NUM_SYNC_REP_WAIT_MODE 3
+
/* syncRepState */
#define SYNC_REP_NOT_WAITING 0
#define SYNC_REP_WAITING 1
***************
*** 37,44 **** extern void SyncRepReleaseWaiters(void);
extern void SyncRepUpdateSyncStandbysDefined(void);
/* called by various procs */
! extern int SyncRepWakeQueue(bool all);
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
#endif /* _SYNCREP_H */
--- 48,56 ----
extern void SyncRepUpdateSyncStandbysDefined(void);
/* called by various procs */
! extern int SyncRepWakeQueue(bool all, int mode);
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+ extern void assign_synchronous_commit(int newval, void *extra);
#endif /* _SYNCREP_H */
*** a/src/include/replication/walsender_private.h
--- b/src/include/replication/walsender_private.h
***************
*** 14,19 ****
--- 14,20 ----
#include "access/xlog.h"
#include "nodes/nodes.h"
+ #include "replication/syncrep.h"
#include "storage/latch.h"
#include "storage/shmem.h"
#include "storage/spin.h"
***************
*** 68,82 **** extern WalSnd *MyWalSnd;
typedef struct
{
/*
! * Synchronous replication queue. Protected by SyncRepLock.
*/
! SHM_QUEUE SyncRepQueue;
/*
* Current location of the head of the queue. All waiters should have a
* waitLSN that follows this value. Protected by SyncRepLock.
*/
! XLogRecPtr lsn;
/*
* Are any sync standbys defined? Waiting backends can't reload the
--- 69,84 ----
typedef struct
{
/*
! * Synchronous replication queue with one queue per request type.
! * Protected by SyncRepLock.
*/
! SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
/*
* Current location of the head of the queue. All waiters should have a
* waitLSN that follows this value. Protected by SyncRepLock.
*/
! XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
/*
* Are any sync standbys defined? Waiting backends can't reload the
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers