Patch included to implement xlog switching, using an xlog record
"processing instruction" and forcibly moving xlog pointers.

1. Happens automatically on pg_stop_backup()

2. Can happen manually via pg_switch_xlog()

3. Implements a range of utility functions:
        pg_current_wal_offset()
        pg_current_xlogfile()
        pg_current_xlogfile_offset() - for Hannu
        pg_xlogfile_from_wal_offset() - for interpreting output from
        pg_switch_xlog(), pg_start_backup() and pg_stop_backup()
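
For anyone wanting to check the offset-to-filename arithmetic outside the backend, here is a minimal standalone sketch. It assumes the default 16MB segment size and the usual timeline/log/segment file naming; wal_offset_to_file() is a hypothetical helper written for illustration and is not part of the patch.

```c
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>

/* Assumption: default build, 16MB xlog segments */
#define SEG_SIZE (16u * 1024u * 1024u)

/*
 * Hypothetical helper: parse a WAL offset of the form "X/X" (as returned
 * by pg_start_backup() and friends), then produce the segment file name
 * and the byte offset within that file. Returns 0 on success, -1 if the
 * offset string cannot be parsed.
 */
static int
wal_offset_to_file(const char *offset, uint32_t tli,
                   char *fname, size_t fnamelen, uint32_t *byteoff)
{
    unsigned int xlogid;
    unsigned int xrecoff;

    if (sscanf(offset, "%X/%X", &xlogid, &xrecoff) != 2)
        return -1;

    /* file name is timeline, log id and segment number, 8 hex digits each */
    snprintf(fname, fnamelen, "%08X%08X%08X",
             (unsigned int) tli, xlogid, xrecoff / SEG_SIZE);

    /* position within the segment file, not an xlog record pointer */
    *byteoff = xrecoff % SEG_SIZE;
    return 0;
}
```

Called with the offset "0/D4445B8" and timeline 1, this gives the file name 00000001000000000000000D, which agrees with the example in the doc patch below.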

Passes make check, applies cleanly to HEAD, includes doc patches with
clean SGML builds.

The design is as clean as I could make it, and it also has the
implementation of archive_timeout in mind. Happy to work further on any
code cleanups requested.
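
To illustrate the "forcibly moving xlog pointers" part, here is a standalone sketch of the next-segment calculation, mirroring StartOfNextSegment() in the patch. The WalPtr type and the constant names are stand-ins for illustration only, and the sizes assume the default 16MB segments with the usable size of a log file being a whole number of segments.

```c
#include <stdint.h>

/* Assumption: default build, 16MB xlog segments */
#define SEG_SIZE  (16u * 1024u * 1024u)
/* Assumption: usable bytes per log id is a whole number of segments */
#define FILE_SIZE (((uint32_t) 0xffffffff / SEG_SIZE) * SEG_SIZE)

/* Stand-in for XLogRecPtr: log id plus byte offset within that log */
typedef struct
{
    uint32_t xlogid;
    uint32_t xrecoff;
} WalPtr;

/* Round a pointer up to the first byte of the following segment */
static WalPtr
start_of_next_segment(WalPtr p)
{
    WalPtr next;

    next.xlogid = p.xlogid;
    /* drop the offset within the current segment, then step one segment */
    next.xrecoff = p.xrecoff - p.xrecoff % SEG_SIZE + SEG_SIZE;

    if (next.xrecoff >= FILE_SIZE)
    {
        /* crossing a logid boundary */
        next.xlogid += 1;
        next.xrecoff = 0;
    }
    return next;
}
```

Under these assumptions a pointer at 0/2000020 moves to 0/3000000, and a pointer anywhere in the last segment of a log id moves to offset 0 of the next log id.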

I've tested it in a variety of ways, including running pg_regress
concurrently with pg_switch_xlog(). All known issues are resolved. The
main test cases and sample outputs are in switchtest.sh.

A wide variety of cases needs testing, so I'm expecting some further
issues to be reported.

I'm now working on completing the restartable recovery patch, which will
include further tests of PITR recoveries on the xswitch.patch.

-- 
  Simon Riggs
  EnterpriseDB          http://www.enterprisedb.com

Attachment: switchtest.sh
Description: application/shellscript

Index: doc/src/sgml/func.sgml
===================================================================
RCS file: /projects/cvsroot/pgsql/doc/src/sgml/func.sgml,v
retrieving revision 1.328
diff -c -r1.328 func.sgml
*** doc/src/sgml/func.sgml	28 Jul 2006 18:33:03 -0000	1.328
--- doc/src/sgml/func.sgml	31 Jul 2006 22:16:29 -0000
***************
*** 10143,10148 ****
--- 10143,10157 ----
      <primary>pg_stop_backup</primary>
     </indexterm>
     <indexterm zone="functions-admin">
+     <primary>pg_xlogfile_from_wal_offset</primary>
+    </indexterm>
+    <indexterm zone="functions-admin">
+     <primary>pg_current_xlogfile_offset</primary>
+    </indexterm>
+    <indexterm zone="functions-admin">
+     <primary>pg_current_xlogfile</primary>
+    </indexterm>
+    <indexterm zone="functions-admin">
      <primary>backup</primary>
     </indexterm>
  
***************
*** 10175,10180 ****
--- 10184,10217 ----
         <entry><type>text</type></entry>
         <entry>Finish performing on-line backup</entry>
        </row>
+       <row>
+        <entry>
+         <literal><function>pg_xlogfile_from_wal_offset</function>(<parameter>offset</> <type>text</>)</literal>
+         </entry>
+        <entry><type>text</type></entry>
+        <entry>Get filename from offset, as provided by pg_start/stop_backup()</entry>
+       </row>
+       <row>
+        <entry>
+         <literal><function>pg_current_wal_offset</function>()</literal>
+         </entry>
+        <entry><type>text</type></entry>
+        <entry>Get current WAL offset</entry>
+       </row>
+       <row>
+        <entry>
+         <literal><function>pg_current_xlogfile_offset</function>()</literal>
+         </entry>
+        <entry><type>text</type></entry>
+        <entry>Get current xlog filename, plus byte offset within file (based on current WAL offset)</entry>
+       </row>
+       <row>
+        <entry>
+         <literal><function>pg_current_xlogfile</function>()</literal>
+         </entry>
+        <entry><type>text</type></entry>
+        <entry>Get current xlog filename</entry>
+       </row>
       </tbody>
      </tgroup>
     </table>
***************
*** 10184,10192 ****
      arbitrary user-defined label for the backup.  (Typically this would be
      the name under which the backup dump file will be stored.)  The function
      writes a backup label file into the database cluster's data directory,
!     and then returns the backup's starting WAL offset as text.  (The user
      need not pay any attention to this result value, but it is provided in
!     case it is of use.)
     </para>
  
     <para>
--- 10221,10246 ----
      arbitrary user-defined label for the backup.  (Typically this would be
      the name under which the backup dump file will be stored.)  The function
      writes a backup label file into the database cluster's data directory,
!     and then returns the backup's starting WAL offset as text.  The user
      need not pay any attention to this result value, but it is provided in
!     case it is of use. 
! <programlisting>
!     postgres=# select pg_start_backup('label_goes_here');
!      pg_start_backup
!     -----------------
!      0/D4445B8
!     (1 row)
! </programlisting>
! 
!     The offset can be translated into the corresponding xlog filename like this:
! 
! <programlisting>
!     postgres=# select pg_xlogfile_from_wal_offset(pg_stop_backup());
!      pg_xlogfile_from_wal_offset
!     -----------------------------
!      00000001000000000000000D
!     (1 row)
! </programlisting>
     </para>
  
     <para>
***************
*** 10196,10202 ****
      <function>pg_start_backup</>, the starting and ending WAL offsets for
      the backup, and the starting and ending times of the backup.  The return
      value is the backup's ending WAL offset (which again may be of little
!     interest).
     </para>
  
     <para>
--- 10250,10299 ----
      <function>pg_start_backup</>, the starting and ending WAL offsets for
      the backup, and the starting and ending times of the backup.  The return
      value is the backup's ending WAL offset (which again may be of little
!     interest, though it can be translated using pg_xlogfile_from_wal_offset()).
!     The xlog file is also closed and switched automatically to the next
!     segment file.
!    </para>
! 
!    <para>
!     <function>pg_switch_xlog</> moves to the next xlog file, allowing the 
!     previous file to be archived (assuming you are using continuous archiving).
!     The new WAL offset is returned.
!    </para>
! 
!    <para>
!     Current information functions are also available: 
!     <function>pg_current_wal_offset</> takes the current WAL Insert
!     pointer and displays it directly in WAL offset form, similar to the output
!     from pg_start_backup() and pg_stop_backup().
!     <function>pg_current_xlogfile_offset</> takes the current WAL Insert
!     pointer and translates that into a filename and decimal byte offset within 
!     that file, separated by a single space.
!     <function>pg_current_xlogfile</> provides only the name of the current
!     xlogfile, in contrast to <function>pg_current_xlogfile_offset</>.
!    </para>
! 
!    <para>
!     An example:
! <programlisting>
! postgres=# select pg_current_xlogfile();
!    pg_current_xlogfile
! --------------------------
!  000000010000000000000002
! (1 row)
! 
! postgres=# select pg_switch_xlog();
!  pg_switch_xlog
! ----------------
!  0/2000020
! (1 row)
! 
! postgres=# select pg_current_xlogfile();
!    pg_current_xlogfile
! --------------------------
!  000000010000000000000003
! (1 row)
! </programlisting>
     </para>
  
     <para>
Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.245
diff -c -r1.245 xlog.c
*** src/backend/access/transam/xlog.c	30 Jul 2006 02:07:18 -0000	1.245
--- src/backend/access/transam/xlog.c	31 Jul 2006 22:16:37 -0000
***************
*** 340,345 ****
--- 340,346 ----
  	XLogPageHeader currpage;	/* points to header of block in cache */
  	char	   *currpos;		/* current insertion point in cache */
  	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
+     XLogRecPtr  LastXLogSwitchPtr;  /* redo pointer when last xlog switched */
  	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
  } XLogCtlInsert;
  
***************
*** 410,415 ****
--- 411,417 ----
  #define NextBufIdx(idx)		\
  		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
  
+ 
  /*
   * Private, possibly out-of-date copy of shared LogwrtResult.
   * See discussion above.
***************
*** 452,458 ****
  static TimeLineID lastPageTLI = 0;
  
  static bool InRedo = false;
! 
  
  static void XLogArchiveNotify(const char *xlog);
  static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
--- 454,461 ----
  static TimeLineID lastPageTLI = 0;
  
  static bool InRedo = false;
! /* haveSwitchedXLogFile is for both normal usage and recovery mode */
! static bool haveSwitchedXLogFile = false;
  
  static void XLogArchiveNotify(const char *xlog);
  static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
***************
*** 465,471 ****
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static bool AdvanceXLInsertBuffer(void);
  static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
  static int XLogFileInit(uint32 log, uint32 seg,
  			 bool *use_existent, bool use_lock);
--- 468,475 ----
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static XLogRecPtr StartOfNextSegment(XLogRecPtr);
! static bool AdvanceXLInsertBuffer(bool SwitchXLog);
  static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
  static int XLogFileInit(uint32 log, uint32 seg,
  			 bool *use_existent, bool use_lock);
***************
*** 854,860 ****
  	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		updrqst = AdvanceXLInsertBuffer();
  		freespace = INSERT_FREESPACE(Insert);
  	}
  
--- 858,864 ----
  	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		updrqst = AdvanceXLInsertBuffer(false);
  		freespace = INSERT_FREESPACE(Insert);
  	}
  
***************
*** 937,943 ****
  		}
  
  		/* Use next buffer */
! 		updrqst = AdvanceXLInsertBuffer();
  		curridx = Insert->curridx;
  		/* Insert cont-record header */
  		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
--- 941,947 ----
  		}
  
  		/* Use next buffer */
! 		updrqst = AdvanceXLInsertBuffer(false);
  		curridx = Insert->curridx;
  		/* Insert cont-record header */
  		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
***************
*** 947,972 ****
  		freespace = INSERT_FREESPACE(Insert);
  	}
  
! 	/* Ensure next record will be properly aligned */
! 	Insert->currpos = (char *) Insert->currpage +
! 		MAXALIGN(Insert->currpos - (char *) Insert->currpage);
! 	freespace = INSERT_FREESPACE(Insert);
! 
! 	/*
! 	 * The recptr I return is the beginning of the *next* record. This will be
! 	 * stored as LSN for changed data pages...
! 	 */
! 	INSERT_RECPTR(RecPtr, Insert, curridx);
! 
! 	/* Need to update shared LogwrtRqst if some block was filled up */
! 	if (freespace < SizeOfXLogRecord)
! 		updrqst = true;			/* curridx is filled and available for writing
! 								 * out */
! 	else
! 		curridx = PrevBufIdx(curridx);
! 	WriteRqst = XLogCtl->xlblocks[curridx];
  
! 	LWLockRelease(WALInsertLock);
  
  	if (updrqst)
  	{
--- 951,1015 ----
  		freespace = INSERT_FREESPACE(Insert);
  	}
  
!     /*
!      * We've now written out all of the xlog record and any associated blocks.
!      *
!      * If the xlog record was a request to perform special processing options,
!      * such as an xlog switch, do this here. 
!      */
!     if (rmid == RM_XLOG_ID && info == XLOG_SWITCH)
!     {
!         /*
!          * Did AdvanceXLInsertBuffer() already step into a new file?
!          * If so, we don't need to switch files and can skip this step.
!          */
!         if (!haveSwitchedXLogFile)
!         {
!             /* 
!              * Switch to next XLog segment file. We do this by writing out
!              * the current wal buffer page, then moving the pointers forward so
!              * that the next insertion point is in a new file. We *must*
!              * do this with WALInsertLock held.
!              */
!             (void) AdvanceXLInsertBuffer(true);
! 
!     		curridx = Insert->curridx;
! 
!             /*
!              * The recptr I return is the beginning of the *next* record. This will be
!              * stored as LSN for changed data pages...
!              */
!             INSERT_RECPTR(RecPtr, Insert, curridx);
! 
!             /*
!              * Keep this updated so we can avoid multiple log switches
!              */
!             Insert->LastXLogSwitchPtr = RecPtr;
!         }
!     }
!     else
!     {
!         /* Ensure next record will be properly aligned */
!         Insert->currpos = (char *) Insert->currpage +
!         	MAXALIGN(Insert->currpos - (char *) Insert->currpage);
!         freespace = INSERT_FREESPACE(Insert);
! 
!         /*
!          * The recptr I return is the beginning of the *next* record. This will be
!          * stored as LSN for changed data pages...
!          */
!         INSERT_RECPTR(RecPtr, Insert, curridx);
!  
!        /* Need to update shared LogwrtRqst if some block was filled up */
!         if (freespace < SizeOfXLogRecord)
!         	updrqst = true;			/* curridx is filled and available for writing
!         							 * out */
!         else
!         	curridx = PrevBufIdx(curridx);
!     }
!     WriteRqst = XLogCtl->xlblocks[curridx];
  
!  	LWLockRelease(WALInsertLock);
  
  	if (updrqst)
  	{
***************
*** 1170,1175 ****
--- 1213,1242 ----
  }
  
  /*
+  * Move pointer to start of next xlog segment
+  */
+ static XLogRecPtr 
+ StartOfNextSegment(XLogRecPtr RecPtr)
+ {
+     XLogRecPtr NewRecPtr;
+ 
+     NewRecPtr.xrecoff = RecPtr.xrecoff 
+                    - RecPtr.xrecoff % XLogSegSize    
+                    + XLogSegSize;
+ 
+     NewRecPtr.xlogid = RecPtr.xlogid;
+ 
+ 	if (NewRecPtr.xrecoff >= XLogFileSize)
+ 	{
+ 		/* crossing a logid boundary */
+         NewRecPtr.xlogid += 1;
+ 		NewRecPtr.xrecoff = 0;
+ 	}
+ 
+     return NewRecPtr;
+ }
+ 
+ /*
   * Advance the Insert state to the next buffer page, writing out the next
   * buffer if it still contains unwritten data.
   *
***************
*** 1181,1187 ****
   * Must be called with WALInsertLock held.
   */
  static bool
! AdvanceXLInsertBuffer(void)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogCtlWrite *Write = &XLogCtl->Write;
--- 1248,1254 ----
   * Must be called with WALInsertLock held.
   */
  static bool
! AdvanceXLInsertBuffer(bool SwitchXLog)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogCtlWrite *Write = &XLogCtl->Write;
***************
*** 1192,1201 ****
--- 1259,1309 ----
  	XLogRecPtr	NewPageEndPtr;
  	XLogPageHeader NewPage;
  
+     haveSwitchedXLogFile = false;
+ 
  	/* Use Insert->LogwrtResult copy if it's more fresh */
  	if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
  		LogwrtResult = Insert->LogwrtResult;
  
+     if (SwitchXLog)
+     {
+ 		XLogRecPtr	FinishedPageRqstPtr;
+ 
+ 		FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
+ 
+ 		/* Before waiting, get info_lck and update LogwrtResult */
+ 		{
+ 			/* use volatile pointer to prevent code rearrangement */
+ 			volatile XLogCtlData *xlogctl = XLogCtl;
+ 
+ 			SpinLockAcquire(&xlogctl->info_lck);
+ 			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
+ 				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
+ 			LogwrtResult = xlogctl->LogwrtResult;
+ 			SpinLockRelease(&xlogctl->info_lck);
+ 		}
+ 
+ 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ 		LogwrtResult = Write->LogwrtResult;
+ 		/*
+ 		 * Have to write buffers while holding insert lock. Double Drat.
+ 		 */
+ 		WriteRqst.Write = FinishedPageRqstPtr;
+ 		WriteRqst.Flush = FinishedPageRqstPtr;
+ 		XLogWrite(WriteRqst, false);
+ 
+         /*
+          * Now we've written the XLOG_SWITCH record to disk, yet we aren't
+          * at the end of the file, so we still need to fsync the old file
+          */
+ 		issue_xlog_fsync();
+ 		LogwrtResult.Flush = LogwrtResult.Write;		/* end of page */
+ 		if (XLogArchivingActive())
+ 			XLogArchiveNotifySeg(openLogId, openLogSeg);
+ 		LWLockRelease(WALWriteLock);
+ 		Insert->LogwrtResult = LogwrtResult;
+     }
+ 
  	/*
  	 * Get ending-offset of the buffer page we need to replace (this may be
  	 * zero if the buffer hasn't been used yet).  Fall through if it's already
***************
*** 1260,1265 ****
--- 1368,1380 ----
  	 * output page.
  	 */
  	NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
+ 
+     /*
+      * If required, reset the current offset to point to new segment file
+      */
+     if (SwitchXLog)
+         NewPageEndPtr = StartOfNextSegment(NewPageEndPtr);
+ 
  	if (NewPageEndPtr.xrecoff >= XLogFileSize)
  	{
  		/* crossing a logid boundary */
***************
*** 1269,1274 ****
--- 1384,1390 ----
  	else
  		NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
  	XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
+ 
  	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
  
  	Insert->curridx = nextidx;
***************
*** 1305,1310 ****
--- 1421,1429 ----
  		NewPage   ->xlp_info |= XLP_LONG_HEADER;
  
  		Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+ 
+         if (!SwitchXLog)
+             haveSwitchedXLogFile = true;
  	}
  
  	return update_needed;
***************
*** 2673,2678 ****
--- 2792,2802 ----
   *
   * The record is copied into readRecordBuf, so that on successful return,
   * the returned record pointer always points there.
+  *
+  * Special processing occurs *following* Processing Instructions such as an
+  * xlog switch. Bear in mind that when we are handling the processing
+  * instructions we don't know yet that's what they are, so we treat them as
+  * normal records while they themselves pass through.
   */
  static XLogRecord *
  ReadRecord(XLogRecPtr *RecPtr, int emode)
***************
*** 2704,2722 ****
  	{
  		RecPtr = &tmpRecPtr;
  		/* fast case if next record is on same page */
! 		if (nextRecord != NULL)
  		{
  			record = nextRecord;
  			goto got_record;
  		}
! 		/* align old recptr to next page */
! 		if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
! 			tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
! 		if (tmpRecPtr.xrecoff >= XLogFileSize)
! 		{
! 			(tmpRecPtr.xlogid)++;
! 			tmpRecPtr.xrecoff = 0;
! 		}
  		/* We will account for page header size below */
  	}
  	else
--- 2828,2852 ----
  	{
  		RecPtr = &tmpRecPtr;
  		/* fast case if next record is on same page */
! 		if (nextRecord != NULL && !haveSwitchedXLogFile)
  		{
  			record = nextRecord;
  			goto got_record;
  		}
!         if (haveSwitchedXLogFile)
!             tmpRecPtr = StartOfNextSegment(tmpRecPtr);
!         else
!         {
!     		/* align old recptr to next page */
!     		if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
!     			tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
! 
!     		if (tmpRecPtr.xrecoff >= XLogFileSize)
!     		{
!     			(tmpRecPtr.xlogid)++;
!     			tmpRecPtr.xrecoff = 0;
!     		}
!         }
  		/* We will account for page header size below */
  	}
  	else
***************
*** 2737,2742 ****
--- 2867,2878 ----
  		randAccess = true;		/* allow curFileTLI to go backwards too */
  	}
  
+     if (haveSwitchedXLogFile)
+         ereport(LOG, 
+                  (errmsg("executing processing instruction: "
+                          "xlog switch; redo pointer moved to %X/%X",
+                             tmpRecPtr.xlogid, tmpRecPtr.xrecoff)));
+ 
  	if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
  	{
  		close(readFile);
***************
*** 2854,2890 ****
  						record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
  		goto next_record_is_invalid;
  	}
! 	if (randAccess)
! 	{
! 		/*
! 		 * We can't exactly verify the prev-link, but surely it should be less
! 		 * than the record's own address.
! 		 */
! 		if (!XLByteLT(record->xl_prev, *RecPtr))
! 		{
! 			ereport(emode,
! 					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
! 							record->xl_prev.xlogid, record->xl_prev.xrecoff,
! 							RecPtr->xlogid, RecPtr->xrecoff)));
! 			goto next_record_is_invalid;
! 		}
! 	}
! 	else
! 	{
! 		/*
! 		 * Record's prev-link should exactly match our previous location. This
! 		 * check guards against torn WAL pages where a stale but valid-looking
! 		 * WAL record starts on a sector boundary.
! 		 */
! 		if (!XLByteEQ(record->xl_prev, ReadRecPtr))
! 		{
! 			ereport(emode,
! 					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
! 							record->xl_prev.xlogid, record->xl_prev.xrecoff,
! 							RecPtr->xlogid, RecPtr->xrecoff)));
! 			goto next_record_is_invalid;
! 		}
! 	}
  
  	/*
  	 * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
--- 2990,3030 ----
  						record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
  		goto next_record_is_invalid;
  	}
! 
!     if (!haveSwitchedXLogFile)
!     {
!         if (randAccess)
!     	{
!     		/*
!     		 * We can't exactly verify the prev-link, but surely it should be less
!     		 * than the record's own address. 
!     		 */
!     		if (!XLByteLT(record->xl_prev, *RecPtr))
!     		{
!     			ereport(emode,
!     					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
!     							record->xl_prev.xlogid, record->xl_prev.xrecoff,
!     							RecPtr->xlogid, RecPtr->xrecoff)));
!     			goto next_record_is_invalid;
!     		}
!     	}
!     	else
!     	{
!     		/*
!     		 * Record's prev-link should exactly match our previous location. This
!     		 * check guards against torn WAL pages where a stale but valid-looking
!     		 * WAL record starts on a sector boundary.
!     		 */
!     		if (!XLByteEQ(record->xl_prev, ReadRecPtr))
!     		{
!     			ereport(emode,
!     					(errmsg("record with incorrect prev-link %X/%X at %X/%X",
!     							record->xl_prev.xlogid, record->xl_prev.xrecoff,
!     							RecPtr->xlogid, RecPtr->xrecoff)));
!     			goto next_record_is_invalid;
!     		}
!     	}
!     }
  
  	/*
  	 * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
***************
*** 2918,2923 ****
--- 3058,3068 ----
  	buffer = readRecordBuf;
  	nextRecord = NULL;
  	len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
+ 
+     /*
+      * If the record crosses a page boundary, then we need to reassemble
+      * the record. Otherwise we drop through quickly.
+      */
  	if (total_len > len)
  	{
  		/* Need to reassemble record */
***************
*** 2982,2987 ****
--- 3127,3134 ----
  		}
  		if (!RecordIsValid(record, *RecPtr, emode))
  			goto next_record_is_invalid;
+         else
+             haveSwitchedXLogFile = false;
  		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
  		if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
  			MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
***************
*** 2994,3005 ****
--- 3141,3155 ----
  			pageHeaderSize +
  			MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
  		ReadRecPtr = *RecPtr;
+ 
  		return record;
  	}
  
  	/* Record does not cross a page boundary */
  	if (!RecordIsValid(record, *RecPtr, emode))
  		goto next_record_is_invalid;
+     else
+         haveSwitchedXLogFile = false;
  	if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
  		MAXALIGN(total_len))
  		nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
***************
*** 4589,4594 ****
--- 4739,4746 ----
  	ThisTimeLineID = checkPoint.ThisTimeLineID;
  
  	RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+     XLogCtl->Insert.LastXLogSwitchPtr.xlogid = 0;
+     XLogCtl->Insert.LastXLogSwitchPtr.xrecoff = 0;
  
  	if (XLByteLT(RecPtr, checkPoint.redo))
  		ereport(PANIC,
***************
*** 4725,4731 ****
  				/* Pop the error context stack */
  				error_context_stack = errcontext.previous;
  
! 				LastRec = ReadRecPtr;
  
  				record = ReadRecord(NULL, LOG);
  			} while (record != NULL && recoveryContinue);
--- 4877,4889 ----
  				/* Pop the error context stack */
  				error_context_stack = errcontext.previous;
  
!                 /* 
!                  * In case of failure immediately following a log switch
!                  * we ignore the log switch record since it is merely
!                  * a processing instruction and not useful data
!                  */
!                 if (!haveSwitchedXLogFile)
!     				LastRec = ReadRecPtr;
  
  				record = ReadRecord(NULL, LOG);
  			} while (record != NULL && recoveryContinue);
***************
*** 4734,4739 ****
--- 4892,4904 ----
  			 * end of main redo apply loop
  			 */
  
+             if (haveSwitchedXLogFile)
+             {
+     			ereport(LOG,
+ 					(errmsg("skipping trailing processing instructions")));
+                 haveSwitchedXLogFile = false;
+             }
+ 
  			ereport(LOG,
  					(errmsg("redo done at %X/%X",
  							ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
***************
*** 5262,5268 ****
  	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		(void) AdvanceXLInsertBuffer();
  		/* OK to ignore update return flag, since we will do flush anyway */
  		freespace = INSERT_FREESPACE(Insert);
  	}
--- 5427,5433 ----
  	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
  	{
! 		(void) AdvanceXLInsertBuffer(false);
  		/* OK to ignore update return flag, since we will do flush anyway */
  		freespace = INSERT_FREESPACE(Insert);
  	}
***************
*** 5448,5453 ****
--- 5613,5682 ----
  }
  
  /*
+  * Writes a record to xlog to show that an XLOG_SWITCH has taken place.
+  * 
+  * While continuing to hold locks we write up to that point and then fsync
+  * the previous segment, allowing it to be closed. The pointers now are in
+  * the following segment, but we have not yet fsynced the new segment.
+  *
+  * Grabbing WALWriteLock while we have WALInsertLock isn't good, but the 
+  * switch is only designed to be used when either a full backup is taken
+  * which is relatively rarely, or when an automatic log switch occurs
+  * because the system isn't busy enough - neither of those are enough to
+  * raise a performance concern. 
+  * 
+  * There's no point in fsyncing the new segment since if we did there would
+  * still be a possible failure case between the two fsyncs where the old
+  * file is on disk, but the newly opened file is not.
+  * 
+  * We handle that failure case during recovery, by saying that if we find an
+  * XLOG_SWITCH record with nothing else after it, then we will end recovery
+  * by going back a step to the previous xlrec. The XLOG_SWITCH will then be
+  * overwritten by the zero-ing out of the page at end of recovery.
+  */
+ XLogRecPtr
+ RequestXLogSwitch(bool force)
+ {
+ 	XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 	XLogRecPtr	RecPtr;
+ 	XLogRecPtr	LastRecPtr;
+     XLogRecData rdata;
+     xl_xlog_switch xlrec_switch;
+ 
+     /*
+      * If we have not inserted any XLOG records since the last
+      * xlog switch then we can avoid further switching. 
+      * This avoids multiple adjacent log switches in a variety of cases,
+      * which would waste log space, since we may otherwise have alternating
+      * possible loss of both current and previous checkpoint
+      * records if the machine crashes just as we're writing the update.
+      */
+ 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+     INSERT_RECPTR(RecPtr, Insert, Insert->curridx);
+     LastRecPtr = Insert->LastXLogSwitchPtr;
+     LWLockRelease(WALInsertLock);
+ 
+     Assert(XLByteLE(LastRecPtr, RecPtr));
+     if (XLByteLT(LastRecPtr, RecPtr) || force)
+     {
+         /* 
+          * We put the time in the xlog_switch_record so that it is not a 
+          * zero-length field, which has special meaning. Should be useful too...
+          */
+         xlrec_switch.xtime = time(NULL);
+      
+         rdata.buffer = InvalidBuffer;
+         rdata.data = (char *) (&xlrec_switch);
+         rdata.len = sizeof(xl_xlog_switch);
+         rdata.next = NULL;
+ 
+         RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
+     }
+ 
+     return RecPtr;
+ }
+ 
+ /*
   * XLOG resource manager's routines
   */
  void
***************
*** 5466,5471 ****
--- 5695,5715 ----
  			ShmemVariableCache->oidCount = 0;
  		}
  	}
+     else if (info == XLOG_SWITCH)
+     {
+         /*
+          * Invoke special state for processing instruction. We stay in this
+          * state until we have successfully read a following redo record,
+          * so all the special processing is handled in ReadRecord()
+          */
+         haveSwitchedXLogFile = true;
+ 
+         ereport(LOG, 
+                  (errmsg("reading processing instruction: "
+                          "xlog switch; redo pointer at %X/%X",
+                             ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
+ 
+     }
  	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
  	{
  		CheckPoint	checkPoint;
***************
*** 5544,5549 ****
--- 5788,5802 ----
  		memcpy(&nextOid, rec, sizeof(Oid));
  		appendStringInfo(buf, "nextOid: %u", nextOid);
  	}
+     else if (info == XLOG_SWITCH)
+     {
+         xl_xlog_switch *xlrec = (xl_xlog_switch *) rec;
+         struct tm  *tm = localtime(&xlrec->xtime);
+ 
+     	appendStringInfo(buf, "xlog switch: %04u-%02u-%02u %02u:%02u:%02u",
+     			tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
+     			tm->tm_hour, tm->tm_min, tm->tm_sec);
+     }
  	else
  		appendStringInfo(buf, "UNKNOWN");
  }
***************
*** 5854,5859 ****
--- 6107,6279 ----
  }
  
  /*
+  * pg_switch_xlog: switch to next xlog file
+  *
+  */
+ Datum
+ pg_switch_xlog(PG_FUNCTION_ARGS)
+ {
+     XLogRecPtr switchpoint;
+ 	char		xlogfilename[MAXFNAMELEN];
+ 	text	   *result;
+ 
+ 	if (!superuser())
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ 				 (errmsg("must be superuser to switch xlog files"))));
+ 
+     switchpoint = RequestXLogSwitch(false);
+ 
+ 	/*
+ 	 * We're done.  As a convenience, return the WAL offset at switch
+ 	 */
+ 	snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
+ 			 switchpoint.xlogid, switchpoint.xrecoff);
+ 	result = DatumGetTextP(DirectFunctionCall1(textin,
+ 											 CStringGetDatum(xlogfilename)));
+ 	PG_RETURN_TEXT_P(result);
+ }
+ 
+ /*
+  *  Allows user to find out the xlog filename corresponding to an xlog
+  *  offset, such as is returned by pg_stop_backup().
+  */
+ Datum
+ pg_xlogfile_from_wal_offset(PG_FUNCTION_ARGS)
+ {
+ 	text	   *offset = PG_GETARG_TEXT_P(0);
+ 	char	   *offsetstr;
+ 	uint32		xlogid;			
+ 	uint32		xrecoff;		
+ 	uint32		xlogseg;
+ 	XLogRecPtr	offsetpoint;
+ 	char		xlogfilename[MAXFNAMELEN];
+ 
+ 	text	   *result;
+ 
+ 	offsetstr = DatumGetCString(DirectFunctionCall1(textout,
+ 												 PointerGetDatum(offset)));
+ 
+ 	if (sscanf(offsetstr, "%X/%X", &xlogid, &xrecoff) != 2)
+     	ereport(ERROR,
+ 				(errcode(ERRCODE_DATA_EXCEPTION),
+ 				 errmsg("could not parse xlog offset \"%s\"",
+ 						offsetstr)));
+ 
+     offsetpoint.xlogid = xlogid;
+     offsetpoint.xrecoff = xrecoff;
+ 
+ 	XLByteToSeg(offsetpoint, xlogid, xlogseg);
+ 	XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
+ 
+ 	result = DatumGetTextP(DirectFunctionCall1(textin,
+ 										 CStringGetDatum(xlogfilename)));
+ 
+ 	PG_RETURN_TEXT_P(result);
+ }
+ 
+ /*
+  *  Allows user to find out the current xlog filename, plus the byte offset 
+  *  within that file (not the same thing as an xlog record pointer)
+  */
+ Datum
+ pg_current_xlogfile_offset(PG_FUNCTION_ARGS)
+ {
+ 	XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 	uint32		xlogid;
+ 	uint32		xrecoff;		
+ 	uint32		xlogseg;
+ 	XLogRecPtr	current_recptr;
+ 	char		xlogfilename[MAXFNAMELEN];
+     char        xlogfileoffset[MAXFNAMELEN];
+ 
+ 	text	   *result;
+ 
+ 	/*
+ 	 * Get the current end-of-WAL position
+ 	 */
+ 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ 	INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
+ 	LWLockRelease(WALInsertLock);
+ 
+ 	XLByteToSeg(current_recptr, xlogid, xlogseg);
+ 	XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
+     xrecoff = current_recptr.xrecoff % XLogSegSize;
+ 
+ 	snprintf(xlogfileoffset, sizeof(xlogfileoffset), " %u", xrecoff);
+ 
+     strncat(xlogfilename, xlogfileoffset, sizeof(xlogfilename) - strlen(xlogfilename) - 1);
+ 
+ 	result = DatumGetTextP(DirectFunctionCall1(textin,
+ 										 CStringGetDatum(xlogfilename)));
+ 
+ 	PG_RETURN_TEXT_P(result);
+ }
+ 
+ /*
+  *  Allows user to find out the current xlog filename
+  */
+ Datum
+ pg_current_xlogfile(PG_FUNCTION_ARGS)
+ {
+ 	XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 	uint32		xlogid;
+ 	uint32		xlogseg;
+ 	XLogRecPtr	current_recptr;
+ 	char		xlogfilename[MAXFNAMELEN];
+ 
+ 	text	   *result;
+ 
+ 	/*
+ 	 * Get the current end-of-WAL position
+ 	 */
+ 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ 	INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
+ 	LWLockRelease(WALInsertLock);
+ 
+ 	XLByteToSeg(current_recptr, xlogid, xlogseg);
+ 	XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
+ 
+ 	result = DatumGetTextP(DirectFunctionCall1(textin,
+ 										 CStringGetDatum(xlogfilename)));
+ 
+ 	PG_RETURN_TEXT_P(result);
+ }
+ 
+ /*
+  *  Allows user to find out the current WAL offset
+  */
+ Datum
+ pg_current_wal_offset(PG_FUNCTION_ARGS)
+ {
+ 	XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 	uint32		xlogid;
+ 	uint32		xlogseg;
+ 	XLogRecPtr	current_recptr;
+ 	char		wal_offset[MAXFNAMELEN];
+ 
+ 	text	   *result;
+ 
+ 	/*
+ 	 * Get the current end-of-WAL position
+ 	 */
+ 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+ 	INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
+ 	LWLockRelease(WALInsertLock);
+ 
+ 	XLByteToSeg(current_recptr, xlogid, xlogseg);
+ 	XLogFileName(wal_offset, ThisTimeLineID, xlogid, xlogseg);
+ 
+ 	snprintf(wal_offset, sizeof(wal_offset), "%X/%X",
+ 			 current_recptr.xlogid, current_recptr.xrecoff);
+ 
+ 	result = DatumGetTextP(DirectFunctionCall1(textin,
+ 										 CStringGetDatum(wal_offset)));
+ 
+ 	PG_RETURN_TEXT_P(result);
+ }
+ 
+ /*
   * pg_stop_backup: finish taking an on-line backup dump
   *
   * We remove the backup label file created by pg_start_backup, and instead
***************
*** 5885,5894 ****
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
  				 (errmsg("must be superuser to run a backup"))));
  
  	/*
  	 * Get the current end-of-WAL position; it will be unsafe to use this dump
! 	 * to restore to a point in advance of this time.  We can also clear
! 	 * forcePageWrites here.
  	 */
  	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
--- 6305,6322 ----
  				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
  				 (errmsg("must be superuser to run a backup"))));
  
+     /*
+      * Force a switch to a new xlog segment file, so that the backup is
+      * valid as soon as the archiver has moved the old segment out. We do this
+      * first so that the archiver can move the file away as soon as possible.
+      */
+     stoppoint = RequestXLogSwitch(true);
+ 
  	/*
  	 * Get the current end-of-WAL position; it will be unsafe to use this dump
! 	 * to restore to a point prior to this time.  We can also clear
! 	 * forcePageWrites here. XXX could remove these lines and set 
!      * forcePageWrites at time of log switch
  	 */
  	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
***************
*** 5983,5988 ****
--- 6411,6421 ----
  	 */
  	CleanupBackupHistory();
  
+     /*
+      * XXX Maybe should wait here for archiver to archive the last xlog file
+      * and the history file?
+      */
+ 
  	/*
  	 * We're done.  As a convenience, return the ending WAL offset.
  	 */
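
Note for reviewers, not part of the patch: pg_current_xlogfile_offset() returns the segment file name followed by a space and the byte offset within that file. The sketch below shows one way to build that string with a single bounded snprintf() call. The function name format_file_and_offset and its return convention are hypothetical and exist only for this illustration.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: write "<fname> <byteoff>" into dst.
 * Returns 0 on success, -1 if the result did not fit in dstlen bytes.
 */
int
format_file_and_offset(char *dst, size_t dstlen,
					   const char *fname, uint32_t byteoff)
{
	/* snprintf never writes more than dstlen bytes, terminator included */
	int			n = snprintf(dst, dstlen, "%s %u", fname, (unsigned int) byteoff);

	/* n is the length that would have been written without truncation */
	if (n < 0 || (size_t) n >= dstlen)
		return -1;
	return 0;
}
```

Checking the return value against the buffer size makes truncation visible to the caller instead of silently returning a shortened string.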
Index: src/include/access/xlog_internal.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/xlog_internal.h,v
retrieving revision 1.13
diff -c -r1.13 xlog_internal.h
*** src/include/access/xlog_internal.h	5 Apr 2006 03:34:05 -0000	1.13
--- src/include/access/xlog_internal.h	31 Jul 2006 22:16:39 -0000
***************
*** 236,245 ****
--- 236,253 ----
  
  extern const RmgrData RmgrTable[];
  
+ extern XLogRecPtr RequestXLogSwitch(bool force);
+ 
+ 
  /*
   * These aren't in xlog.h because I'd rather not include fmgr.h there.
   */
  extern Datum pg_start_backup(PG_FUNCTION_ARGS);
  extern Datum pg_stop_backup(PG_FUNCTION_ARGS);
+ extern Datum pg_switch_xlog(PG_FUNCTION_ARGS);
+ extern Datum pg_xlogfile_from_wal_offset(PG_FUNCTION_ARGS);
+ extern Datum pg_current_xlogfile_offset(PG_FUNCTION_ARGS);
+ extern Datum pg_current_xlogfile(PG_FUNCTION_ARGS);
+ extern Datum pg_current_wal_offset(PG_FUNCTION_ARGS);
  
  #endif   /* XLOG_INTERNAL_H */
Index: src/include/catalog/pg_control.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_control.h,v
retrieving revision 1.29
diff -c -r1.29 pg_control.h
*** src/include/catalog/pg_control.h	4 Apr 2006 22:39:59 -0000	1.29
--- src/include/catalog/pg_control.h	31 Jul 2006 22:16:39 -0000
***************
*** 43,53 ****
  	time_t		time;			/* time stamp of checkpoint */
  } CheckPoint;
  
! /* XLOG info values for XLOG rmgr */
  #define XLOG_CHECKPOINT_SHUTDOWN		0x00
  #define XLOG_CHECKPOINT_ONLINE			0x10
  #define XLOG_NEXTOID					0x30
  
  
  /* System status indicator */
  typedef enum DBState
--- 43,72 ----
  	time_t		time;			/* time stamp of checkpoint */
  } CheckPoint;
  
! 
! /* 
!  * XLOG info values for XLOG rmgr
!  */
! /*
!  * Database state related record types
!  */
  #define XLOG_CHECKPOINT_SHUTDOWN		0x00
  #define XLOG_CHECKPOINT_ONLINE			0x10
  #define XLOG_NEXTOID					0x30
  
+ typedef struct xl_xlog_switch
+ {
+     time_t          xtime;
+ } xl_xlog_switch;
+ /* 
+  * Processing Instructions
+  * 
+  * These info values are special processing instructions for use
+  * during xlog replay. They don't affect the state of the database,
+  * only the way that xlog files should be read during recovery.
+  */
+ #define XLOG_SWITCH                     0x40
+ 
  
  /* System status indicator */
  typedef enum DBState
Index: src/include/catalog/pg_proc.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_proc.h,v
retrieving revision 1.419
diff -c -r1.419 pg_proc.h
*** src/include/catalog/pg_proc.h	28 Jul 2006 18:33:04 -0000	1.419
--- src/include/catalog/pg_proc.h	31 Jul 2006 22:16:48 -0000
***************
*** 3101,3106 ****
--- 3101,3116 ----
  DESCR("Prepare for taking an online backup");
  DATA(insert OID = 2173 ( pg_stop_backup			PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_stop_backup - _null_ ));
  DESCR("Finish taking an online backup");
+ DATA(insert OID = 2840 ( pg_switch_xlog			PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_switch_xlog - _null_ ));
+ DESCR("Switch to new xlog file segment");
+ DATA(insert OID = 2841 ( pg_xlogfile_from_wal_offset	PGNSP PGUID 12 f f t f v 1 25 "25" _null_ _null_ _null_ pg_xlogfile_from_wal_offset - _null_ ));
+ DESCR("XLog Filename, given a redo pointer");
+ DATA(insert OID = 2842 ( pg_current_xlogfile_offset		PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_current_xlogfile_offset - _null_ ));
+ DESCR("Current XLog filename and byte offset");
+ DATA(insert OID = 2843 ( pg_current_xlogfile	PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_current_xlogfile - _null_ ));
+ DESCR("Current XLog filename");
+ DATA(insert OID = 2844 ( pg_current_wal_offset	PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_current_wal_offset - _null_ ));
+ DESCR("Current WAL offset");
  
  DATA(insert OID = 2621 ( pg_reload_conf			PGNSP PGUID 12 f f t f v 0 16 "" _null_ _null_ _null_ pg_reload_conf - _null_ ));
  DESCR("Reload configuration files");
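
For anyone testing the new functions, here is a rough standalone sketch of the mapping they expose: a WAL offset of the form "X/Y" is split into its xlogid and xrecoff parts, the segment number is xrecoff divided by the segment size, and the byte offset within the file is the remainder. It assumes the default 16MB segment size and the usual timeline/log/segment file name layout of three 8-digit hex fields. The names SEG_SIZE and wal_offset_to_file are hypothetical and are not taken from the patch.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Assumption: default segment size of 16MB */
#define SEG_SIZE ((uint32_t) (16 * 1024 * 1024))

/*
 * Hypothetical helper: parse a WAL offset string such as "0/12345678" and
 * produce the segment file name plus the byte offset within that file.
 * Returns 0 on success, -1 if the string cannot be parsed.
 */
int
wal_offset_to_file(const char *offsetstr, uint32_t tli,
				   char *fname, size_t fnamelen, uint32_t *byteoff)
{
	unsigned int xlogid;
	unsigned int xrecoff;

	if (sscanf(offsetstr, "%X/%X", &xlogid, &xrecoff) != 2)
		return -1;

	/* file name is timeline, log id and segment number, 8 hex digits each */
	snprintf(fname, fnamelen, "%08X%08X%08X",
			 (unsigned int) tli, xlogid, (unsigned int) (xrecoff / SEG_SIZE));

	/* position inside the segment file, not an xlog record pointer */
	*byteoff = xrecoff % SEG_SIZE;
	return 0;
}
```

Under these assumptions, calling wal_offset_to_file("0/12345678", 1, ...) gives the file name 000000010000000000000012 and a byte offset of 3430008.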