On Tue, Aug 5, 2014 at 9:04 PM, Fujii Masao <masao.fu...@gmail.com> wrote: > On Tue, Jul 29, 2014 at 7:07 PM, <furu...@pm.nttdata.co.jp> wrote: >> I have improved the patch by making the following changes: >> >> 1. Since stream_stop() was redundant, stream_stop() at the time of WAL file >> closing was deleted. >> >> 2. Change the flush judging timing for the readability of source code. >> I have changed the flush judging timing, from the continuous message >> after receiving to >> before the feedback message decision of continue statement after execution. > > Thanks for the updated version of the patch! > > While reviewing the patch, I found that HandleCopyStream() is still > long, which decreases the readability of the source code. > So I feel inclined to refactor HandleCopyStream() further for better > readability. What about the attached refactoring patch?
Sorry, I forgot to attach the patch to my previous email. It is attached now. Regards, -- Fujii Masao
*** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 31,42 **** static char current_walfile_name[MAXPGPATH] = ""; --- 31,53 ---- static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; + static bool still_sending = true; /* feedback still needs to be sent? */ + static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); + static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, + XLogRecPtr blockpos, int64 *last_status); + static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, + XLogRecPtr *blockpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + char *partial_suffix); + static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, + XLogRecPtr blockpos, char *basedir, char *partial_suffix, + XLogRecPtr *stoppos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); *************** *** 740,755 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *copybuf = NULL; int64 last_status = -1; XLogRecPtr blockpos = startpos; ! bool still_sending = true; while (1) { int r; - int xlogoff; - int bytes_left; - int bytes_written; int64 now; - int hdr_len; long sleeptime; /* --- 751,763 ---- char *copybuf = NULL; int64 last_status = -1; XLogRecPtr blockpos = startpos; ! ! still_sending = true; while (1) { int r; int64 now; long sleeptime; /* *************** *** 818,1015 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, goto error; if (r == -2) { ! PGresult *res = PQgetResult(conn); ! ! /* ! * The server closed its end of the copy stream. If we haven't ! 
* closed ours already, we need to do so now, unless the server ! * threw an error, in which case we don't. ! */ ! if (still_sending) ! { ! if (!close_walfile(basedir, partial_suffix, blockpos)) ! { ! /* Error message written in close_walfile() */ ! PQclear(res); ! goto error; ! } ! if (PQresultStatus(res) == PGRES_COPY_IN) ! { ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) ! { ! fprintf(stderr, ! _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); ! PQclear(res); ! goto error; ! } ! PQclear(res); ! res = PQgetResult(conn); ! } ! still_sending = false; ! } ! if (copybuf != NULL) ! PQfreemem(copybuf); ! copybuf = NULL; ! *stoppos = blockpos; ! return res; } /* Check the message type. */ if (copybuf[0] == 'k') { ! int pos; ! bool replyRequested; ! ! /* ! * Parse the keepalive message, enclosed in the CopyData message. ! * We just check if the server requested a reply, and ignore the ! * rest. ! */ ! pos = 1; /* skip msgtype 'k' */ ! pos += 8; /* skip walEnd */ ! pos += 8; /* skip sendTime */ ! ! if (r < pos + 1) ! { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! progname, r); goto error; - } - replyRequested = copybuf[pos]; - - /* If the server requested an immediate reply, send one. */ - if (replyRequested && still_sending) - { - now = feGetCurrentTimestamp(); - if (!sendFeedback(conn, blockpos, now, false)) - goto error; - last_status = now; - } } else if (copybuf[0] == 'w') { ! /* ! * Once we've decided we don't want to receive any more, just ! * ignore any subsequent XLogData messages. ! */ ! if (!still_sending) ! continue; ! ! /* ! * Read the header of the XLogData message, enclosed in the ! * CopyData message. We only need the WAL location field ! * (dataStart), the rest of the header is ignored. ! */ ! hdr_len = 1; /* msgtype 'w' */ ! hdr_len += 8; /* dataStart */ ! hdr_len += 8; /* walEnd */ ! hdr_len += 8; /* sendTime */ ! if (r < hdr_len) ! { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! 
progname, r); goto error; - } - blockpos = fe_recvint64(&copybuf[1]); - - /* Extract WAL location for this block */ - xlogoff = blockpos % XLOG_SEG_SIZE; - - /* - * Verify that the initial location in the stream matches where we - * think we are. - */ - if (walfile == -1) - { - /* No file open yet */ - if (xlogoff != 0) - { - fprintf(stderr, - _("%s: received transaction log record for offset %u with no file open\n"), - progname, xlogoff); - goto error; - } - } - else - { - /* More data in existing segment */ - /* XXX: store seek value don't reseek all the time */ - if (lseek(walfile, 0, SEEK_CUR) != xlogoff) - { - fprintf(stderr, - _("%s: got WAL data offset %08x, expected %08x\n"), - progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); - goto error; - } - } - - bytes_left = r - hdr_len; - bytes_written = 0; - - while (bytes_left) - { - int bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach - * XLOG_SEG_SIZE. - */ - if (xlogoff + bytes_left > XLOG_SEG_SIZE) - bytes_to_write = XLOG_SEG_SIZE - xlogoff; - else - bytes_to_write = bytes_left; - - if (walfile == -1) - { - if (!open_walfile(blockpos, timeline, - basedir, partial_suffix)) - { - /* Error logged by open_walfile */ - goto error; - } - } - - if (write(walfile, - copybuf + hdr_len + bytes_written, - bytes_to_write) != bytes_to_write) - { - fprintf(stderr, - _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), - progname, bytes_to_write, current_walfile_name, - strerror(errno)); - goto error; - } - - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - blockpos += bytes_to_write; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment?
*/ - if (blockpos % XLOG_SEG_SIZE == 0) - { - if (!close_walfile(basedir, partial_suffix, blockpos)) - /* Error message written in close_walfile() */ - goto error; - - xlogoff = 0; - - if (still_sending && stream_stop(blockpos, timeline, true)) - { - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) - { - fprintf(stderr, _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); - goto error; - } - still_sending = false; - break; /* ignore the rest of this XLogData packet */ - } - } - } - /* No more data left to write, receive next copy packet */ } else { --- 826,851 ---- goto error; if (r == -2) { ! PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, ! basedir, partial_suffix, stoppos); ! if (res == NULL) ! goto error; ! else ! return res; } /* Check the message type. */ if (copybuf[0] == 'k') { ! if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos, ! &last_status)) goto error; } else if (copybuf[0] == 'w') { ! if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, ! timeline, basedir, stream_stop, partial_suffix)) goto error; } else { *************** *** 1135,1137 **** CopyStreamReceive(PGconn *conn, long timeout, char **buffer) --- 971,1195 ---- *buffer = copybuf; return rawlen; } + + /* + * Process the keepalive message. + */ + static bool + ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, + XLogRecPtr blockpos, int64 *last_status) + { + int pos; + bool replyRequested; + int64 now; + + /* + * Parse the keepalive message, enclosed in the CopyData message. + * We just check if the server requested a reply, and ignore the + * rest. + */ + pos = 1; /* skip msgtype 'k' */ + pos += 8; /* skip walEnd */ + pos += 8; /* skip sendTime */ + + if (len < pos + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, len); + return false; + } + replyRequested = copybuf[pos]; + + /* If the server requested an immediate reply, send one. 
*/ + if (replyRequested && still_sending) + { + now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, blockpos, now, false)) + return false; + *last_status = now; + } + + return true; + } + + /* + * Process XLogData message. + */ + static bool + ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, + XLogRecPtr *blockpos, uint32 timeline, + char *basedir, stream_stop_callback stream_stop, + char *partial_suffix) + { + int xlogoff; + int bytes_left; + int bytes_written; + int hdr_len; + + /* + * Once we've decided we don't want to receive any more, just + * ignore any subsequent XLogData messages. + */ + if (!(still_sending)) + return true; + + /* + * Read the header of the XLogData message, enclosed in the + * CopyData message. We only need the WAL location field + * (dataStart), the rest of the header is ignored. + */ + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (len < hdr_len) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, len); + return false; + } + *blockpos = fe_recvint64(&copybuf[1]); + + /* Extract WAL location for this block */ + xlogoff = *blockpos % XLOG_SEG_SIZE; + + /* + * Verify that the initial location in the stream matches where we + * think we are.
+ */ + if (walfile == -1) + { + /* No file open yet */ + if (xlogoff != 0) + { + fprintf(stderr, + _("%s: received transaction log record for offset %u with no file open\n"), + progname, xlogoff); + return false; + } + } + else + { + /* More data in existing segment */ + /* XXX: store seek value don't reseek all the time */ + if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + { + fprintf(stderr, + _("%s: got WAL data offset %08x, expected %08x\n"), + progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + return false; + } + } + + bytes_left = len - hdr_len; + bytes_written = 0; + + while (bytes_left) + { + int bytes_to_write; + + /* + * If crossing a WAL boundary, only write up until we reach + * XLOG_SEG_SIZE. + */ + if (xlogoff + bytes_left > XLOG_SEG_SIZE) + bytes_to_write = XLOG_SEG_SIZE - xlogoff; + else + bytes_to_write = bytes_left; + + if (walfile == -1) + { + if (!open_walfile(*blockpos, timeline, + basedir, partial_suffix)) + { + /* Error logged by open_walfile */ + return false; + } + } + + if (write(walfile, + copybuf + hdr_len + bytes_written, + bytes_to_write) != bytes_to_write) + { + fprintf(stderr, + _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), + progname, bytes_to_write, current_walfile_name, + strerror(errno)); + return false; + } + + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + *blockpos += bytes_to_write; + xlogoff += bytes_to_write; + + /* Did we reach the end of a WAL segment? 
*/ + if (*blockpos % XLOG_SEG_SIZE == 0) + { + if (!close_walfile(basedir, partial_suffix, *blockpos)) + /* Error message written in close_walfile() */ + return false; + + xlogoff = 0; + + if (still_sending && stream_stop(*blockpos, timeline, true)) + { + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + return false; + } + still_sending = false; + return true; /* ignore the rest of this XLogData packet */ + } + } + } + /* No more data left to write, receive next copy packet */ + + return true; + } + + /* + * Handle end of the copy stream. + */ + static PGresult * + HandleEndOfCopyStream(PGconn *conn, char *copybuf, + XLogRecPtr blockpos, char *basedir, char *partial_suffix, + XLogRecPtr *stoppos) + { + PGresult *res = PQgetResult(conn); + + /* + * The server closed its end of the copy stream. If we haven't + * closed ours already, we need to do so now, unless the server + * threw an error, in which case we don't. + */ + if (still_sending) + { + if (!close_walfile(basedir, partial_suffix, blockpos)) + { + /* Error message written in close_walfile() */ + PQclear(res); + return NULL; + } + if (PQresultStatus(res) == PGRES_COPY_IN) + { + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, + _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + PQclear(res); + return NULL; + } + res = PQgetResult(conn); + } + still_sending = false; + } + if (copybuf != NULL) + PQfreemem(copybuf); + *stoppos = blockpos; + return res; + }
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers