Included here is a prototype patch that implements pg_switch_xlog(), in line with earlier discussions about how this should be implemented.
This patch implements:
- a separate function for manual xlog switch
- internals to allow pg_stop_backup() to perform automatic log switching
The patch applies cleanly to CVS tip, passes make check and also switches xlogs cleanly. I've not tested recovery yet, though I will do so shortly --- yes, of course that is critical, before you say so. I'm shipping this very early to allow discussion, rather than testing it fully and leaving people wondering what it looks like. A production version will be ready prior to the 8.2 code freeze.
--
Simon Riggs
EnterpriseDB http://www.enterprisedb.com
Index: src/backend/access/transam/xlog.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v retrieving revision 1.244 diff -c -r1.244 xlog.c *** src/backend/access/transam/xlog.c 14 Jul 2006 14:52:17 -0000 1.244 --- src/backend/access/transam/xlog.c 27 Jul 2006 12:44:23 -0000 *************** *** 452,458 **** static TimeLineID lastPageTLI = 0; static bool InRedo = false; ! static void XLogArchiveNotify(const char *xlog); static void XLogArchiveNotifySeg(uint32 log, uint32 seg); --- 452,458 ---- static TimeLineID lastPageTLI = 0; static bool InRedo = false; ! static bool haveSwitchedXLogFile = false; static void XLogArchiveNotify(const char *xlog); static void XLogArchiveNotifySeg(uint32 log, uint32 seg); *************** *** 465,471 **** static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, XLogRecPtr *lsn, BkpBlock *bkpb); ! static bool AdvanceXLInsertBuffer(void); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock); --- 465,471 ---- static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, XLogRecPtr *lsn, BkpBlock *bkpb); ! static bool AdvanceXLInsertBuffer(bool SwitchXLog); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock); *************** *** 495,500 **** --- 495,502 ---- static char *str_time(time_t tnow); static void issue_xlog_fsync(void); + static XLogRecPtr RequestXLogSwitch(void); + #ifdef WAL_DEBUG static void xlog_outrec(StringInfo buf, XLogRecord *record); #endif *************** *** 854,860 **** freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { ! updrqst = AdvanceXLInsertBuffer(); freespace = INSERT_FREESPACE(Insert); } --- 856,862 ---- freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { ! 
updrqst = AdvanceXLInsertBuffer(false); freespace = INSERT_FREESPACE(Insert); } *************** *** 937,943 **** } /* Use next buffer */ ! updrqst = AdvanceXLInsertBuffer(); curridx = Insert->curridx; /* Insert cont-record header */ Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; --- 939,945 ---- } /* Use next buffer */ ! updrqst = AdvanceXLInsertBuffer(false); curridx = Insert->curridx; /* Insert cont-record header */ Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; *************** *** 947,970 **** freespace = INSERT_FREESPACE(Insert); } ! /* Ensure next record will be properly aligned */ ! Insert->currpos = (char *) Insert->currpage + ! MAXALIGN(Insert->currpos - (char *) Insert->currpage); ! freespace = INSERT_FREESPACE(Insert); ! ! /* ! * The recptr I return is the beginning of the *next* record. This will be ! * stored as LSN for changed data pages... ! */ ! INSERT_RECPTR(RecPtr, Insert, curridx); ! ! /* Need to update shared LogwrtRqst if some block was filled up */ ! if (freespace < SizeOfXLogRecord) ! updrqst = true; /* curridx is filled and available for writing ! * out */ ! else ! curridx = PrevBufIdx(curridx); ! WriteRqst = XLogCtl->xlblocks[curridx]; LWLockRelease(WALInsertLock); --- 949,999 ---- freespace = INSERT_FREESPACE(Insert); } ! /* ! * We've now written out all of the xlog record and any associated blocks. ! * ! * If the xlog record was a request to perform special processing options, ! * such as an xlog switch, do this here. Otherwise, just clean up before ! * we release locks. ! */ ! if (rmid == RM_XLOG_ID && info == XLOG_SWITCH) ! { ! /* ! * Did AdvanceXLInsertBuffer() already step into a new file? ! * If so, we don't need to switch files ! */ ! if (!haveSwitchedXLogFile) ! { ! /* ! * Switch to next XLog segment file. We do this by writing out ! * the current wal buffer page, then moving the pointers forward so ! * that the next insertion point is in a new file. We *must* ! 
* do this with WALInsertLock held, but we want to avoid doing ! * this with WALWriteLock held, if possible. ! */ ! (void) AdvanceXLInsertBuffer(true); ! } ! updrqst = true; ! } ! ! /* Ensure next record will be properly aligned */ ! Insert->currpos = (char *) Insert->currpage + ! MAXALIGN(Insert->currpos - (char *) Insert->currpage); ! freespace = INSERT_FREESPACE(Insert); ! ! /* ! * The recptr I return is the beginning of the *next* record. This will be ! * stored as LSN for changed data pages... ! */ ! INSERT_RECPTR(RecPtr, Insert, curridx); ! ! /* Need to update shared LogwrtRqst if some block was filled up */ ! if (freespace < SizeOfXLogRecord) ! updrqst = true; /* curridx is filled and available for writing ! * out */ ! else ! curridx = PrevBufIdx(curridx); ! WriteRqst = XLogCtl->xlblocks[curridx]; LWLockRelease(WALInsertLock); *************** *** 1181,1187 **** * Must be called with WALInsertLock held. */ static bool ! AdvanceXLInsertBuffer(void) { XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlWrite *Write = &XLogCtl->Write; --- 1210,1216 ---- * Must be called with WALInsertLock held. */ static bool ! AdvanceXLInsertBuffer(bool SwitchXLog) { XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlWrite *Write = &XLogCtl->Write; *************** *** 1192,1197 **** --- 1221,1228 ---- XLogRecPtr NewPageEndPtr; XLogPageHeader NewPage; + haveSwitchedXLogFile = false; + /* Use Insert->LogwrtResult copy if it's more fresh */ if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write)) LogwrtResult = Insert->LogwrtResult; *************** *** 1246,1253 **** * not good, so only write as much as we absolutely must. */ WriteRqst.Write = OldPageRqstPtr; ! WriteRqst.Flush.xlogid = 0; ! WriteRqst.Flush.xrecoff = 0; XLogWrite(WriteRqst, false); LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; --- 1277,1289 ---- * not good, so only write as much as we absolutely must. */ WriteRqst.Write = OldPageRqstPtr; ! if (SwitchXLog) ! WriteRqst.Flush = OldPageRqstPtr; ! 
else ! { ! WriteRqst.Flush.xlogid = 0; ! WriteRqst.Flush.xrecoff = 0; ! } XLogWrite(WriteRqst, false); LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; *************** *** 1260,1265 **** --- 1296,1310 ---- * output page. */ NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx]; + + /* + * If required, reset the current offset to point to new segment file + */ + if (SwitchXLog) + NewPageEndPtr.xrecoff = NewPageEndPtr.xrecoff + - (NewPage->xlp_pageaddr.xrecoff % XLogSegSize) + + XLogSegSize; + if (NewPageEndPtr.xrecoff >= XLogFileSize) { /* crossing a logid boundary */ *************** *** 1305,1310 **** --- 1350,1358 ---- NewPage ->xlp_info |= XLP_LONG_HEADER; Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD; + + if (!SwitchXLog) + haveSwitchedXLogFile = true; } return update_needed; *************** *** 5262,5268 **** freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { ! (void) AdvanceXLInsertBuffer(); /* OK to ignore update return flag, since we will do flush anyway */ freespace = INSERT_FREESPACE(Insert); } --- 5310,5316 ---- freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { ! (void) AdvanceXLInsertBuffer(false); /* OK to ignore update return flag, since we will do flush anyway */ freespace = INSERT_FREESPACE(Insert); } *************** *** 5448,5453 **** --- 5496,5535 ---- } /* + * Writes a record to xlog to show that an XLOG_SWITCH switch has taken place. + * Immediately afterwards, while still holding WALInsertLock, XLogInsert will + * perform a physical xlog switch, as if the xlog file had been filled. + * We take care not to acquire WALWriteLock if possible, so that the + * actual flush to disk can be performed immediately afterwards, yet allowing + * further XLogInserts to occur while we fsync the old xlog file. 
+ */ + static XLogRecPtr + RequestXLogSwitch(void) + { + XLogRecData rdata; + XLogRecPtr recptr; + + xl_xlog_switch xlrec; + + /* + * We put the time in the xlog_switch_record so that it is not a + * zero-length field, which has special meaning. Should be useful too... + */ + xlrec.xtime = time(NULL); + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) (&xlrec); + rdata.len = sizeof(xl_xlog_switch); + rdata.next = NULL; + + recptr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata); + + XLogFlush(recptr); + + return recptr; + } + + /* * XLOG resource manager's routines */ void *************** *** 5466,5471 **** --- 5548,5562 ---- ShmemVariableCache->oidCount = 0; } } + else if (info == XLOG_SWITCH) + { + /* + * Alter module-level variables controlling position of WAL-replay + */ + close(readFile); + readFile = -1; + NextLogSeg(readId, readSeg); + } else if (info == XLOG_CHECKPOINT_SHUTDOWN) { CheckPoint checkPoint; *************** *** 5544,5549 **** --- 5635,5649 ---- memcpy(&nextOid, rec, sizeof(Oid)); appendStringInfo(buf, "nextOid: %u", nextOid); } + else if (info == XLOG_SWITCH) + { + xl_xlog_switch *xlrec = (xl_xlog_switch *) rec; + struct tm *tm = localtime(&xlrec->xtime); + + appendStringInfo(buf, "xlog switch: %04u-%02u-%02u %02u:%02u:%02u", + tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec); + } else appendStringInfo(buf, "UNKNOWN"); } *************** *** 5854,5859 **** --- 5954,5987 ---- } /* + * pg_xlog_switch: switch to next xlog file + * + */ + Datum + pg_switch_xlog(PG_FUNCTION_ARGS) + { + XLogRecPtr switchpoint; + char xlogfilename[MAXFNAMELEN]; + text *result; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to switch xlog files")))); + + switchpoint = RequestXLogSwitch(); + + /* + * We're done. 
As a convenience, return the WAL offset at switch + */ + snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X", + switchpoint.xlogid, switchpoint.xrecoff); + result = DatumGetTextP(DirectFunctionCall1(textin, + CStringGetDatum(xlogfilename))); + PG_RETURN_TEXT_P(result); + } + + /* * pg_stop_backup: finish taking an on-line backup dump * * We remove the backup label file created by pg_start_backup, and instead *************** *** 5885,5894 **** (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be superuser to run a backup")))); /* * Get the current end-of-WAL position; it will be unsafe to use this dump ! * to restore to a point in advance of this time. We can also clear ! * forcePageWrites here. */ LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); INSERT_RECPTR(stoppoint, Insert, Insert->curridx); --- 6013,6030 ---- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be superuser to run a backup")))); + /* + * Force a switch to a new xlog segment file, so that the backup + * is valid as soon as archiver moves it out. We do this first so that the + * archiver has a chance to move the file away as soon as possible. + */ + stoppoint = RequestXLogSwitch(); + /* * Get the current end-of-WAL position; it will be unsafe to use this dump ! * to restore to a point prior to this time. We can also clear ! * forcePageWrites here. XXX could remove these lines and set ! * forcePageWrites at time of log switch */ LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); INSERT_RECPTR(stoppoint, Insert, Insert->curridx); *************** *** 5983,5988 **** --- 6119,6129 ---- */ CleanupBackupHistory(); + /* + * XXX Maybe should wait here for archiver to archive the last xlog file + * and the history file? + */ + /* * We're done. As a convenience, return the ending WAL offset. 
*/ Index: src/include/access/xlog_internal.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/include/access/xlog_internal.h,v retrieving revision 1.13 diff -c -r1.13 xlog_internal.h *** src/include/access/xlog_internal.h 5 Apr 2006 03:34:05 -0000 1.13 --- src/include/access/xlog_internal.h 27 Jul 2006 12:44:24 -0000 *************** *** 241,245 **** --- 241,246 ---- */ extern Datum pg_start_backup(PG_FUNCTION_ARGS); extern Datum pg_stop_backup(PG_FUNCTION_ARGS); + extern Datum pg_switch_xlog(PG_FUNCTION_ARGS); #endif /* XLOG_INTERNAL_H */ Index: src/include/catalog/pg_control.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_control.h,v retrieving revision 1.29 diff -c -r1.29 pg_control.h *** src/include/catalog/pg_control.h 4 Apr 2006 22:39:59 -0000 1.29 --- src/include/catalog/pg_control.h 27 Jul 2006 12:44:24 -0000 *************** *** 43,52 **** --- 43,58 ---- time_t time; /* time stamp of checkpoint */ } CheckPoint; + typedef struct xl_xlog_switch + { + time_t xtime; + } xl_xlog_switch; + /* XLOG info values for XLOG rmgr */ #define XLOG_CHECKPOINT_SHUTDOWN 0x00 #define XLOG_CHECKPOINT_ONLINE 0x10 #define XLOG_NEXTOID 0x30 + #define XLOG_SWITCH 0x40 /* System status indicator */ Index: src/include/catalog/pg_proc.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_proc.h,v retrieving revision 1.417 diff -c -r1.417 pg_proc.h *** src/include/catalog/pg_proc.h 25 Jul 2006 03:51:21 -0000 1.417 --- src/include/catalog/pg_proc.h 27 Jul 2006 12:44:29 -0000 *************** *** 3073,3078 **** --- 3073,3080 ---- DESCR("Prepare for taking an online backup"); DATA(insert OID = 2173 ( pg_stop_backup PGNSP PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_stop_backup - _null_ )); DESCR("Finish taking an online backup"); + DATA(insert OID = 2803 ( pg_switch_xlog PGNSP 
PGUID 12 f f t f v 0 25 "" _null_ _null_ _null_ pg_switch_xlog - _null_ )); + DESCR("Switch to new xlog file segment"); DATA(insert OID = 2621 ( pg_reload_conf PGNSP PGUID 12 f f t f v 0 16 "" _null_ _null_ _null_ pg_reload_conf - _null_ )); DESCR("Reload configuration files");
---------------------------(end of broadcast)--------------------------- TIP 4: Have you searched our list archives? http://archives.postgresql.org