Included here is a prototype patch that implements pg_switch_xlog(), in line
with earlier discussions about how this should be implemented.

This patch implements:
- a separate function for a manual xlog switch
- the internals to allow pg_stop_backup() to perform automatic log switching

The patch applies cleanly to CVS tip, passes make check, and also switches
xlogs cleanly. I've not tested recovery yet, though I will do so shortly
--- yes, of course that is critical, before you say so.

I'm shipping this very early to allow discussion, rather than test it
fully and leave people wondering what it looks like.

A production version will be ready prior to 8.2 code freeze.

-- 
  Simon Riggs             
  EnterpriseDB   http://www.enterprisedb.com
Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.244
diff -c -r1.244 xlog.c
*** src/backend/access/transam/xlog.c	14 Jul 2006 14:52:17 -0000	1.244
--- src/backend/access/transam/xlog.c	27 Jul 2006 12:44:23 -0000
***************
*** 452,458 ****
  static TimeLineID lastPageTLI = 0;
  
  static bool InRedo = false;
! 
  
  static void XLogArchiveNotify(const char *xlog);
  static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
--- 452,458 ----
  static TimeLineID lastPageTLI = 0;
  
  static bool InRedo = false;
! static bool haveSwitchedXLogFile = false;
  
  static void XLogArchiveNotify(const char *xlog);
  static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
***************
*** 465,471 ****
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static bool AdvanceXLInsertBuffer(void);
  static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
  static int XLogFileInit(uint32 log, uint32 seg,
  			 bool *use_existent, bool use_lock);
--- 465,471 ----
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static bool AdvanceXLInsertBuffer(bool SwitchXLog);
  static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
  static int XLogFileInit(uint32 log, uint32 seg,
  			 bool *use_existent, bool use_lock);
***************
*** 495,500 ****
--- 495,502 ----
  static char *str_time(time_t tnow);
  static void issue_xlog_fsync(void);
  
+ static XLogRecPtr RequestXLogSwitch(void);
+ 
  #ifdef WAL_DEBUG
  static void xlog_outrec(StringInfo buf, XLogRecord *record);
  #endif
***************
*** 854,860 ****
  	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		updrqst = AdvanceXLInsertBuffer();
  		freespace = INSERT_FREESPACE(Insert);
  	}
  
--- 856,862 ----
  	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		updrqst = AdvanceXLInsertBuffer(false);
  		freespace = INSERT_FREESPACE(Insert);
  	}
  
***************
*** 937,943 ****
  		}
  
  		/* Use next buffer */
! 		updrqst = AdvanceXLInsertBuffer();
  		curridx = Insert->curridx;
  		/* Insert cont-record header */
  		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
--- 939,945 ----
  		}
  
  		/* Use next buffer */
! 		updrqst = AdvanceXLInsertBuffer(false);
  		curridx = Insert->curridx;
  		/* Insert cont-record header */
  		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
***************
*** 947,970 ****
  		freespace = INSERT_FREESPACE(Insert);
  	}
  
! 	/* Ensure next record will be properly aligned */
! 	Insert->currpos = (char *) Insert->currpage +
! 		MAXALIGN(Insert->currpos - (char *) Insert->currpage);
! 	freespace = INSERT_FREESPACE(Insert);
! 
! 	/*
! 	 * The recptr I return is the beginning of the *next* record. This will be
! 	 * stored as LSN for changed data pages...
! 	 */
! 	INSERT_RECPTR(RecPtr, Insert, curridx);
! 
! 	/* Need to update shared LogwrtRqst if some block was filled up */
! 	if (freespace < SizeOfXLogRecord)
! 		updrqst = true;			/* curridx is filled and available for writing
! 								 * out */
! 	else
! 		curridx = PrevBufIdx(curridx);
! 	WriteRqst = XLogCtl->xlblocks[curridx];
  
  	LWLockRelease(WALInsertLock);
  
--- 949,999 ----
  		freespace = INSERT_FREESPACE(Insert);
  	}
  
!     /*
!      * We've now written out all of the xlog record and any associated blocks.
!      *
!      * If the xlog record was a request to perform special processing options,
!      * such as an xlog switch, do this here. Otherwise, just clean up before
!      * we release locks.
!      */
!     if (rmid == RM_XLOG_ID && info == XLOG_SWITCH)
!     {
!         /*
!          * Did AdvanceXLInsertBuffer() already step into a new file?
!          * If so, we don't need to switch files
!          */
!         if (!haveSwitchedXLogFile)
!         {
!             /* 
!              * Switch to next XLog segment file. We do this by writing out
!              * the current wal buffer page, then moving the pointers forward so
!              * that the next insertion point is in a new file. We *must*
!              * do this with WALInsertLock held, but we want to avoid doing
!              * this with WALWriteLock held, if possible.
!              */
!             (void) AdvanceXLInsertBuffer(true);
!         }
!        	updrqst = true;
!     }
! 
!     /* Ensure next record will be properly aligned */
!     Insert->currpos = (char *) Insert->currpage +
!     	MAXALIGN(Insert->currpos - (char *) Insert->currpage);
!     freespace = INSERT_FREESPACE(Insert);
! 
!     /*
!      * The recptr I return is the beginning of the *next* record. This will be
!      * stored as LSN for changed data pages...
!      */
!     INSERT_RECPTR(RecPtr, Insert, curridx);
! 
!     /* Need to update shared LogwrtRqst if some block was filled up */
!     if (freespace < SizeOfXLogRecord)
!     	updrqst = true;			/* curridx is filled and available for writing
!     							 * out */
!     else
!     	curridx = PrevBufIdx(curridx);
!     WriteRqst = XLogCtl->xlblocks[curridx];
  
  	LWLockRelease(WALInsertLock);
  
***************
*** 1181,1187 ****
   * Must be called with WALInsertLock held.
   */
  static bool
! AdvanceXLInsertBuffer(void)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogCtlWrite *Write = &XLogCtl->Write;
--- 1210,1216 ----
   * Must be called with WALInsertLock held.
   */
  static bool
! AdvanceXLInsertBuffer(bool SwitchXLog)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogCtlWrite *Write = &XLogCtl->Write;
***************
*** 1192,1197 ****
--- 1221,1228 ----
  	XLogRecPtr	NewPageEndPtr;
  	XLogPageHeader NewPage;
  
+     haveSwitchedXLogFile = false;
+ 
  	/* Use Insert->LogwrtResult copy if it's more fresh */
  	if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
  		LogwrtResult = Insert->LogwrtResult;
***************
*** 1246,1253 ****
  				 * not good, so only write as much as we absolutely must.
  				 */
  				WriteRqst.Write = OldPageRqstPtr;
! 				WriteRqst.Flush.xlogid = 0;
! 				WriteRqst.Flush.xrecoff = 0;
  				XLogWrite(WriteRqst, false);
  				LWLockRelease(WALWriteLock);
  				Insert->LogwrtResult = LogwrtResult;
--- 1277,1289 ----
  				 * not good, so only write as much as we absolutely must.
  				 */
  				WriteRqst.Write = OldPageRqstPtr;
!                 if (SwitchXLog)
!     				WriteRqst.Flush = OldPageRqstPtr;
!                 else
!                 {
!     				WriteRqst.Flush.xlogid = 0;
!     				WriteRqst.Flush.xrecoff = 0;
!                 }
  				XLogWrite(WriteRqst, false);
  				LWLockRelease(WALWriteLock);
  				Insert->LogwrtResult = LogwrtResult;
***************
*** 1260,1265 ****
--- 1296,1310 ----
  	 * output page.
  	 */
  	NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
+ 
+     /*
+      * If required, reset the current offset to point to new segment file
+      */
+     if (SwitchXLog)
+         NewPageEndPtr.xrecoff = NewPageEndPtr.xrecoff 
+                                  - (NewPage->xlp_pageaddr.xrecoff % XLogSegSize)
+                                  + XLogSegSize;
+ 
  	if (NewPageEndPtr.xrecoff >= XLogFileSize)
  	{
  		/* crossing a logid boundary */
***************
*** 1305,1310 ****
--- 1350,1358 ----
  		NewPage   ->xlp_info |= XLP_LONG_HEADER;
  
  		Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+ 
+         if (!SwitchXLog)
+             haveSwitchedXLogFile = true;
  	}
  
  	return update_needed;
***************
*** 5262,5268 ****
  	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		(void) AdvanceXLInsertBuffer();
  		/* OK to ignore update return flag, since we will do flush anyway */
  		freespace = INSERT_FREESPACE(Insert);
  	}
--- 5310,5316 ----
  	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		(void) AdvanceXLInsertBuffer(false);
  		/* OK to ignore update return flag, since we will do flush anyway */
  		freespace = INSERT_FREESPACE(Insert);
  	}
***************
*** 5448,5453 ****
--- 5496,5535 ----
  }
  
  /*
+  * RequestXLogSwitch: insert an XLOG_SWITCH record and flush WAL up to it.
+  * While still holding WALInsertLock, XLogInsert then performs the physical
+  * switch to the next xlog segment file, as if the current one had filled.
+  * XLogInsert tries not to acquire WALWriteLock while doing so, so that
+  * further XLogInserts can occur while we fsync the old xlog file here.
+  * Returns the XLogRecPtr that XLogInsert reported for the switch record.
+  */
+ static XLogRecPtr
+ RequestXLogSwitch(void)
+ {
+     XLogRecData rdata;
+     XLogRecPtr  recptr;
+  
+     xl_xlog_switch xlrec;
+  
+     /*
+      * Store the current time as the record's data so that the record is
+      * not zero-length, which has special meaning. Should be useful too...
+      */
+     xlrec.xtime = time(NULL);
+  
+     rdata.buffer = InvalidBuffer;
+     rdata.data = (char *) (&xlrec);
+     rdata.len = sizeof(xl_xlog_switch);
+     rdata.next = NULL;	/* single-element chain: only the timestamp */
+ 
+     recptr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
+ 
+     XLogFlush(recptr);
+ 
+     return recptr;
+ }
+ 
+ /*
   * XLOG resource manager's routines
   */
  void
***************
*** 5466,5471 ****
--- 5548,5562 ----
  			ShmemVariableCache->oidCount = 0;
  		}
  	}
+     else if (info == XLOG_SWITCH)
+     {
+         /*
+          * Alter module-level variables controlling position of WAL-replay
+          */
+ 		close(readFile);
+ 		readFile = -1;
+ 		NextLogSeg(readId, readSeg);
+     }
  	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
  	{
  		CheckPoint	checkPoint;
***************
*** 5544,5549 ****
--- 5635,5649 ----
  		memcpy(&nextOid, rec, sizeof(Oid));
  		appendStringInfo(buf, "nextOid: %u", nextOid);
  	}
+     else if (info == XLOG_SWITCH)
+     {
+         xl_xlog_switch *xlrec = (xl_xlog_switch *) rec;
+         struct tm  *tm = localtime(&xlrec->xtime);
+ 
+     	appendStringInfo(buf, "xlog switch: %04u-%02u-%02u %02u:%02u:%02u",
+     			tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
+     			tm->tm_hour, tm->tm_min, tm->tm_sec);
+     }
  	else
  		appendStringInfo(buf, "UNKNOWN");
  }
***************
*** 5854,5859 ****
--- 5954,5987 ----
  }
  
  /*
+  * pg_switch_xlog: force a switch to the next xlog segment file
+  * (superuser only); returns the switch location as text in "%X/%X" form
+  */
+ Datum
+ pg_switch_xlog(PG_FUNCTION_ARGS)
+ {
+     XLogRecPtr switchpoint;
+ 	char		xlogfilename[MAXFNAMELEN];	/* holds "%X/%X" text, not a file name */
+ 	text	   *result;
+ 
+ 	if (!superuser())
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ 				 (errmsg("must be superuser to switch xlog files"))));
+ 
+     switchpoint = RequestXLogSwitch();
+ 
+ 	/*
+ 	 * We're done.  As a convenience, return the WAL location of the switch.
+ 	 */
+ 	snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
+ 			 switchpoint.xlogid, switchpoint.xrecoff);
+ 	result = DatumGetTextP(DirectFunctionCall1(textin,
+ 											 CStringGetDatum(xlogfilename)));
+ 	PG_RETURN_TEXT_P(result);
+ }
+ 
+ /*
   * pg_stop_backup: finish taking an on-line backup dump
   *
   * We remove the backup label file created by pg_start_backup, and instead
***************
*** 5885,5894 ****
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
  				 (errmsg("must be superuser to run a backup"))));
  
  	/*
  	 * Get the current end-of-WAL position; it will be unsafe to use this dump
! 	 * to restore to a point in advance of this time.  We can also clear
! 	 * forcePageWrites here.
  	 */
  	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
--- 6013,6030 ----
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
  				 (errmsg("must be superuser to run a backup"))));
  
+     /*
+      * Force a switch to a new xlog segment file, so that the backup
+      * is valid as soon as archiver moves it out. We do this first so that the
+      * archiver has a chance to move the file away as soon as possible.
+      */
+     stoppoint = RequestXLogSwitch();
+ 
  	/*
  	 * Get the current end-of-WAL position; it will be unsafe to use this dump
! 	 * to restore to a point prior to this time.  We can also clear
! 	 * forcePageWrites here. XXX could remove these lines and set 
!      * forcePageWrites at time of log switch
  	 */
  	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
***************
*** 5983,5988 ****
--- 6119,6129 ----
  	 */
  	CleanupBackupHistory();
  
+     /*
+      * XXX Maybe should wait here for archiver to archive the last xlog file
+      * and the history file?
+      */
+ 
  	/*
  	 * We're done.  As a convenience, return the ending WAL offset.
  	 */
Index: src/include/access/xlog_internal.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/xlog_internal.h,v
retrieving revision 1.13
diff -c -r1.13 xlog_internal.h
*** src/include/access/xlog_internal.h	5 Apr 2006 03:34:05 -0000	1.13
--- src/include/access/xlog_internal.h	27 Jul 2006 12:44:24 -0000
***************
*** 241,245 ****
--- 241,246 ----
   */
  extern Datum pg_start_backup(PG_FUNCTION_ARGS);
  extern Datum pg_stop_backup(PG_FUNCTION_ARGS);
+ extern Datum pg_switch_xlog(PG_FUNCTION_ARGS);
  
  #endif   /* XLOG_INTERNAL_H */
Index: src/include/catalog/pg_control.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_control.h,v
retrieving revision 1.29
diff -c -r1.29 pg_control.h
*** src/include/catalog/pg_control.h	4 Apr 2006 22:39:59 -0000	1.29
--- src/include/catalog/pg_control.h	27 Jul 2006 12:44:24 -0000
***************
*** 43,52 ****
--- 43,58 ----
  	time_t		time;			/* time stamp of checkpoint */
  } CheckPoint;
  
+ typedef struct xl_xlog_switch	/* data carried by an XLOG_SWITCH record */
+ {
+     time_t          xtime;		/* time at which the switch was requested */
+ } xl_xlog_switch;
+ 
  /* XLOG info values for XLOG rmgr */
  #define XLOG_CHECKPOINT_SHUTDOWN		0x00
  #define XLOG_CHECKPOINT_ONLINE			0x10
  #define XLOG_NEXTOID					0x30
+ #define XLOG_SWITCH                     0x40
  
  
  /* System status indicator */
Index: src/include/catalog/pg_proc.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_proc.h,v
retrieving revision 1.417
diff -c -r1.417 pg_proc.h
*** src/include/catalog/pg_proc.h	25 Jul 2006 03:51:21 -0000	1.417
--- src/include/catalog/pg_proc.h	27 Jul 2006 12:44:29 -0000
***************
*** 3073,3078 ****
--- 3073,3080 ----
  DESCR("Prepare for taking an online backup");
  DATA(insert OID = 2173 ( pg_stop_backup			PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_stop_backup - _null_ ));
  DESCR("Finish taking an online backup");
+ DATA(insert OID = 2803 ( pg_switch_xlog			PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_switch_xlog - _null_ ));
+ DESCR("Switch to new xlog file segment");
  
  DATA(insert OID = 2621 ( pg_reload_conf			PGNSP PGUID 12 f f t f v 0 16 "" _null_ _null_ _null_ pg_reload_conf - _null_ ));
  DESCR("Reload configuration files");
---------------------------(end of broadcast)---------------------------
TIP 4: Have you searched our list archives?

               http://archives.postgresql.org

Reply via email to