On 13/09/10 14:54, Heikki Linnakangas wrote:
BTW, I noticed that I missed incrementing the latch count in win32_latch.c, and that owning/disowning the latch wasn't done correctly, so you get an error if you restart the master and reconnect. I'll post an updated patch shortly.
Here's an updated patch with those bugs fixed.

-- 
Heikki Linnakangas
EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ddf7d79..40e1718 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -46,6 +46,7 @@ #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" +#include "storage/latch.h" #include "storage/pmsignal.h" #include "storage/procarray.h" #include "storage/smgr.h" @@ -9139,6 +9140,13 @@ startupproc_quickdie(SIGNAL_ARGS) } +/* SIGUSR1: let latch facility handle the signal */ +static void +StartupProcSigUsr1Handler(SIGNAL_ARGS) +{ + latch_sigusr1_handler(); +} + /* SIGHUP: set flag to re-read config file at next convenient time */ static void StartupProcSigHupHandler(SIGNAL_ARGS) @@ -9213,7 +9221,7 @@ StartupProcessMain(void) else pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR1, StartupProcSigUsr1Handler); pqsignal(SIGUSR2, SIG_IGN); /* @@ -9397,16 +9405,13 @@ retry: } /* - * Data not here yet, so check for trigger then sleep. + * Data not here yet, so check for trigger then sleep for + * five seconds like in the WAL file polling case below. */ if (CheckForStandbyTrigger()) goto triggered; - /* - * When streaming is active, we want to react quickly when - * the next WAL record arrives, so sleep only a bit. 
- */ - pg_usleep(100000L); /* 100ms */ + WaitForWalArrival(5000000L); } else { diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c index da06202..e39bf1c 100644 --- a/src/backend/port/win32_latch.c +++ b/src/backend/port/win32_latch.c @@ -230,6 +230,8 @@ NumSharedLatches(void) /* Each walsender needs one latch */ numLatches += max_wal_senders; + /* One latch for startup process - walreceiver communication */ + numLatches += 1; return numLatches; } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b868707..996c54f 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -182,6 +182,12 @@ WalReceiverMain(void) Assert(walrcv->pid == 0); switch (walrcv->walRcvState) { + case WALRCV_NOT_INITIALIZED: + /* can't happen */ + SpinLockRelease(&walrcv->mutex); + elog(ERROR, "walreceiver state not initialized"); + break; + case WALRCV_STOPPING: /* If we've already been requested to stop, don't start up. 
*/ walrcv->walRcvState = WALRCV_STOPPED; @@ -529,6 +535,9 @@ XLogWalRcvFlush(void) walrcv->receivedUpto = LogstreamResult.Flush; SpinLockRelease(&walrcv->mutex); + /* Signal the startup process that new WAL has arrived */ + SetLatch(&walrcv->receivedLatch); + /* Report XLOG streaming progress in PS display */ if (update_process_title) { diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index b206885..571fc02 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -62,8 +62,9 @@ WalRcvShmemInit(void) { /* First time through, so initialize */ MemSet(WalRcv, 0, WalRcvShmemSize()); - WalRcv->walRcvState = WALRCV_STOPPED; + WalRcv->walRcvState = WALRCV_NOT_INITIALIZED; SpinLockInit(&WalRcv->mutex); + InitSharedLatch(&WalRcv->receivedLatch); } } @@ -104,7 +105,7 @@ WalRcvInProgress(void) } } - if (state != WALRCV_STOPPED) + if (state != WALRCV_STOPPED && state != WALRCV_NOT_INITIALIZED) return true; else return false; @@ -128,6 +129,7 @@ ShutdownWalRcv(void) SpinLockAcquire(&walrcv->mutex); switch (walrcv->walRcvState) { + case WALRCV_NOT_INITIALIZED: case WALRCV_STOPPED: break; case WALRCV_STARTING: @@ -177,6 +179,7 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; pg_time_t now = (pg_time_t) time(NULL); + bool firsttime; /* * We always start at the beginning of the segment. 
That prevents a broken @@ -187,8 +190,20 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) if (recptr.xrecoff % XLogSegSize != 0) recptr.xrecoff -= recptr.xrecoff % XLogSegSize; + /* + * Update shared memory status with information needed by walreceiver + */ SpinLockAcquire(&walrcv->mutex); + /* Check if this is the first time we start walreceiver */ + if (walrcv->walRcvState == WALRCV_NOT_INITIALIZED) + { + firsttime = true; + walrcv->walRcvState = WALRCV_STOPPED; + } + else + firsttime = false; + /* It better be stopped before we try to restart it */ Assert(walrcv->walRcvState == WALRCV_STOPPED); @@ -204,6 +219,14 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) SpinLockRelease(&walrcv->mutex); + /* + * Take ownership of the latch the first time we start walreceiver, so + * that we can wait for WAL arrival + */ + if (firsttime) + OwnLatch(&walrcv->receivedLatch); + + /* Request postmaster to start the walreceiver process */ SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); } @@ -229,3 +252,20 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart) return recptr; } + +/* + * Wait for more WAL to arrive, or timeout (in microseconds) to be reached + */ +void +WaitForWalArrival(int timeout) +{ + /* Wait for more WAL to arrive */ + if (WaitLatch(&WalRcv->receivedLatch, timeout)) + { + /* + * Reset the latch so that next call to WaitForWalArrival will sleep + * again. 
+ */ + ResetLatch(&WalRcv->receivedLatch); + } +} diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 2ea881e..081ea90 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -13,6 +13,7 @@ #define _WALRECEIVER_H #include "access/xlogdefs.h" +#include "storage/latch.h" #include "storage/spin.h" #include "pgtime.h" @@ -30,6 +31,7 @@ extern bool am_walreceiver; */ typedef enum { + WALRCV_NOT_INITIALIZED, /* never started */ WALRCV_STOPPED, /* stopped and mustn't start up again */ WALRCV_STARTING, /* launched, but the process hasn't * initialized yet */ @@ -72,6 +74,13 @@ typedef struct char conninfo[MAXCONNINFO]; slock_t mutex; /* locks shared variables shown above */ + + /* + * Walreceiver sets this latch every time new WAL has been received and + * fsync'd to disk, allowing startup process to wait for new WAL to + * arrive. + */ + Latch receivedLatch; } WalRcvData; extern WalRcvData *WalRcv; @@ -92,8 +101,8 @@ extern Size WalRcvShmemSize(void); extern void WalRcvShmemInit(void); extern void ShutdownWalRcv(void); extern bool WalRcvInProgress(void); -extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished); extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart); +extern void WaitForWalArrival(int timeout); #endif /* _WALRECEIVER_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers