On Fri, Oct 28, 2011 at 08:46, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Thu, Oct 27, 2011 at 11:14 PM, Magnus Hagander <mag...@hagander.net> wrote:
>> Here's a version that does this. Turns out this requires a lot less
>> code than what was previously in there, which is always nice.
>>
>> We still need to solve the other part which is how to deal with the
>> partial files on restore. But this is definitely a cleaner way from a
>> pure pg_receivexlog perspective.
>>
>> Comments/reviews?
>
> Looks good.
>
> Minor comment:
> the source code comment of FindStreamingStart() seems to need to be updated.

Here's an updated patch that includes this update to the comment and
also adds the functionality to pre-pad files to 16MB. This also seems
to have simplified the code, which is a nice bonus.

-- 
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 71,104 **** usage(void)
  static bool
  segment_callback(XLogRecPtr segendpos, uint32 timeline)
  {
- 	char		fn[MAXPGPATH];
- 	struct stat statbuf;
- 
  	if (verbose)
  		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
  				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
  
  	/*
- 	 * Check if there is a partial file for the name we just finished, and if
- 	 * there is, remove it under the assumption that we have now got all the
- 	 * data we need.
- 	 */
- 	segendpos.xrecoff /= XLOG_SEG_SIZE;
- 	PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
- 	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
- 			 basedir, timeline,
- 			 segendpos.xlogid,
- 			 segendpos.xrecoff);
- 	if (stat(fn, &statbuf) == 0)
- 	{
- 		/* File existed, get rid of it */
- 		if (verbose)
- 			fprintf(stderr, _("%s: removing file \"%s\"\n"),
- 					progname, fn);
- 		unlink(fn);
- 	}
- 
- 	/*
  	 * Never abort from this - we handle all aborting in continue_streaming()
  	 */
  	return false;
--- 71,81 ----
***************
*** 119,127 **** continue_streaming(void)
  /*
   * Determine starting location for streaming, based on:
   * 1. If there are existing xlog segments, start at the end of the last one
!  * 2. If the last one is a partial segment, rename it and start over, since
!  *	  we don't sync after every write.
!  * 3. If no existing xlog exists, start from the beginning of the current
   *	  WAL segment.
   */
  static XLogRecPtr
--- 96,103 ----
  /*
   * Determine starting location for streaming, based on:
   * 1. If there are existing xlog segments, start at the end of the last one
!  *	  that is complete (size matches XLOG_SEG_SIZE)
!  * 2. If no valid xlog exists, start from the beginning of the current
   *	  WAL segment.
   */
  static XLogRecPtr
***************
*** 133,139 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  	bool		b;
  	uint32		high_log = 0;
  	uint32		high_seg = 0;
- 	bool		partial = false;
  
  	dir = opendir(basedir);
  	if (dir == NULL)
--- 109,114 ----
***************
*** 195,201 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  			disconnect_and_exit(1);
  		}
  
! 		if (statbuf.st_size == 16 * 1024 * 1024)
  		{
  			/* Completed segment */
  			if (log > high_log ||
--- 170,176 ----
  			disconnect_and_exit(1);
  		}
  
! 		if (statbuf.st_size == XLOG_SEG_SIZE)
  		{
  			/* Completed segment */
  			if (log > high_log ||
***************
*** 208,244 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  		}
  		else
  		{
! 			/*
! 			 * This is a partial file. Rename it out of the way.
! 			 */
! 			char		newfn[MAXPGPATH];
! 
! 			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
! 					progname, dirent->d_name, dirent->d_name);
! 
! 			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
! 					 basedir, dirent->d_name);
! 
! 			if (stat(newfn, &statbuf) == 0)
! 			{
! 				/*
! 				 * XXX: perhaps we should only error out if the existing file
! 				 * is larger?
! 				 */
! 				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
! 						progname, newfn);
! 				disconnect_and_exit(1);
! 			}
! 			if (rename(fullpath, newfn) != 0)
! 			{
! 				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
! 						progname, fullpath, newfn, strerror(errno));
! 				disconnect_and_exit(1);
! 			}
! 
! 			/* Don't continue looking for more, we assume this is the last */
! 			partial = true;
! 			break;
  		}
  	}
  
--- 183,191 ----
  		}
  		else
  		{
! 			fprintf(stderr, _("%s: segment file \"%s\" is incorrect size %d, skipping\n"),
! 					progname, dirent->d_name, (int) statbuf.st_size);
! 			continue;
  		}
  	}
  
***************
*** 247,263 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  	if (high_log > 0 || high_seg > 0)
  	{
  		XLogRecPtr	high_ptr;
  
! 		if (!partial)
! 		{
! 			/*
! 			 * If the segment was partial, the pointer is already at the right
! 			 * location since we want to re-transmit that segment. If it was
! 			 * not, we need to move it to the next segment, since we are
! 			 * tracking the last one that was complete.
! 			 */
! 			NextLogSeg(high_log, high_seg);
! 		}
  
  		high_ptr.xlogid = high_log;
  		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
--- 194,205 ----
  	if (high_log > 0 || high_seg > 0)
  	{
  		XLogRecPtr	high_ptr;
  
! 		/*
! 		 * Move the starting pointer to the start of the next segment,
! 		 * since the highest one we've seen was completed.
! 		 */
! 		NextLogSeg(high_log, high_seg);
  
  		high_ptr.xlogid = high_log;
  		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "receivelog.h"
  #include "streamutil.h"
  
+ #include <sys/stat.h>
  #include <sys/time.h>
  #include <sys/types.h>
  #include <unistd.h>
***************
*** 41,64 **** const XLogRecPtr InvalidXLogRecPtr = {0, 0};
   * Open a new WAL file in the specified directory. Store the name
   * (not including the full directory) in namebuf. Assumes there is
   * enough room in this buffer...
   */
  static int
  open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
  {
  	int			f;
  	char		fn[MAXPGPATH];
  
  	XLogFileName(namebuf, timeline, startpoint.xlogid,
  				 startpoint.xrecoff / XLOG_SEG_SIZE);
  
! 	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
! 	f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
! 	if (f == -1)
! 		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! 				progname, namebuf, strerror(errno));
  	return f;
  }
  
  /*
   * Local version of GetCurrentTimestamp(), since we are not linked with
   * backend code.
--- 42,194 ----
   * Open a new WAL file in the specified directory. Store the name
   * (not including the full directory) in namebuf. Assumes there is
   * enough room in this buffer...
+  *
+  * The file is created under the segment name plus a ".partial" suffix.
+  * A newly created (empty) file is padded with zeroes to XLOG_SEG_SIZE;
+  * an existing file of exactly that size is reused as-is, and any other
+  * size is reported as corrupt. On success the returned descriptor is
+  * positioned at the start of the file; on failure an error message is
+  * printed and -1 is returned.
   */
  static int
  open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
  {
  	int			f;
  	char		fn[MAXPGPATH];
+ 	struct stat	statbuf;
+ 	char	   *zerobuf;
+ 	int			bytes;
  
  	XLogFileName(namebuf, timeline, startpoint.xlogid,
  				 startpoint.xrecoff / XLOG_SEG_SIZE);
  
! 	snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
! 	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, 0666);
! 	if (f == -1)
! 	{
! 		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 		return -1;
! 	}
! 
! 	/*
! 	 * Verify that the file is either empty (just created), or a complete
! 	 * XLOG_SEG_SIZE segment. Anything in between indicates a corrupt file.
! 	 */
! 	if (fstat(f, &statbuf) != 0)
! 	{
! 		fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 		close(f);
! 		return -1;
! 	}
! 	if (statbuf.st_size == XLOG_SEG_SIZE)
! 		return f;				/* File is open and ready to use */
! 	if (statbuf.st_size != 0)
! 	{
! 		fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"),
! 				progname, fn, (int) statbuf.st_size, XLOG_SEG_SIZE);
! 		close(f);
! 		return -1;
! 	}
! 
! 	/* New, empty file: pad it to XLOG_SEG_SIZE with zeroes */
! 	zerobuf = xmalloc0(XLOG_BLCKSZ);
! 	for (bytes = 0; bytes < XLOG_SEG_SIZE; bytes += XLOG_BLCKSZ)
! 	{
! 		if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
! 		{
! 			fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
! 					progname, fn, strerror(errno));
! 			free(zerobuf);
! 			close(f);
! 			/*
! 			 * Remove the short file, or every later attempt to open this
! 			 * segment would fail the size check above.
! 			 */
! 			unlink(fn);
! 			return -1;
! 		}
! 	}
! 	free(zerobuf);
! 
! 	/* Rewind so that the caller starts writing at the segment start */
! 	if (lseek(f, 0, SEEK_SET) != 0)
! 	{
! 		fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 		close(f);
! 		return -1;
! 	}
  	return f;
  }
  
+ /*
+  * Close a WAL file opened by open_walfile(), fsync'ing it first.
+  *
+  * walname is the segment file name without the ".partial" suffix. The
+  * ".partial" file is renamed to walname only if the file position shows
+  * that a full XLOG_SEG_SIZE segment has been written; otherwise it is
+  * left in place, to be reopened by open_walfile() if that segment is
+  * streamed again.
+  *
+  * The descriptor is closed on every path. On failure an error message is
+  * printed and false is returned.
+  */
+ static bool
+ close_walfile(int walfile, char *basedir, char *walname)
+ {
+ 	off_t		currpos = lseek(walfile, 0, SEEK_CUR);
+ 
+ 	if (currpos == -1)
+ 	{
+ 		fprintf(stderr, _("%s: could not get current position in file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		close(walfile);
+ 		return false;
+ 	}
+ 
+ 	if (fsync(walfile) != 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		close(walfile);
+ 		return false;
+ 	}
+ 
+ 	if (close(walfile) != 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not close file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		return false;
+ 	}
+ 
+ 	/*
+ 	 * Rename the .partial file only if we've completed writing the
+ 	 * whole segment.
+ 	 */
+ 	if (currpos == XLOG_SEG_SIZE)
+ 	{
+ 		char		oldfn[MAXPGPATH];
+ 		char		newfn[MAXPGPATH];
+ 
+ 		snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
+ 		snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+ 		if (rename(oldfn, newfn) != 0)
+ 		{
+ 			fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
+ 					progname, oldfn, newfn, strerror(errno));
+ 			return false;
+ 		}
+ 	}
+ 	else
+ 		fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"),
+ 				progname, walname);
+ 
+ 	return true;
+ }
+ 
+ 
  /*
   * Local version of GetCurrentTimestamp(), since we are not linked with
   * backend code.
***************
*** 178,187 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
  			{
! 				fsync(walfile);
! 				close(walfile);
  			}
  			return true;
  		}
  
--- 277,286 ----
  		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
  			{
! 				/* Potential error message is written by close_walfile */
! 				return close_walfile(walfile, basedir, current_walfile_name);
  			}
  			return true;
  		}
  
***************
*** 360,367 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				fsync(walfile);
! 				close(walfile);
  				walfile = -1;
  				xlogoff = 0;
  
--- 457,468 ----
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				if (!close_walfile(walfile, basedir, current_walfile_name))
! 				{
! 					/* Error message written in close_walfile() */
! 					return false;
! 				}
! 
  				walfile = -1;
  				xlogoff = 0;
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to