On Tue, Jan 11, 2011 at 13:18, Simon Riggs <si...@2ndquadrant.com> wrote:
> On Tue, 2011-01-11 at 13:04 +0100, Magnus Hagander wrote:
>> On Tue, Jan 11, 2011 at 12:58, Simon Riggs <si...@2ndquadrant.com> wrote:
>> > On Tue, 2011-01-11 at 12:41 +0100, Magnus Hagander wrote:
>> >
>> >> Just to be clear, you're objecting to the *name* of the state, right,
>> >> not how/where it's set?
>> >
>> > Yes
>> >
>> >> In particular, how the catchup/streaming
>> >> things are set?
>> >
>> > You've set it in the right places.
>> >
>> > I would personally constrain the state transitions, so that we can't
>> > ever make illegal changes, such as CATCHUP -> BACKUP.
>>
>> Well, once we enter the walsender loop we can never get out of it, so
>> it simply cannot happen...
>
> Accidentally it can, so I would like to protect against that.
>
> Putting those checks in are like Asserts(), they help document what is
> and is not possible by design.
>
>> > I would also check the state locally before grabbing the spinlock, as is
>> > typical in other xlog code. Continually resetting shared state to
>> > exactly what it is already seems strange, to me. If we make the rule
>> > that the state can only be changed by the WALSender itself, it won't
>> > need to grab spinlock. If you don't like that, a local variable works.
>>
>> Hmm. I don't see why anybody other than the walsender should change
>> it, so yeah. You mean I can just drop the spinlock calls completely,
>> and then do an
>> if (walsnd->state != state)
>>    walsnd->state = state;
>>
>>
>> ? Do I need to keep the volatile pointer, or should I drop that as well?
>
> No, do this at top
>
> if (walsnd->state == state)
>  return;
>
> Keep spinlocks when actually setting it.


Aha. Thanks for the pointers; please find attached a new version.


-- 
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
*** a/doc/src/sgml/monitoring.sgml
--- b/doc/src/sgml/monitoring.sgml
***************
*** 298,305 **** postgres: <replaceable>user</> <replaceable>database</> <replaceable>host</> <re
        <entry><structname>pg_stat_replication</><indexterm><primary>pg_stat_replication</primary></indexterm></entry>
        <entry>One row per WAL sender process, showing process <acronym>ID</>,
        user OID, user name, application name, client's address and port number,
!       time at which the server process began execution, and transaction log
!       location.
       </entry>
       </row>
  
--- 298,305 ----
        <entry><structname>pg_stat_replication</><indexterm><primary>pg_stat_replication</primary></indexterm></entry>
        <entry>One row per WAL sender process, showing process <acronym>ID</>,
        user OID, user name, application name, client's address and port number,
!       time at which the server process began execution, current WAL sender
!       state, and transaction log location.
       </entry>
       </row>
  
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
***************
*** 501,506 **** CREATE VIEW pg_stat_replication AS
--- 501,507 ----
              S.client_addr,
              S.client_port,
              S.backend_start,
+             W.state,
              W.sent_location
      FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
              pg_stat_get_wal_senders() AS W
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
***************
*** 24,29 ****
--- 24,30 ----
  #include "libpq/pqformat.h"
  #include "nodes/pg_list.h"
  #include "replication/basebackup.h"
+ #include "replication/walsender.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
  #include "utils/builtins.h"
***************
*** 83,88 **** SendBaseBackup(const char *options)
--- 84,91 ----
  										   ALLOCSET_DEFAULT_MAXSIZE);
  	old_context = MemoryContextSwitchTo(backup_context);
  
+ 	WalSndSetState(WALSNDSTATE_BACKUP);
+ 
  	if (backup_label == NULL)
  		ereport(FATAL,
  				(errcode(ERRCODE_PROTOCOL_VIOLATION),
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 179,184 **** WalSndHandshake(void)
--- 179,185 ----
  	{
  		int			firstchar;
  
+ 		WalSndSetState(WALSNDSTATE_IDLE);
  		set_ps_display("idle", false);
  
  		/* Wait for a command to arrive */
***************
*** 482,487 **** WalSndLoop(void)
--- 483,491 ----
  			if (!XLogSend(output_message, &caughtup))
  				break;
  		}
+ 
+ 		/* Update our state to indicate if we're behind or not */
+ 		WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
  	}
  
  	/*
***************
*** 533,538 **** InitWalSnd(void)
--- 537,543 ----
  			 */
  			walsnd->pid = MyProcPid;
  			MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+ 			walsnd->state = WALSNDSTATE_IDLE;
  			SpinLockRelease(&walsnd->mutex);
  			/* don't need the lock anymore */
  			OwnLatch((Latch *) &walsnd->latch);
***************
*** 960,965 **** WalSndWakeup(void)
--- 965,1004 ----
  		SetLatch(&WalSndCtl->walsnds[i].latch);
  }
  
+ /* Set state for current walsender (only called in walsender) */
+ void WalSndSetState(WalSndState state)
+ {
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile WalSnd *walsnd = MyWalSnd;
+ 
+ 	Assert(am_walsender);
+ 
+ 	if (walsnd->state == state)
+ 		return;
+ 
+ 	SpinLockAcquire(&walsnd->mutex);
+ 	walsnd->state = state;
+ 	SpinLockRelease(&walsnd->mutex);
+ }
+ 
+ /*
+  * Return a string constant representing the state. This is used
+  * in system views, and should *not* be translated.
+  */
+ static const char *
+ WalSndGetStateString(WalSndState state)
+ {
+ 	switch (state)
+ 	{
+ 		case WALSNDSTATE_IDLE: return "IDLE";
+ 		case WALSNDSTATE_BACKUP: return "BACKUP";
+ 		case WALSNDSTATE_CATCHUP: return "CATCHUP";
+ 		case WALSNDSTATE_STREAMING: return "STREAMING";
+ 	}
+ 	return "UNKNOWN";
+ }
+ 
+ 
  /*
   * Returns activity of walsenders, including pids and xlog locations sent to
   * standby servers.
***************
*** 967,973 **** WalSndWakeup(void)
  Datum
  pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  {
! #define PG_STAT_GET_WAL_SENDERS_COLS 	2
  	ReturnSetInfo	   *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  	TupleDesc			tupdesc;
  	Tuplestorestate	   *tupstore;
--- 1006,1012 ----
  Datum
  pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  {
! #define PG_STAT_GET_WAL_SENDERS_COLS 	3
  	ReturnSetInfo	   *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  	TupleDesc			tupdesc;
  	Tuplestorestate	   *tupstore;
***************
*** 1021,1027 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  
  		memset(nulls, 0, sizeof(nulls));
  		values[0] = Int32GetDatum(walsnd->pid);
! 		values[1] = CStringGetTextDatum(sent_location);
  
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
--- 1060,1067 ----
  
  		memset(nulls, 0, sizeof(nulls));
  		values[0] = Int32GetDatum(walsnd->pid);
! 		values[1] = CStringGetTextDatum(WalSndGetStateString(walsnd->state));
! 		values[2] = CStringGetTextDatum(sent_location);
  
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 3075,3081 **** DATA(insert OID = 1936 (  pg_stat_get_backend_idset		PGNSP PGUID 12 1 100 0 f f
  DESCR("statistics: currently active backend IDs");
  DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active backends");
! DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25}" "{o,o}" "{procpid,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active replication");
  DATA(insert OID = 2026 (  pg_backend_pid				PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
  DESCR("statistics: current backend PID");
--- 3075,3081 ----
  DESCR("statistics: currently active backend IDs");
  DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active backends");
! DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active replication");
  DATA(insert OID = 2026 (  pg_backend_pid				PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
  DESCR("statistics: current backend PID");
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 16,27 ****
--- 16,36 ----
  #include "storage/latch.h"
  #include "storage/spin.h"
  
+ 
+ typedef enum WalSndState {
+ 	WALSNDSTATE_IDLE = 0,
+ 	WALSNDSTATE_BACKUP,
+ 	WALSNDSTATE_CATCHUP,
+ 	WALSNDSTATE_STREAMING
+ } WalSndState;
+ 
  /*
   * Each walsender has a WalSnd struct in shared memory.
   */
  typedef struct WalSnd
  {
  	pid_t		pid;			/* this walsender's process id, or 0 */
+ 	WalSndState	state;			/* this walsender's state */
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
  
  	slock_t		mutex;			/* locks shared variables shown above */
***************
*** 53,58 **** extern void WalSndSignals(void);
--- 62,68 ----
  extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
  extern void WalSndWakeup(void);
+ extern void WalSndSetState(WalSndState state);
  
  extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to