On Wed, Jun 25, 2014 at 3:50 AM, Fujii Masao <masao.fu...@gmail.com> wrote: > On Tue, Jun 24, 2014 at 3:18 PM, <furu...@pm.nttdata.co.jp> wrote: >>> I found that this patch breaks --status-interval option of >>> pg_receivexlog when -m option which the patch introduced is supplied. >>> When -m is set, pg_receivexlog tries to send the feedback message as soon >>> as it flushes WAL file even if status interval timeout has not been passed >>> yet. If you want to send the feedback as soon as WAL is written or flushed, >>> like walreceiver does, you need to extend --status-interval option, for >>> example, so that it accepts the value "-1" which means enabling that >>> behavior. >>> >>> Including this change in your original patch would make it more difficult >>> to review. I think that you should implement this as a separate patch. >>> Thought? >> As your comments, the current specification to ignore the --status-interval. >> It is necessary to respond immediately to synchronize. >> >> It is necessary to think about specifications the --status-interval. >> So I revised it to a patch of flushmode which performed flush by a timing >> same as walreceiver. > > I'm not sure if it's a good idea to call the feature which you'd like to > add as 'flush mode'. > ISTM that 'flush mode' is vague and confusing for users. Instead, what > about adding > something like --fsync-interval which pg_recvlogical supports? > >> A changed part deletes the feedback message after flush, and transmitted the >> feedback message according to the status interval. >> Change to flushmode from syncmode the mode name, and fixed the document. > > + * Receive a message available from XLOG stream, blocking for > + * maximum of 'timeout' ms. > > The above comment seems incorrect because 'timeout' is a boolean argument. > > + FD_ZERO(&input_mask); > + FD_SET(PQsocket(conn), &input_mask); > + if (standby_message_timeout) > > Why did you get rid of the check of 'still_sending' flag here? 
Originally the > flag was checked but not in the patch. > > + r = rcv_receive(true , &copybuf, conn, > standby_message_timeout, last_status, now); > > When the return value is -2 (i.e., an error happened), we should go to > the 'error' label. > > ISTM that stream_stop() should be called every time a message is > processed. But the > patch changes pg_receivexlog so that it keeps processing the received > data without > calling stream_stop(). This seems incorrect. > > 'copybuf' needs to be free'd every time a new message is received. But you seem > to > have forgotten to do that when rcv_receive() with no timeout is called.
The patch looks somewhat complicated, and bugs can easily be introduced because it tries not only to add a new feature but also to reorganize the main loop in HandleCopyStream at the same time. To keep the patch simple, I'm thinking of first applying the attached patch, which just refactors the main loop. Then we can apply the main patch, i.e., add the new feature. Thoughts? Regards, -- Fujii Masao
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index d76e605..1182dc7 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos); +static int CopyStreamPoll(PGconn *conn, long timeout_ms); +static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); @@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, int bytes_written; int64 now; int hdr_len; - - if (copybuf != NULL) - { - PQfreemem(copybuf); - copybuf = NULL; - } + long sleeptime; /* * Check if we should continue streaming, or abort at this point. @@ -784,67 +781,34 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, last_status = now; } - r = PQgetCopyData(conn, &copybuf, 1); - if (r == 0) + /* + * Compute how long send/receive loops should sleep + */ + if (standby_message_timeout && still_sending) { - /* - * No data available. Wait for some to appear, but not longer than - * the specified timeout, so that we can ping the server. 
- */ - fd_set input_mask; - struct timeval timeout; - struct timeval *timeoutptr; - - FD_ZERO(&input_mask); - FD_SET(PQsocket(conn), &input_mask); - if (standby_message_timeout && still_sending) - { - int64 targettime; - long secs; - int usecs; - - targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); - feTimestampDifference(now, - targettime, - &secs, - &usecs); - if (secs <= 0) - timeout.tv_sec = 1; /* Always sleep at least 1 sec */ - else - timeout.tv_sec = secs; - timeout.tv_usec = usecs; - timeoutptr = &timeout; - } - else - timeoutptr = NULL; + int64 targettime; + long secs; + int usecs; + + targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); + feTimestampDifference(now, + targettime, + &secs, + &usecs); + if (secs <= 0) + secs = 1; /* Always sleep at least 1 sec */ + + sleeptime = secs * 1000 + usecs / 1000; + } + else + sleeptime = -1; - r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); - if (r == 0 || (r < 0 && errno == EINTR)) - { - /* - * Got a timeout or signal. Continue the loop and either - * deliver a status packet to the server or just go back into - * blocking. 
- */ - continue; - } - else if (r < 0) - { - fprintf(stderr, _("%s: select() failed: %s\n"), - progname, strerror(errno)); - goto error; - } - /* Else there is actually data on the socket */ - if (PQconsumeInput(conn) == 0) - { - fprintf(stderr, - _("%s: could not receive data from WAL stream: %s"), - progname, PQerrorMessage(conn)); - goto error; - } + r = CopyStreamReceive(conn, sleeptime, &copybuf); + if (r == 0) continue; - } if (r == -1) + goto error; + if (r == -2) { PGresult *res = PQgetResult(conn); @@ -877,15 +841,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } if (copybuf != NULL) PQfreemem(copybuf); + copybuf = NULL; *stoppos = blockpos; return res; } - if (r == -2) - { - fprintf(stderr, _("%s: could not read COPY data: %s"), - progname, PQerrorMessage(conn)); - goto error; - } /* Check the message type. */ if (copybuf[0] == 'k') @@ -1056,3 +1015,115 @@ error: PQfreemem(copybuf); return NULL; } + +/* + * Wait until we can read CopyData message, or timeout. + * + * Returns 1 if data has become available for reading, 0 if timed out + * or interrupted by signal, and -1 on an error. 
+ */ +static int +CopyStreamPoll(PGconn *conn, long timeout_ms) +{ + int ret; + fd_set input_mask; + struct timeval timeout; + struct timeval *timeoutptr; + + if (PQsocket(conn) < 0) + { + fprintf(stderr, _("%s: socket not open"), progname); + return -1; + } + + FD_ZERO(&input_mask); + FD_SET(PQsocket(conn), &input_mask); + + if (timeout_ms < 0) + timeoutptr = NULL; + else + { + timeout.tv_sec = timeout_ms / 1000L; + timeout.tv_usec = (timeout_ms % 1000L) * 1000L; + timeoutptr = &timeout; + } + + ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); + if (ret == 0 || (ret < 0 && errno == EINTR)) + return 0; /* Got a timeout or signal */ + else if (ret < 0) + { + fprintf(stderr, _("%s: select() failed: %s\n"), + progname, strerror(errno)); + return -1; + } + + return 1; +} + +/* + * Receive CopyData message available from XLOG stream, blocking for + * maximum of 'timeout' ms. + * + * If data was received, returns the length of the data. *buffer is set to + * point to a buffer holding the received message. The buffer is only valid + * until the next CopyStreamReceive call. + * + * 0 if no data was available within timeout, or wait was interrupted + * by signal. -1 on error. -2 if the server ended the COPY. + */ +static int +CopyStreamReceive(PGconn *conn, long timeout, char **buffer) +{ + static char *copybuf = NULL; + int rawlen; + + if (copybuf != NULL) + PQfreemem(copybuf); + copybuf = NULL; + *buffer = NULL; + + /* Try to receive a CopyData message */ + rawlen = PQgetCopyData(conn, &copybuf, 1); + if (rawlen == 0) + { + /* + * No data available. Wait for some to appear, but not longer than + * the specified timeout, so that we can ping the server. 
+ */ + if (timeout > 0) + { + int ret; + + ret = CopyStreamPoll(conn, timeout); + if (ret <= 0) + return ret; + } + + /* Else there is actually data on the socket */ + if (PQconsumeInput(conn) == 0) + { + fprintf(stderr, + _("%s: could not receive data from WAL stream: %s"), + progname, PQerrorMessage(conn)); + return -1; + } + + /* Now that we've consumed some input, try again */ + rawlen = PQgetCopyData(conn, &copybuf, 1); + if (rawlen == 0) + return 0; + } + if (rawlen == -1) /* end-of-streaming or error */ + return -2; + if (rawlen == -2) + { + fprintf(stderr, _("%s: could not read COPY data: %s"), + progname, PQerrorMessage(conn)); + return -1; + } + + /* Return received messages to caller */ + *buffer = copybuf; + return rawlen; +}
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers