On Fri, Oct 28, 2011 at 08:46, Fujii Masao <[email protected]> wrote:
> On Thu, Oct 27, 2011 at 11:14 PM, Magnus Hagander <[email protected]> wrote:
>> Here's a version that does this. Turns out this requires a lot less
>> code than what was previously in there, which is always nice.
>>
>> We still need to solve the other part which is how to deal with the
>> partial files on restore. But this is definitely a cleaner way from a
>> pure pg_receivexlog perspective.
>>
>> Comments/reviews?
>
> Looks good.
>
> Minor comment:
> the source code comment of FindStreamingStart() seems to need to be updated.
Here's an updated patch that includes both this update to the comment
and the functionality to pre-pad files to 16MB. This also seems
to have simplified the code, which is a nice bonus.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 71,104 **** usage(void)
static bool
segment_callback(XLogRecPtr segendpos, uint32 timeline)
{
- char fn[MAXPGPATH];
- struct stat statbuf;
-
if (verbose)
fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
progname, segendpos.xlogid, segendpos.xrecoff, timeline);
/*
- * Check if there is a partial file for the name we just finished, and if
- * there is, remove it under the assumption that we have now got all the
- * data we need.
- */
- segendpos.xrecoff /= XLOG_SEG_SIZE;
- PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
- snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
- basedir, timeline,
- segendpos.xlogid,
- segendpos.xrecoff);
- if (stat(fn, &statbuf) == 0)
- {
- /* File existed, get rid of it */
- if (verbose)
- fprintf(stderr, _("%s: removing file \"%s\"\n"),
- progname, fn);
- unlink(fn);
- }
-
- /*
* Never abort from this - we handle all aborting in continue_streaming()
*/
return false;
--- 71,81 ----
***************
*** 119,127 **** continue_streaming(void)
/*
* Determine starting location for streaming, based on:
* 1. If there are existing xlog segments, start at the end of the last one
! * 2. If the last one is a partial segment, rename it and start over, since
! * we don't sync after every write.
! * 3. If no existing xlog exists, start from the beginning of the current
* WAL segment.
*/
static XLogRecPtr
--- 96,103 ----
/*
* Determine starting location for streaming, based on:
* 1. If there are existing xlog segments, start at the end of the last one
! * that is complete (size matches XLogSegSize)
! * 2. If no valid xlog exists, start from the beginning of the current
* WAL segment.
*/
static XLogRecPtr
***************
*** 133,139 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
bool b;
uint32 high_log = 0;
uint32 high_seg = 0;
- bool partial = false;
dir = opendir(basedir);
if (dir == NULL)
--- 109,114 ----
***************
*** 195,201 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
disconnect_and_exit(1);
}
! if (statbuf.st_size == 16 * 1024 * 1024)
{
/* Completed segment */
if (log > high_log ||
--- 170,176 ----
disconnect_and_exit(1);
}
! if (statbuf.st_size == XLOG_SEG_SIZE)
{
/* Completed segment */
if (log > high_log ||
***************
*** 208,244 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
}
else
{
! /*
! * This is a partial file. Rename it out of the way.
! */
! char newfn[MAXPGPATH];
!
! fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
! progname, dirent->d_name, dirent->d_name);
!
! snprintf(newfn, sizeof(newfn), "%s/%s.partial",
! basedir, dirent->d_name);
!
! if (stat(newfn, &statbuf) == 0)
! {
! /*
! * XXX: perhaps we should only error out if the existing file
! * is larger?
! */
! fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
! progname, newfn);
! disconnect_and_exit(1);
! }
! if (rename(fullpath, newfn) != 0)
! {
! fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
! progname, fullpath, newfn, strerror(errno));
! disconnect_and_exit(1);
! }
!
! /* Don't continue looking for more, we assume this is the last */
! partial = true;
! break;
}
}
--- 183,191 ----
}
else
{
! fprintf(stderr, _("%s: segment file '%s' is incorrect size %d, skipping\n"),
! progname, dirent->d_name, (int) statbuf.st_size);
! continue;
}
}
***************
*** 247,263 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
if (high_log > 0 || high_seg > 0)
{
XLogRecPtr high_ptr;
!
! if (!partial)
! {
! /*
! * If the segment was partial, the pointer is already at the right
! * location since we want to re-transmit that segment. If it was
! * not, we need to move it to the next segment, since we are
! * tracking the last one that was complete.
! */
! NextLogSeg(high_log, high_seg);
! }
high_ptr.xlogid = high_log;
high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
--- 194,204 ----
if (high_log > 0 || high_seg > 0)
{
XLogRecPtr high_ptr;
! /*
! * Move the starting pointer to the start of the next segment,
! * since the highest one we've seen was completed.
! */
! NextLogSeg(high_log, high_seg);
high_ptr.xlogid = high_log;
high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 27,32 ****
--- 27,33 ----
#include "receivelog.h"
#include "streamutil.h"
+ #include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
***************
*** 41,64 **** const XLogRecPtr InvalidXLogRecPtr = {0, 0};
* Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is
* enough room in this buffer...
*/
static int
open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
{
int f;
char fn[MAXPGPATH];
XLogFileName(namebuf, timeline, startpoint.xlogid,
startpoint.xrecoff / XLOG_SEG_SIZE);
! snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
! f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
if (f == -1)
fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! progname, namebuf, strerror(errno));
return f;
}
/*
* Local version of GetCurrentTimestamp(), since we are not linked with
* backend code.
--- 42,163 ----
* Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is
* enough room in this buffer...
+ *
+ * The file will be padded to 16MB with zeroes.
*/
static int
open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
{
int f;
char fn[MAXPGPATH];
+ struct stat statbuf;
+ char *zerobuf;
+ int bytes;
XLogFileName(namebuf, timeline, startpoint.xlogid,
startpoint.xrecoff / XLOG_SEG_SIZE);
! snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
! f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, 0666);
if (f == -1)
fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! progname, fn, strerror(errno));
!
! /*
! * Verify that the file is either empty (just created), or a complete
! * XLogSegSize segment. Anything in between indicates a corrupt file.
! */
! if (fstat(f, &statbuf) != 0)
! {
! fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"),
! progname, fn, strerror(errno));
! close(f);
! return -1;
! }
! if (statbuf.st_size == XLogSegSize)
! return f; /* File is open and ready to use */
! if (statbuf.st_size != 0)
! {
! fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"),
! progname, fn, (int) statbuf.st_size, XLogSegSize);
! close(f);
! return -1;
! }
!
! /* New, empty file, so pad it to 16MB with zeroes */
! zerobuf = xmalloc0(XLOG_BLCKSZ);
! for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
! {
! if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
! {
! fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
! progname, fn, strerror(errno));
! close(f);
! return -1;
! }
! }
! if (lseek(f, SEEK_SET, 0) != 0)
! {
! fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"),
! progname, fn, strerror(errno));
! close(f);
! return -1;
! }
return f;
}
+ static bool
+ close_walfile(int walfile, char *basedir, char *walname)
+ {
+ off_t currpos = lseek(walfile, 0, SEEK_CUR);
+
+ if (currpos == -1)
+ {
+ fprintf(stderr, _("%s: could not get current position in file %s: %s\n"),
+ progname, walname, strerror(errno));
+ return false;
+ }
+
+ if (fsync(walfile) != 0)
+ {
+ fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
+ progname, walname, strerror(errno));
+ return false;
+ }
+
+ if (close(walfile) != 0)
+ {
+ fprintf(stderr, _("%s: could not close file %s: %s\n"),
+ progname, walname, strerror(errno));
+ return false;
+ }
+
+ /*
+ * Rename the .partial file only if we've completed writing the
+ * whole segment.
+ */
+ if (currpos == XLOG_SEG_SIZE)
+ {
+ char oldfn[MAXPGPATH];
+ char newfn[MAXPGPATH];
+
+ snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
+ snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+ if (rename(oldfn, newfn) != 0)
+ {
+ fprintf(stderr, _("%s: could not rename file %s: %s\n"),
+ progname, walname, strerror(errno));
+ return false;
+ }
+ }
+ else
+ fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"),
+ progname, walname);
+
+ return true;
+ }
+
+
/*
* Local version of GetCurrentTimestamp(), since we are not linked with
* backend code.
***************
*** 178,187 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
if (stream_continue && stream_continue())
{
if (walfile != -1)
! {
! fsync(walfile);
! close(walfile);
! }
return true;
}
--- 277,284 ----
if (stream_continue && stream_continue())
{
if (walfile != -1)
! /* Potential error message is written by close_walfile */
! return close_walfile(walfile, basedir, current_walfile_name);
return true;
}
***************
*** 360,367 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
/* Did we reach the end of a WAL segment? */
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
{
! fsync(walfile);
! close(walfile);
walfile = -1;
xlogoff = 0;
--- 457,466 ----
/* Did we reach the end of a WAL segment? */
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(walfile, basedir, current_walfile_name))
! /* Error message written in close_walfile() */
! return false;
!
walfile = -1;
xlogoff = 0;
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers