On Fri, Oct 28, 2011 at 08:46, Fujii Masao <masao.fu...@gmail.com> wrote: > On Thu, Oct 27, 2011 at 11:14 PM, Magnus Hagander <mag...@hagander.net> wrote: >> Here's a version that does this. Turns out this requires a lot less >> code than what was previously in there, which is always nice. >> >> We still need to solve the other part which is how to deal with the >> partial files on restore. But this is definitely a cleaner way from a >> pure pg_receivexlog perspective. >> >> Comments/reviews? > > Looks good. > > Minor comment: > the source code comment of FindStreamingStart() seems to need to be updated.
Here's an updated patch that includes both this update to the comment and the functionality to pre-pad files to 16MB. This also seems to have simplified the code, which is a nice bonus. -- Magnus Hagander Me: http://www.hagander.net/ Work: http://www.redpill-linpro.com/
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 71,104 **** usage(void)
  static bool
  segment_callback(XLogRecPtr segendpos, uint32 timeline)
  {
- 	char		fn[MAXPGPATH];
- 	struct stat statbuf;
-
  	if (verbose)
  		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
  				progname, segendpos.xlogid, segendpos.xrecoff, timeline);

  	/*
- 	 * Check if there is a partial file for the name we just finished, and if
- 	 * there is, remove it under the assumption that we have now got all the
- 	 * data we need.
- 	 */
- 	segendpos.xrecoff /= XLOG_SEG_SIZE;
- 	PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
- 	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
- 			 basedir, timeline,
- 			 segendpos.xlogid,
- 			 segendpos.xrecoff);
- 	if (stat(fn, &statbuf) == 0)
- 	{
- 		/* File existed, get rid of it */
- 		if (verbose)
- 			fprintf(stderr, _("%s: removing file \"%s\"\n"),
- 					progname, fn);
- 		unlink(fn);
- 	}
-
- 	/*
  	 * Never abort from this - we handle all aborting in continue_streaming()
  	 */
  	return false;
--- 71,81 ----
***************
*** 119,127 **** continue_streaming(void)
  /*
   * Determine starting location for streaming, based on:
   * 1. If there are existing xlog segments, start at the end of the last one
!  * 2. If the last one is a partial segment, rename it and start over, since
!  *    we don't sync after every write.
!  * 3. If no existing xlog exists, start from the beginning of the current
   *    WAL segment.
   */
  static XLogRecPtr
--- 96,103 ----
  /*
   * Determine starting location for streaming, based on:
   * 1. If there are existing xlog segments, start at the end of the last one
!  *    that is complete (size matches XLogSegSize)
!  * 2. If no valid xlog exists, start from the beginning of the current
   *    WAL segment.
   */
  static XLogRecPtr
***************
*** 133,139 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  	bool		b;
  	uint32		high_log = 0;
  	uint32		high_seg = 0;
- 	bool		partial = false;

  	dir = opendir(basedir);
  	if (dir == NULL)
--- 109,114 ----
***************
*** 195,201 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  			disconnect_and_exit(1);
  		}

! 		if (statbuf.st_size == 16 * 1024 * 1024)
  		{
  			/* Completed segment */
  			if (log > high_log ||
--- 170,176 ----
  			disconnect_and_exit(1);
  		}

! 		if (statbuf.st_size == XLOG_SEG_SIZE)
  		{
  			/* Completed segment */
  			if (log > high_log ||
***************
*** 208,244 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  		}
  		else
  		{
! 			/*
! 			 * This is a partial file. Rename it out of the way.
! 			 */
! 			char		newfn[MAXPGPATH];
!
! 			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
! 					progname, dirent->d_name, dirent->d_name);
!
! 			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
! 					 basedir, dirent->d_name);
!
! 			if (stat(newfn, &statbuf) == 0)
! 			{
! 				/*
! 				 * XXX: perhaps we should only error out if the existing file
! 				 * is larger?
! 				 */
! 				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
! 						progname, newfn);
! 				disconnect_and_exit(1);
! 			}
! 			if (rename(fullpath, newfn) != 0)
! 			{
! 				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
! 						progname, fullpath, newfn, strerror(errno));
! 				disconnect_and_exit(1);
! 			}
!
! 			/* Don't continue looking for more, we assume this is the last */
! 			partial = true;
! 			break;
  		}
  	}

--- 183,191 ----
  		}
  		else
  		{
! 			fprintf(stderr, _("%s: segment file \"%s\" is incorrect size %d, skipping\n"),
! 					progname, dirent->d_name, (int) statbuf.st_size);
! 			continue;
  		}
  	}

***************
*** 247,263 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  	if (high_log > 0 || high_seg > 0)
  	{
  		XLogRecPtr	high_ptr;
!
! 		if (!partial)
! 		{
! 			/*
! 			 * If the segment was partial, the pointer is already at the right
! 			 * location since we want to re-transmit that segment. If it was
! 			 * not, we need to move it to the next segment, since we are
! 			 * tracking the last one that was complete.
! 			 */
! 			NextLogSeg(high_log, high_seg);
! 		}

  		high_ptr.xlogid = high_log;
  		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
--- 194,204 ----
  	if (high_log > 0 || high_seg > 0)
  	{
  		XLogRecPtr	high_ptr;
! 		/*
! 		 * Move the starting pointer to the start of the next segment,
! 		 * since the highest one we've seen was completed.
! 		 */
! 		NextLogSeg(high_log, high_seg);

  		high_ptr.xlogid = high_log;
  		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "receivelog.h"
  #include "streamutil.h"

+ #include <sys/stat.h>
  #include <sys/time.h>
  #include <sys/types.h>
  #include <unistd.h>
***************
*** 41,64 **** const XLogRecPtr InvalidXLogRecPtr = {0, 0};
   * Open a new WAL file in the specified directory. Store the name
   * (not including the full directory) in namebuf. Assumes there is
   * enough room in this buffer...
   */
  static int
  open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
  {
  	int			f;
  	char		fn[MAXPGPATH];

  	XLogFileName(namebuf, timeline, startpoint.xlogid,
  				 startpoint.xrecoff / XLOG_SEG_SIZE);

! 	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
! 	f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
  	if (f == -1)
  		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! 				progname, namebuf, strerror(errno));
  	return f;
  }

  /*
   * Local version of GetCurrentTimestamp(), since we are not linked with
   * backend code.
--- 42,179 ----
   * Open a new WAL file in the specified directory. Store the name
   * (not including the full directory) in namebuf. Assumes there is
   * enough room in this buffer...
+  *
+  * The file is created as "<name>.partial" and padded to XLogSegSize with
+  * zeroes; close_walfile() drops the .partial suffix once the segment has
+  * been completely written.
   */
  static int
  open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
  {
  	int			f;
  	char		fn[MAXPGPATH];
+ 	struct stat statbuf;
+ 	char	   *zerobuf;
+ 	int			bytes;

  	XLogFileName(namebuf, timeline, startpoint.xlogid,
  				 startpoint.xrecoff / XLOG_SEG_SIZE);

! 	snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
! 	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, 0666);
  	if (f == -1)
+ 	{
  		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 		return -1;
! 	}
!
! 	/*
! 	 * Verify that the file is either empty (just created), or a complete
! 	 * XLogSegSize segment. Anything in between indicates a corrupt file.
! 	 */
! 	if (fstat(f, &statbuf) != 0)
! 	{
! 		fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 		close(f);
! 		return -1;
! 	}
! 	if (statbuf.st_size == XLogSegSize)
! 		return f; /* File is open and ready to use */
! 	if (statbuf.st_size != 0)
! 	{
! 		fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"),
! 				progname, fn, (int) statbuf.st_size, XLogSegSize);
! 		close(f);
! 		return -1;
! 	}
!
! 	/* New, empty, file. So pad it to XLogSegSize with zeroes */
! 	zerobuf = xmalloc0(XLOG_BLCKSZ);
! 	for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
! 	{
! 		if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
! 		{
! 			fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
! 					progname, fn, strerror(errno));
! 			free(zerobuf);
! 			close(f);
! 			/* A partially padded file would be rejected on the next attempt */
! 			unlink(fn);
! 			return -1;
! 		}
! 	}
! 	free(zerobuf);
! 	if (lseek(f, 0, SEEK_SET) != 0)
! 	{
! 		fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 		close(f);
! 		return -1;
! 	}
  	return f;
  }

+ /*
+  * Close a WAL file opened by open_walfile(), fsyncing it first. If the
+  * whole segment has been written, rename it from "<name>.partial" to its
+  * final name. The descriptor is closed on every path. Returns false, after
+  * printing an error message, if any step fails.
+  */
+ static bool
+ close_walfile(int walfile, char *basedir, char *walname)
+ {
+ 	off_t		currpos = lseek(walfile, 0, SEEK_CUR);
+
+ 	if (currpos == -1)
+ 	{
+ 		fprintf(stderr, _("%s: could not get current position in file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		close(walfile);
+ 		return false;
+ 	}
+
+ 	if (fsync(walfile) != 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		close(walfile);
+ 		return false;
+ 	}
+
+ 	if (close(walfile) != 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not close file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		return false;
+ 	}
+
+ 	/*
+ 	 * Rename the .partial file only if we've completed writing the
+ 	 * whole segment.
+ 	 */
+ 	if (currpos == XLOG_SEG_SIZE)
+ 	{
+ 		char		oldfn[MAXPGPATH];
+ 		char		newfn[MAXPGPATH];
+
+ 		snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
+ 		snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+ 		if (rename(oldfn, newfn) != 0)
+ 		{
+ 			fprintf(stderr, _("%s: could not rename file %s: %s\n"),
+ 					progname, walname, strerror(errno));
+ 			return false;
+ 		}
+ 	}
+ 	else
+ 		fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"),
+ 				progname, walname);
+
+ 	return true;
+ }
+
  /*
   * Local version of GetCurrentTimestamp(), since we are not linked with
   * backend code.
***************
*** 178,187 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
! 			{
! 				fsync(walfile);
! 				close(walfile);
! 			}
  			return true;
  		}

--- 293,300 ----
  		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
! 				/* Potential error message is written by close_walfile */
! 				return close_walfile(walfile, basedir, current_walfile_name);
  			return true;
  		}

***************
*** 360,367 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				fsync(walfile);
! 				close(walfile);
  				walfile = -1;
  				xlogoff = 0;

--- 473,482 ----
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				if (!close_walfile(walfile, basedir, current_walfile_name))
! 					/* Error message written in close_walfile() */
! 					return false;
!
  				walfile = -1;
  				xlogoff = 0;

-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers