Included here is a prototype patch that implements pg_switch_xlog(), in line
with earlier discussions about how this should be implemented.
This patch implements:
- a separate function for a manual xlog switch
- the internals to allow pg_stop_backup() to perform automatic log switching
The patch applies cleanly to CVS tip, passes make check, and also switches
xlogs cleanly. I have not tested recovery yet, though I will do so shortly
--- yes, of course that is critical, before you say so.
I'm shipping this very early to allow discussion, rather than testing it
fully first and leaving people wondering what it looks like.
A production version will be ready prior to 8.2 code freeze.
--
Simon Riggs
EnterpriseDB http://www.enterprisedb.com
Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.244
diff -c -r1.244 xlog.c
*** src/backend/access/transam/xlog.c 14 Jul 2006 14:52:17 -0000 1.244
--- src/backend/access/transam/xlog.c 27 Jul 2006 12:44:23 -0000
***************
*** 452,458 ****
static TimeLineID lastPageTLI = 0;
static bool InRedo = false;
!
static void XLogArchiveNotify(const char *xlog);
static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
--- 452,458 ----
static TimeLineID lastPageTLI = 0;
static bool InRedo = false;
! static bool haveSwitchedXLogFile = false;
static void XLogArchiveNotify(const char *xlog);
static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
***************
*** 465,471 ****
static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb);
! static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
--- 465,471 ----
static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb);
! static bool AdvanceXLInsertBuffer(bool SwitchXLog);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
***************
*** 495,500 ****
--- 495,502 ----
static char *str_time(time_t tnow);
static void issue_xlog_fsync(void);
+ static XLogRecPtr RequestXLogSwitch(void);
+
#ifdef WAL_DEBUG
static void xlog_outrec(StringInfo buf, XLogRecord *record);
#endif
***************
*** 854,860 ****
freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
! updrqst = AdvanceXLInsertBuffer();
freespace = INSERT_FREESPACE(Insert);
}
--- 856,862 ----
freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
! updrqst = AdvanceXLInsertBuffer(false);
freespace = INSERT_FREESPACE(Insert);
}
***************
*** 937,943 ****
}
/* Use next buffer */
! updrqst = AdvanceXLInsertBuffer();
curridx = Insert->curridx;
/* Insert cont-record header */
Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
--- 939,945 ----
}
/* Use next buffer */
! updrqst = AdvanceXLInsertBuffer(false);
curridx = Insert->curridx;
/* Insert cont-record header */
Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
***************
*** 947,970 ****
freespace = INSERT_FREESPACE(Insert);
}
! /* Ensure next record will be properly aligned */
! Insert->currpos = (char *) Insert->currpage +
! MAXALIGN(Insert->currpos - (char *) Insert->currpage);
! freespace = INSERT_FREESPACE(Insert);
!
! /*
! * The recptr I return is the beginning of the *next* record. This will be
! * stored as LSN for changed data pages...
! */
! INSERT_RECPTR(RecPtr, Insert, curridx);
!
! /* Need to update shared LogwrtRqst if some block was filled up */
! if (freespace < SizeOfXLogRecord)
! updrqst = true; /* curridx is filled and available for writing
! * out */
! else
! curridx = PrevBufIdx(curridx);
! WriteRqst = XLogCtl->xlblocks[curridx];
LWLockRelease(WALInsertLock);
--- 949,999 ----
freespace = INSERT_FREESPACE(Insert);
}
! /*
! * We've now written out all of the xlog record and any associated blocks.
! *
! * If the xlog record was a request to perform special processing options,
! * such as an xlog switch, do this here. Otherwise, just clean up before
! * we release locks.
! */
! if (rmid == RM_XLOG_ID && info == XLOG_SWITCH)
! {
! /*
! * Did AdvanceXLInsertBuffer() already step into a new file?
! * If so, we don't need to switch files
! */
! if (!haveSwitchedXLogFile)
! {
! /*
! * Switch to next XLog segment file. We do this by writing out
! * the current wal buffer page, then moving the pointers forward so
! * that the next insertion point is in a new file. We *must*
! * do this with WALInsertLock held, but we want to avoid doing
! * this with WALWriteLock held, if possible.
! */
! (void) AdvanceXLInsertBuffer(true);
! }
! updrqst = true;
! }
!
! /* Ensure next record will be properly aligned */
! Insert->currpos = (char *) Insert->currpage +
! MAXALIGN(Insert->currpos - (char *) Insert->currpage);
! freespace = INSERT_FREESPACE(Insert);
!
! /*
! * The recptr I return is the beginning of the *next* record. This will be
! * stored as LSN for changed data pages...
! */
! INSERT_RECPTR(RecPtr, Insert, curridx);
!
! /* Need to update shared LogwrtRqst if some block was filled up */
! if (freespace < SizeOfXLogRecord)
! updrqst = true; /* curridx is filled and available for writing
! * out */
! else
! curridx = PrevBufIdx(curridx);
! WriteRqst = XLogCtl->xlblocks[curridx];
LWLockRelease(WALInsertLock);
***************
*** 1181,1187 ****
* Must be called with WALInsertLock held.
*/
static bool
! AdvanceXLInsertBuffer(void)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogCtlWrite *Write = &XLogCtl->Write;
--- 1210,1216 ----
* Must be called with WALInsertLock held.
*/
static bool
! AdvanceXLInsertBuffer(bool SwitchXLog)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogCtlWrite *Write = &XLogCtl->Write;
***************
*** 1192,1197 ****
--- 1221,1228 ----
XLogRecPtr NewPageEndPtr;
XLogPageHeader NewPage;
+ haveSwitchedXLogFile = false;
+
/* Use Insert->LogwrtResult copy if it's more fresh */
if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
LogwrtResult = Insert->LogwrtResult;
***************
*** 1246,1253 ****
* not good, so only write as much as we absolutely must.
*/
WriteRqst.Write = OldPageRqstPtr;
! WriteRqst.Flush.xlogid = 0;
! WriteRqst.Flush.xrecoff = 0;
XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
Insert->LogwrtResult = LogwrtResult;
--- 1277,1289 ----
* not good, so only write as much as we absolutely must.
*/
WriteRqst.Write = OldPageRqstPtr;
! if (SwitchXLog)
! WriteRqst.Flush = OldPageRqstPtr;
! else
! {
! WriteRqst.Flush.xlogid = 0;
! WriteRqst.Flush.xrecoff = 0;
! }
XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
Insert->LogwrtResult = LogwrtResult;
***************
*** 1260,1265 ****
--- 1296,1310 ----
* output page.
*/
NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
+
+ /*
+ * If required, reset the current offset to point to new segment file
+ */
+ if (SwitchXLog)
+ NewPageEndPtr.xrecoff = NewPageEndPtr.xrecoff
+ - (NewPage->xlp_pageaddr.xrecoff % XLogSegSize)
+ + XLogSegSize;
+
if (NewPageEndPtr.xrecoff >= XLogFileSize)
{
/* crossing a logid boundary */
***************
*** 1305,1310 ****
--- 1350,1358 ----
NewPage ->xlp_info |= XLP_LONG_HEADER;
Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+
+ if (!SwitchXLog)
+ haveSwitchedXLogFile = true;
}
return update_needed;
***************
*** 5262,5268 ****
freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
! (void) AdvanceXLInsertBuffer();
/* OK to ignore update return flag, since we will do flush anyway */
freespace = INSERT_FREESPACE(Insert);
}
--- 5310,5316 ----
freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
! (void) AdvanceXLInsertBuffer(false);
/* OK to ignore update return flag, since we will do flush anyway */
freespace = INSERT_FREESPACE(Insert);
}
***************
*** 5448,5453 ****
--- 5496,5535 ----
}
/*
+ * Inserts an XLOG_SWITCH record, calls XLogFlush on the pointer XLogInsert
+ * returns, and returns that pointer. While still holding WALInsertLock,
+ * XLogInsert performs a physical xlog switch, as if the file had been filled.
+ * We take care not to acquire WALWriteLock if possible, so that the
+ * actual flush to disk can be performed immediately afterwards, yet allowing
+ * further XLogInserts to occur while we fsync the old xlog file.
+ */
+ static XLogRecPtr
+ RequestXLogSwitch(void)
+ {
+ XLogRecData rdata;
+ XLogRecPtr recptr;
+
+ xl_xlog_switch xlrec;
+
+ /*
+ * Put the current time in the xl_xlog_switch record so that its data is
+ * not zero-length, which has special meaning. Should be useful too...
+ */
+ xlrec.xtime = time(NULL);
+
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *) (&xlrec);
+ rdata.len = sizeof(xl_xlog_switch);
+ rdata.next = NULL;
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
+
+ XLogFlush(recptr);
+
+ return recptr;
+ }
+
+ /*
* XLOG resource manager's routines
*/
void
***************
*** 5466,5471 ****
--- 5548,5562 ----
ShmemVariableCache->oidCount = 0;
}
}
+ else if (info == XLOG_SWITCH)
+ {
+ /*
+ * Alter module-level variables controlling position of WAL-replay
+ */
+ close(readFile);
+ readFile = -1;
+ NextLogSeg(readId, readSeg);
+ }
else if (info == XLOG_CHECKPOINT_SHUTDOWN)
{
CheckPoint checkPoint;
***************
*** 5544,5549 ****
--- 5635,5649 ----
memcpy(&nextOid, rec, sizeof(Oid));
appendStringInfo(buf, "nextOid: %u", nextOid);
}
+ else if (info == XLOG_SWITCH)
+ {
+ xl_xlog_switch *xlrec = (xl_xlog_switch *) rec;
+ struct tm *tm = localtime(&xlrec->xtime);
+
+ appendStringInfo(buf, "xlog switch: %04u-%02u-%02u %02u:%02u:%02u",
+ tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
+ tm->tm_hour, tm->tm_min, tm->tm_sec);
+ }
else
appendStringInfo(buf, "UNKNOWN");
}
***************
*** 5854,5859 ****
--- 5954,5987 ----
}
/*
+ * pg_switch_xlog: switch to next xlog file (superuser only)
+ * Returns the pointer from RequestXLogSwitch() as text, formatted "%X/%X".
+ */
+ Datum
+ pg_switch_xlog(PG_FUNCTION_ARGS)
+ {
+ XLogRecPtr switchpoint;
+ char xlogfilename[MAXFNAMELEN]; /* holds the "%X/%X" text, not a file name */
+ text *result;
+
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ (errmsg("must be superuser to switch xlog files"))));
+
+ switchpoint = RequestXLogSwitch();
+
+ /*
+ * We're done. As a convenience, return the WAL offset at the switch.
+ */
+ snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
+ switchpoint.xlogid, switchpoint.xrecoff);
+ result = DatumGetTextP(DirectFunctionCall1(textin,
+ CStringGetDatum(xlogfilename)));
+ PG_RETURN_TEXT_P(result);
+ }
+
+ /*
* pg_stop_backup: finish taking an on-line backup dump
*
* We remove the backup label file created by pg_start_backup, and instead
***************
*** 5885,5894 ****
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to run a backup"))));
/*
* Get the current end-of-WAL position; it will be unsafe to use this dump
! * to restore to a point in advance of this time. We can also clear
! * forcePageWrites here.
*/
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
--- 6013,6030 ----
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to run a backup"))));
+ /*
+ * Force a switch to a new xlog segment file, so that the backup
+ * is valid as soon as archiver moves it out. We do this first so that the
+ * archiver has a chance to move the file away as soon as possible.
+ */
+ stoppoint = RequestXLogSwitch();
+
/*
* Get the current end-of-WAL position; it will be unsafe to use this dump
! * to restore to a point prior to this time. We can also clear
! * forcePageWrites here. XXX could remove these lines and set
! * forcePageWrites at time of log switch
*/
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
***************
*** 5983,5988 ****
--- 6119,6129 ----
*/
CleanupBackupHistory();
+ /*
+ * XXX Maybe should wait here for archiver to archive the last xlog file
+ * and the history file?
+ */
+
/*
* We're done. As a convenience, return the ending WAL offset.
*/
Index: src/include/access/xlog_internal.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/access/xlog_internal.h,v
retrieving revision 1.13
diff -c -r1.13 xlog_internal.h
*** src/include/access/xlog_internal.h 5 Apr 2006 03:34:05 -0000 1.13
--- src/include/access/xlog_internal.h 27 Jul 2006 12:44:24 -0000
***************
*** 241,245 ****
--- 241,246 ----
*/
extern Datum pg_start_backup(PG_FUNCTION_ARGS);
extern Datum pg_stop_backup(PG_FUNCTION_ARGS);
+ extern Datum pg_switch_xlog(PG_FUNCTION_ARGS);
#endif /* XLOG_INTERNAL_H */
Index: src/include/catalog/pg_control.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_control.h,v
retrieving revision 1.29
diff -c -r1.29 pg_control.h
*** src/include/catalog/pg_control.h 4 Apr 2006 22:39:59 -0000 1.29
--- src/include/catalog/pg_control.h 27 Jul 2006 12:44:24 -0000
***************
*** 43,52 ****
--- 43,58 ----
time_t time; /* time stamp of checkpoint */
} CheckPoint;
+ typedef struct xl_xlog_switch
+ {
+ time_t xtime; /* time stamp of the xlog switch request */
+ } xl_xlog_switch;
+
/* XLOG info values for XLOG rmgr */
#define XLOG_CHECKPOINT_SHUTDOWN 0x00
#define XLOG_CHECKPOINT_ONLINE 0x10
#define XLOG_NEXTOID 0x30
+ #define XLOG_SWITCH 0x40
/* System status indicator */
Index: src/include/catalog/pg_proc.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_proc.h,v
retrieving revision 1.417
diff -c -r1.417 pg_proc.h
*** src/include/catalog/pg_proc.h 25 Jul 2006 03:51:21 -0000 1.417
--- src/include/catalog/pg_proc.h 27 Jul 2006 12:44:29 -0000
***************
*** 3073,3078 ****
--- 3073,3080 ----
DESCR("Prepare for taking an online backup");
DATA(insert OID = 2173 ( pg_stop_backup PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_stop_backup - _null_ ));
DESCR("Finish taking an online backup");
+ DATA(insert OID = 2803 ( pg_switch_xlog PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_switch_xlog - _null_ ));
+ DESCR("Switch to new xlog file segment");
DATA(insert OID = 2621 ( pg_reload_conf PGNSP PGUID 12 f f t f v 0 16 "" _null_ _null_ _null_ pg_reload_conf - _null_ ));
DESCR("Reload configuration files");
---------------------------(end of broadcast)---------------------------
TIP 4: Have you searched our list archives?
http://archives.postgresql.org