On Tue, Jan 11, 2011 at 13:18, Simon Riggs <si...@2ndquadrant.com> wrote: > On Tue, 2011-01-11 at 13:04 +0100, Magnus Hagander wrote: >> On Tue, Jan 11, 2011 at 12:58, Simon Riggs <si...@2ndquadrant.com> wrote: >> > On Tue, 2011-01-11 at 12:41 +0100, Magnus Hagander wrote: >> > >> >> Just to be clear, you're objecting to the *name* of the state, right, >> >> not how/where it's set? >> > >> > Yes >> > >> >> In particular, how the catchup/streaming >> >> things are set? >> > >> > You've set it in the right places. >> > >> > I would personally constrain the state transitions, so that we can't >> > ever make illegal changes, such as CATCHUP -> BACKUP. >> >> Well, once we enter the walsender loop we can never get out of it, so >> it simply cannot happen... > > Accidentally it can, so I would like to protect against that. > > Putting those checks in are like Asserts(), they help document what is > and is not possible by design. > >> > I would also check the state locally before grabbing the spinlock, as is >> > typical in other xlog code. Continually resetting shared state to >> > exactly what it is already seems strange, to me. If we make the rule >> > that the state can only be changed by the WALSender itself, it won't >> > need to grab spinlock. If you don't like that, a local variable works. >> >> Hmm. I don't see why anybody other than the walsender should change >> it, so yeah. You mean I can just drop the spinlock calls completely, >> and then do an >> if (walsnd->state != state) >> walsnd->state = state; >> >> >> ? Do I need to keep the volatile pointer, or should I drop that as well? > > No, do this at top > > if (walsnd->state == state) > return; > > Keep spinlocks when actually setting it.
Aha. Thanks for the pointers; please find attached a new version. -- Magnus Hagander Me: http://www.hagander.net/ Work: http://www.redpill-linpro.com/
*** a/doc/src/sgml/monitoring.sgml --- b/doc/src/sgml/monitoring.sgml *************** *** 298,305 **** postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re <entry><structname>pg_stat_replication</><indexterm><primary>pg_stat_replication</primary></indexterm></entry> <entry>One row per WAL sender process, showing process <acronym>ID</>, user OID, user name, application name, client's address and port number, ! time at which the server process began execution, and transaction log ! location. </entry> </row> --- 298,305 ---- <entry><structname>pg_stat_replication</><indexterm><primary>pg_stat_replication</primary></indexterm></entry> <entry>One row per WAL sender process, showing process <acronym>ID</>, user OID, user name, application name, client's address and port number, ! time at which the server process began execution, current WAL sender ! state and transaction log location. </entry> </row> *** a/src/backend/catalog/system_views.sql --- b/src/backend/catalog/system_views.sql *************** *** 501,506 **** CREATE VIEW pg_stat_replication AS --- 501,507 ---- S.client_addr, S.client_port, S.backend_start, + W.state, W.sent_location FROM pg_stat_get_activity(NULL) AS S, pg_authid U, pg_stat_get_wal_senders() AS W *** a/src/backend/replication/basebackup.c --- b/src/backend/replication/basebackup.c *************** *** 24,29 **** --- 24,30 ---- #include "libpq/pqformat.h" #include "nodes/pg_list.h" #include "replication/basebackup.h" + #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "utils/builtins.h" *************** *** 83,88 **** SendBaseBackup(const char *options) --- 84,91 ---- ALLOCSET_DEFAULT_MAXSIZE); old_context = MemoryContextSwitchTo(backup_context); + WalSndSetState(WALSNDSTATE_BACKUP); + if (backup_label == NULL) ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 179,184 **** 
WalSndHandshake(void) --- 179,185 ---- { int firstchar; + WalSndSetState(WALSNDSTATE_IDLE); set_ps_display("idle", false); /* Wait for a command to arrive */ *************** *** 482,487 **** WalSndLoop(void) --- 483,491 ---- if (!XLogSend(output_message, &caughtup)) break; } + + /* Update our state to indicate if we're behind or not */ + WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP); } /* *************** *** 533,538 **** InitWalSnd(void) --- 537,543 ---- */ walsnd->pid = MyProcPid; MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr)); + walsnd->state = WALSNDSTATE_IDLE; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ OwnLatch((Latch *) &walsnd->latch); *************** *** 960,965 **** WalSndWakeup(void) --- 965,1004 ---- SetLatch(&WalSndCtl->walsnds[i].latch); } + /* Set state for current walsender (only called in walsender) */ + void WalSndSetState(WalSndState state) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + Assert(am_walsender); + + if (walsnd->state == state) + return; + + SpinLockAcquire(&walsnd->mutex); + walsnd->state = state; + SpinLockRelease(&walsnd->mutex); + } + + /* + * Return a string constant representing the state. This is used + * in system views, and should *not* be translated. + */ + static const char * + WalSndGetStateString(WalSndState state) + { + switch (state) + { + case WALSNDSTATE_IDLE: return "IDLE"; + case WALSNDSTATE_BACKUP: return "BACKUP"; + case WALSNDSTATE_CATCHUP: return "CATCHUP"; + case WALSNDSTATE_STREAMING: return "STREAMING"; + } + return "UNKNOWN"; + } + + /* * Returns activity of walsenders, including pids and xlog locations sent to * standby servers. *************** *** 967,973 **** WalSndWakeup(void) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { ! 
#define PG_STAT_GET_WAL_SENDERS_COLS 2 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; --- 1006,1012 ---- Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { ! #define PG_STAT_GET_WAL_SENDERS_COLS 3 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; *************** *** 1021,1027 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS) memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(walsnd->pid); ! values[1] = CStringGetTextDatum(sent_location); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } --- 1060,1067 ---- memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(walsnd->pid); ! values[1] = CStringGetTextDatum(WalSndGetStateString(walsnd->state)); ! values[2] = CStringGetTextDatum(sent_location); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** *** 3075,3081 **** DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); ! 
DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25}" "{o,o}" "{procpid,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); --- 3075,3081 ---- DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); ! DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 16,27 **** --- 16,36 ---- #include "storage/latch.h" #include "storage/spin.h" + + typedef enum WalSndState { + WALSNDSTATE_IDLE = 0, + WALSNDSTATE_BACKUP, + WALSNDSTATE_CATCHUP, + WALSNDSTATE_STREAMING + } WalSndState; + /* * Each walsender has a WalSnd struct in shared memory. 
*/ typedef struct WalSnd { pid_t pid; /* this walsender's process id, or 0 */ + WalSndState state; /* this walsender's state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ slock_t mutex; /* locks shared variables shown above */ *************** *** 53,58 **** extern void WalSndSignals(void); --- 62,68 ---- extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); extern void WalSndWakeup(void); + extern void WalSndSetState(WalSndState state); extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers