On Marko Kreen's detailed suggestion, I've implemented a restartable
recovery mode for archive recovery (aka PITR). Restart points are known
as recovery checkpoints and are normally taken every 100 checkpoints in
the log to ensure good recovery performance.

An additional mode
        standby_mode = 'true'
can also be specified, which ensures that a recovery checkpoint occurs
for each checkpoint in the logs.

There are some other code refactorings, though all changes are isolated to
xlog.c and pg_control.h; comments on the code are welcome.

Applies cleanly to cvstip, passes make check.

Further detailed testing is very desirable. I've tested restarting a
recovery twice and things worked successfully.

-- 
  Simon Riggs
  EnterpriseDB          http://www.enterprisedb.com
Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.242
diff -c -r1.242 xlog.c
*** src/backend/access/transam/xlog.c	27 Jun 2006 18:59:17 -0000	1.242
--- src/backend/access/transam/xlog.c	11 Jul 2006 16:46:21 -0000
***************
*** 124,129 ****
--- 124,130 ----
  
  /* File path names (all relative to $PGDATA) */
  #define BACKUP_LABEL_FILE		"backup_label"
+ #define BACKUP_LABEL_IN_USE	    "backup_label.in_use"
  #define RECOVERY_COMMAND_FILE	"recovery.conf"
  #define RECOVERY_COMMAND_DONE	"recovery.done"
  
***************
*** 183,188 ****
--- 184,192 ----
  static bool recoveryTargetInclusive = true;
  static TransactionId recoveryTargetXid;
  static time_t recoveryTargetTime;
+ static bool InStandby = false;
+ /* How many XLOG_CHECKPOINT* entries since last recovery checkpoint */
+ static int nCheckpoints = 0;    
  
  /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
  static TransactionId recoveryStopXid;
***************
*** 496,501 ****
--- 500,506 ----
  					 uint32 endLogId, uint32 endLogSeg);
  static void WriteControlFile(void);
  static void ReadControlFile(void);
+ static void ValidateControlFile(void);
  static char *str_time(time_t tnow);
  static void issue_xlog_fsync(void);
  
***************
*** 505,511 ****
  static bool read_backup_label(XLogRecPtr *checkPointLoc);
  static void remove_backup_label(void);
  static void rm_redo_error_callback(void *arg);
! 
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
--- 510,516 ----
  static bool read_backup_label(XLogRecPtr *checkPointLoc);
  static void remove_backup_label(void);
  static void rm_redo_error_callback(void *arg);
! static void CheckPointShmem(XLogRecPtr checkPointRedo);
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 3626,3631 ****
--- 3631,3663 ----
  		ereport(FATAL,
  				(errmsg("incorrect checksum in control file")));
  
+     ValidateControlFile();
+ 
+ 	if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
+ 		ereport(FATAL,
+ 			(errmsg("database files are incompatible with operating system"),
+ 			 errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
+ 					   " which is not recognized by setlocale().",
+ 					   ControlFile->lc_collate),
+ 			 errhint("It looks like you need to initdb or install locale support.")));
+ 	if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
+ 		ereport(FATAL,
+ 			(errmsg("database files are incompatible with operating system"),
+ 		errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
+ 				  " which is not recognized by setlocale().",
+ 				  ControlFile->lc_ctype),
+ 			 errhint("It looks like you need to initdb or install locale support.")));
+ 
+ 	/* Make the fixed locale settings visible as GUC variables, too */
+ 	SetConfigOption("lc_collate", ControlFile->lc_collate,
+ 					PGC_INTERNAL, PGC_S_OVERRIDE);
+ 	SetConfigOption("lc_ctype", ControlFile->lc_ctype,
+ 					PGC_INTERNAL, PGC_S_OVERRIDE);
+ }
+ 
+ static void
+ ValidateControlFile(void)
+ {
  	/*
  	 * Do compatibility checking immediately.  We do this here for 2 reasons:
  	 *
***************
*** 3722,3747 ****
  				  " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
  						   ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
  				 errhint("It looks like you need to recompile or initdb.")));
- 	if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
- 		ereport(FATAL,
- 			(errmsg("database files are incompatible with operating system"),
- 			 errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
- 					   " which is not recognized by setlocale().",
- 					   ControlFile->lc_collate),
- 			 errhint("It looks like you need to initdb or install locale support.")));
- 	if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
- 		ereport(FATAL,
- 			(errmsg("database files are incompatible with operating system"),
- 		errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
- 				  " which is not recognized by setlocale().",
- 				  ControlFile->lc_ctype),
- 			 errhint("It looks like you need to initdb or install locale support.")));
- 
- 	/* Make the fixed locale settings visible as GUC variables, too */
- 	SetConfigOption("lc_collate", ControlFile->lc_collate,
- 					PGC_INTERNAL, PGC_S_OVERRIDE);
- 	SetConfigOption("lc_ctype", ControlFile->lc_ctype,
- 					PGC_INTERNAL, PGC_S_OVERRIDE);
  }
  
  void
--- 3754,3759 ----
***************
*** 3749,3754 ****
--- 3761,3768 ----
  {
  	int			fd;
  
+     ValidateControlFile();
+ 
  	INIT_CRC32(ControlFile->crc);
  	COMP_CRC32(ControlFile->crc,
  			   (char *) ControlFile,
***************
*** 4095,4100 ****
--- 4109,4123 ----
  					(errmsg("restore_command = \"%s\"",
  							recoveryRestoreCommand)));
  		}
+ 		else if (strcmp(tok1, "standby_mode") == 0)
+ 		{
+ 			if (strcmp(tok2, "true") == 0)
+             {
+                 InStandby = true;
+ 				ereport(LOG,
+ 						(errmsg("standby_mode = true")));
+             }
+         }
  		else if (strcmp(tok1, "recovery_target_timeline") == 0)
  		{
  			rtliGiven = true;
***************
*** 4230,4235 ****
--- 4253,4259 ----
  	 * We are no longer in archive recovery state.
  	 */
  	InArchiveRecovery = false;
+ 	InStandby = false;
  
  	/*
  	 * We should have the ending log segment currently open.  Verify, and then
***************
*** 4465,4476 ****
  		ereport(LOG,
  				(errmsg("database system shutdown was interrupted at %s",
  						str_time(ControlFile->time))));
! 	else if (ControlFile->state == DB_IN_RECOVERY)
  		ereport(LOG,
  		   (errmsg("database system was interrupted while in recovery at %s",
  				   str_time(ControlFile->time)),
  			errhint("This probably means that some data is corrupted and"
  					" you will have to use the last backup for recovery.")));
  	else if (ControlFile->state == DB_IN_PRODUCTION)
  		ereport(LOG,
  				(errmsg("database system was interrupted at %s",
--- 4489,4506 ----
  		ereport(LOG,
  				(errmsg("database system shutdown was interrupted at %s",
  						str_time(ControlFile->time))));
! 	else if (ControlFile->state == DB_IN_CRASH_RECOVERY)
  		ereport(LOG,
  		   (errmsg("database system was interrupted while in recovery at %s",
  				   str_time(ControlFile->time)),
  			errhint("This probably means that some data is corrupted and"
  					" you will have to use the last backup for recovery.")));
+ 	else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
+ 		ereport(LOG,
+ 		   (errmsg("database system was interrupted while in recovery at log time %s",
+ 				   str_time(ControlFile->time)),
+ 			errhint("If this has occurred more than once some data may be corrupted"
+ 					" and you may need to choose an earlier recovery target.")));
  	else if (ControlFile->state == DB_IN_PRODUCTION)
  		ereport(LOG,
  				(errmsg("database system was interrupted at %s",
***************
*** 4626,4641 ****
  	{
  		int			rmid;
  
  		if (InArchiveRecovery)
! 			ereport(LOG,
  					(errmsg("automatic recovery in progress")));
  		else
  			ereport(LOG,
  					(errmsg("database system was not properly shut down; "
  							"automatic recovery in progress")));
! 		ControlFile->state = DB_IN_RECOVERY;
  		ControlFile->time = time(NULL);
  		UpdateControlFile();
  
  		/* Start up the recovery environment */
  		XLogInitRelationCache();
--- 4656,4685 ----
  	{
  		int			rmid;
  
+         /*
+          * If we are in Archive Recovery then we create recovery checkpoints
+          * to avoid needing to start right from the beginning again. 
+          */
+     	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
  		if (InArchiveRecovery)
!         {		
!         	ereport(LOG,
  					(errmsg("automatic recovery in progress")));
+     		ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
+         }
  		else
+         {
  			ereport(LOG,
  					(errmsg("database system was not properly shut down; "
  							"automatic recovery in progress")));
!     		ControlFile->state = DB_IN_CRASH_RECOVERY;
!         }
  		ControlFile->time = time(NULL);
+     	ControlFile->prevCheckPoint = ControlFile->checkPoint;
+     	ControlFile->checkPoint = checkPointLoc;
+     	ControlFile->checkPointCopy = checkPoint;
  		UpdateControlFile();
+     	LWLockRelease(ControlFileLock);
  
  		/* Start up the recovery environment */
  		XLogInitRelationCache();
***************
*** 4668,4673 ****
--- 4712,4719 ----
  			ErrorContextCallback	errcontext;
  
  			InRedo = true;
+             nCheckpoints = 0;
+ 
  			ereport(LOG,
  					(errmsg("redo starts at %X/%X",
  							ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
***************
*** 5334,5345 ****
  		ereport(DEBUG2,
  				(errmsg("checkpoint starting")));
  
! 	CheckPointCLOG();
! 	CheckPointSUBTRANS();
! 	CheckPointMultiXact();
! 	FlushBufferPool();
! 	/* We deliberately delay 2PC checkpointing as long as possible */
! 	CheckPointTwoPhase(checkPoint.redo);
  
  	START_CRIT_SECTION();
  
--- 5380,5389 ----
  		ereport(DEBUG2,
  				(errmsg("checkpoint starting")));
  
!     /*
!      * Ensure all of shared memory gets checkpointed
!      */
!     CheckPointShmem(checkPoint.redo);
  
  	START_CRIT_SECTION();
  
***************
*** 5458,5463 ****
--- 5502,5508 ----
  xlog_redo(XLogRecPtr lsn, XLogRecord *record)
  {
  	uint8		info = record->xl_info & ~XLR_INFO_MASK;
+ 	CheckPoint	checkPoint;
  
  	if (info == XLOG_NEXTOID)
  	{
***************
*** 5469,5479 ****
  			ShmemVariableCache->nextOid = nextOid;
  			ShmemVariableCache->oidCount = 0;
  		}
  	}
  	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
  	{
- 		CheckPoint	checkPoint;
- 
  		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
  		/* In a SHUTDOWN checkpoint, believe the counters exactly */
  		ShmemVariableCache->nextXid = checkPoint.nextXid;
--- 5514,5523 ----
  			ShmemVariableCache->nextOid = nextOid;
  			ShmemVariableCache->oidCount = 0;
  		}
+         return;
  	}
  	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
  	{
  		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
  		/* In a SHUTDOWN checkpoint, believe the counters exactly */
  		ShmemVariableCache->nextXid = checkPoint.nextXid;
***************
*** 5499,5506 ****
  	}
  	else if (info == XLOG_CHECKPOINT_ONLINE)
  	{
- 		CheckPoint	checkPoint;
- 
  		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
  		/* In an ONLINE checkpoint, treat the counters like NEXTOID */
  		if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
--- 5543,5548 ----
***************
*** 5519,5524 ****
--- 5561,5609 ----
  					(errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
  							checkPoint.ThisTimeLineID, ThisTimeLineID)));
  	}
+ 
+ #define RECOVERY_CHECKPOINT_INTERVAL 100
+ 
+     /*
+      * If we are in Standby mode, then do a recovery checkpoint 
+      * for each checkpoint found in WAL replay. Otherwise,
+      * don't do this very frequently since this slows down recovery.
+      * A recovery checkpoint is simply a recreation of the database
+      * state after the original checkpoint: all database changes
+      * are written to disk, allowing us to restart recovery from that
+      * point. 
+      *
+      * Note: Should recovery ever be parallelised in the future,
+      * all work *must* stop until the recovery checkpoint has
+      * completed.
+      */
+     if (InArchiveRecovery && (InStandby || nCheckpoints >= RECOVERY_CHECKPOINT_INTERVAL))
+     {
+         CheckPointShmem(lsn);
+ 
+     	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+    		ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
+     	ControlFile->prevCheckPoint = ControlFile->checkPoint;
+         /* 
+          * The checkpoint record starts at ReadRecPtr; lsn is pointer to
+          * the next xlog record so must not be used here
+          */
+     	ControlFile->checkPoint = ReadRecPtr;
+     	ControlFile->checkPointCopy = checkPoint;
+         /* 
+          * Make it look like we started from this point, so this is *not*
+          * current time but original checkpoint time 
+          */
+     	ControlFile->time = checkPoint.time;
+     	UpdateControlFile();
+     	LWLockRelease(ControlFileLock);
+ 		ereport(LOG,
+ 				(errmsg("recovery checkpoint at %X/%X",
+ 						lsn.xlogid, lsn.xrecoff)));
+         nCheckpoints = 0;
+     }
+     else
+         nCheckpoints++;
  }
  
  void
***************
*** 6106,6111 ****
--- 6191,6207 ----
  							histfilepath)));
  	}
  
+ 	/*
+ 	 * Rename the backup label file out of the way, so that we don't accidentally
+ 	 * re-start recovery from the beginning.
+ 	 */
+ 	unlink(BACKUP_LABEL_IN_USE);
+ 	if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_IN_USE) != 0)
+ 		ereport(FATAL,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not rename file \"%s\" to \"%s\": %m",
+ 						BACKUP_LABEL_FILE, BACKUP_LABEL_IN_USE)));
+ 
  	return true;
  }
  
***************
*** 6119,6130 ****
  static void
  remove_backup_label(void)
  {
! 	if (unlink(BACKUP_LABEL_FILE) != 0)
! 		if (errno != ENOENT)
! 			ereport(FATAL,
! 					(errcode_for_file_access(),
! 					 errmsg("could not remove file \"%s\": %m",
! 							BACKUP_LABEL_FILE)));
  }
  
  /*
--- 6215,6226 ----
  static void
  remove_backup_label(void)
  {
!     if (unlink(BACKUP_LABEL_IN_USE) != 0)
!         if (errno != ENOENT)
!             ereport(FATAL,
!                     (errcode_for_file_access(),
!                     errmsg("could not remove file \"%s\": %m",
!                                     BACKUP_LABEL_IN_USE)));
  }
  
  /*
***************
*** 6147,6149 ****
--- 6243,6258 ----
  
  	pfree(buf.data);
  }
+ 
+ /* 
+  * Flush all shared memory data zones and ensure fsync
+  */
+ static void CheckPointShmem(XLogRecPtr checkPointRedo)
+ {
+ 	CheckPointCLOG();
+ 	CheckPointSUBTRANS();
+ 	CheckPointMultiXact();
+ 	FlushBufferPool();     /* performs all required fsyncs */
+ 	/* We deliberately delay 2PC checkpointing as long as possible */
+ 	CheckPointTwoPhase(checkPointRedo);
+ }
Index: src/include/catalog/pg_control.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/catalog/pg_control.h,v
retrieving revision 1.29
diff -c -r1.29 pg_control.h
*** src/include/catalog/pg_control.h	4 Apr 2006 22:39:59 -0000	1.29
--- src/include/catalog/pg_control.h	11 Jul 2006 16:46:24 -0000
***************
*** 55,61 ****
  	DB_STARTUP = 0,
  	DB_SHUTDOWNED,
  	DB_SHUTDOWNING,
! 	DB_IN_RECOVERY,
  	DB_IN_PRODUCTION
  } DBState;
  
--- 55,62 ----
  	DB_STARTUP = 0,
  	DB_SHUTDOWNED,
  	DB_SHUTDOWNING,
! 	DB_IN_CRASH_RECOVERY,
! 	DB_IN_ARCHIVE_RECOVERY,
  	DB_IN_PRODUCTION
  } DBState;
  
---------------------------(end of broadcast)---------------------------
TIP 9: In versions below 8.0, the planner will ignore your desire to
       choose an index scan if your joining column's datatypes do not
       match

Reply via email to