On Wed, Jun 25, 2014 at 3:50 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Tue, Jun 24, 2014 at 3:18 PM,  <furu...@pm.nttdata.co.jp> wrote:
>>> I found that this patch breaks --status-interval option of
>>> pg_receivexlog when -m option which the patch introduced is supplied.
>>> When -m is set, pg_receivexlog tries to send the feedback message as soon
>>> as it flushes WAL file even if status interval timeout has not been passed
>>> yet. If you want to send the feedback as soon as WAL is written or flushed,
>>> like walreceiver does, you need to extend --status-interval option, for
>>> example, so that it accepts the value "-1" which means enabling that
>>> behavior.
>>>
>>> Including this change in your original patch would make it more difficult
>>> to review. I think that you should implement this as separate patch.
>>> Thought?
>> As you commented, the current specification ignores --status-interval.
>> It is necessary to respond immediately in order to synchronize.
>>
>> The specification of --status-interval needs to be thought through.
>> So I revised it into a flushmode patch which performs the flush at the 
>> same timing as walreceiver.
>
> I'm not sure if it's a good idea to call the feature which you'd like to
> add 'flush mode'.
> ISTM that 'flush mode' is vague and confusing for users. Instead, what
> about adding
> something like --fsync-interval which pg_recvlogical supports?
>
>> The changed part removes the feedback message sent after flush, and transmits 
>> the feedback message according to the status interval.
>> I changed the mode name from syncmode to flushmode, and fixed the document.
>
> + * Receive a message available from XLOG stream, blocking for
> + * maximum of 'timeout' ms.
>
> The above comment seems incorrect because 'timeout' is a boolean argument.
>
> +            FD_ZERO(&input_mask);
> +            FD_SET(PQsocket(conn), &input_mask);
> +            if (standby_message_timeout)
>
> Why did you get rid of the check of 'still_sending' flag here? Originally the
> flag was checked but not in the patch.
>
> +        r = rcv_receive(true , &copybuf, conn,
> standby_message_timeout, last_status, now);
>
> When the return value is -2 (i.e., an error happened), we should go to
> the 'error' label.
>
> ISTM that stream_stop() should be called every time a message is
> processed. But the
> patch changes pg_receivexlog so that it keeps processing the received
> data without
> calling stream_stop(). This seems incorrect.
>
> 'copybuf' needs to be free'd every time new message is received. But you seem 
> to
> have forgotten to do that when rcv_receive() with no timeout is called.

The patch looks somewhat complicated, and bugs can easily be introduced
because it tries not only to add a new feature but also to reorganize
the main loop in HandleCopyStream at the same time. To keep the patch
simple, I'm thinking of first applying the attached patch, which just
refactors the main loop. Then we can apply the main patch, i.e., add the new
feature. Thoughts?

Regards,

-- 
Fujii Masao
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d76e605..1182dc7 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
 				 uint32 timeline, char *basedir,
 			   stream_stop_callback stream_stop, int standby_message_timeout,
 				 char *partial_suffix, XLogRecPtr *stoppos);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms);
+static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		int			bytes_written;
 		int64		now;
 		int			hdr_len;
-
-		if (copybuf != NULL)
-		{
-			PQfreemem(copybuf);
-			copybuf = NULL;
-		}
+		long		sleeptime;
 
 		/*
 		 * Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,34 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			last_status = now;
 		}
 
-		r = PQgetCopyData(conn, &copybuf, 1);
-		if (r == 0)
+		/*
+		 * Compute how long send/receive loops should sleep
+		 */
+		if (standby_message_timeout && still_sending)
 		{
-			/*
-			 * No data available. Wait for some to appear, but not longer than
-			 * the specified timeout, so that we can ping the server.
-			 */
-			fd_set		input_mask;
-			struct timeval timeout;
-			struct timeval *timeoutptr;
-
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-			if (standby_message_timeout && still_sending)
-			{
-				int64		targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
-			}
-			else
-				timeoutptr = NULL;
+			int64		targettime;
+			long		secs;
+			int			usecs;
+
+			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+			feTimestampDifference(now,
+								  targettime,
+								  &secs,
+								  &usecs);
+			if (secs <= 0)
+				secs = 1;	/* Always sleep at least 1 sec */
+
+			sleeptime = secs * 1000 + usecs / 1000;
+		}
+		else
+			sleeptime = -1;
 
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
-			{
-				/*
-				 * Got a timeout or signal. Continue the loop and either
-				 * deliver a status packet to the server or just go back into
-				 * blocking.
-				 */
-				continue;
-			}
-			else if (r < 0)
-			{
-				fprintf(stderr, _("%s: select() failed: %s\n"),
-						progname, strerror(errno));
-				goto error;
-			}
-			/* Else there is actually data on the socket */
-			if (PQconsumeInput(conn) == 0)
-			{
-				fprintf(stderr,
-						_("%s: could not receive data from WAL stream: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
+		r = CopyStreamReceive(conn, sleeptime, &copybuf);
+		if (r == 0)
 			continue;
-		}
 		if (r == -1)
+			goto error;
+		if (r == -2)
 		{
 			PGresult   *res = PQgetResult(conn);
 
@@ -877,15 +841,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			}
 			if (copybuf != NULL)
 				PQfreemem(copybuf);
+			copybuf = NULL;
 			*stoppos = blockpos;
 			return res;
 		}
-		if (r == -2)
-		{
-			fprintf(stderr, _("%s: could not read COPY data: %s"),
-					progname, PQerrorMessage(conn));
-			goto error;
-		}
 
 		/* Check the message type. */
 		if (copybuf[0] == 'k')
@@ -1056,3 +1015,115 @@ error:
 		PQfreemem(copybuf);
 	return NULL;
 }
+
+/*
+ * Wait until the socket is readable or 'timeout_ms' elapses (negative
+ * timeout_ms: no time limit).
+ *
+ * Returns 1 if readable, 0 if timed out or interrupted by signal, -1 on error.
+ */
+static int
+CopyStreamPoll(PGconn *conn, long timeout_ms)
+{
+	int			ret;
+	fd_set		input_mask;
+	struct timeval timeout;
+	struct timeval *timeoutptr;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr, _("%s: socket not open\n"), progname);
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	if (timeout_ms < 0)
+		timeoutptr = NULL;		/* select() then waits with no time limit */
+	else
+	{
+		timeout.tv_sec = timeout_ms / 1000L;
+		timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
+		timeoutptr = &timeout;
+	}
+
+	ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+	if (ret == 0 || (ret < 0 && errno == EINTR))
+		return 0;		/* Got a timeout or signal */
+	else if (ret < 0)
+	{
+		fprintf(stderr, _("%s: select() failed: %s\n"),
+				progname, strerror(errno));
+		return -1;
+	}
+
+	return 1;
+}
+
+/*
+ * Receive a CopyData message from the XLOG stream, waiting at most 'timeout'
+ * ms for one (negative: wait indefinitely, zero: do not wait).
+ *
+ * On entry *buffer must be NULL or a message from a previous call; it is
+ * freed here. If data was received, returns its length and sets *buffer to
+ * the message, which stays valid until the next CopyStreamReceive call.
+ *
+ * Returns 0 if no data was available within timeout, or wait was interrupted
+ * by signal. -1 on error. -2 if the server ended the COPY.
+ */
+static int
+CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+{
+	char	   *copybuf = NULL;
+	int			rawlen;
+
+	/* Release the previous message; the caller's pointer is its only owner */
+	if (*buffer != NULL)
+		PQfreemem(*buffer);
+	*buffer = NULL;
+
+	/* Try to receive a CopyData message */
+	rawlen = PQgetCopyData(conn, &copybuf, 1);
+	if (rawlen == 0)
+	{
+		/*
+		 * No data yet. Wait up to the timeout so the caller can ping the
+		 * server; a negative timeout must also wait, or we would busy-loop.
+		 */
+		if (timeout != 0)
+		{
+			int		ret;
+
+			ret = CopyStreamPoll(conn, timeout);
+			if (ret <= 0)
+				return ret;
+		}
+
+		/* Else there is actually data on the socket */
+		if (PQconsumeInput(conn) == 0)
+		{
+			fprintf(stderr,
+					_("%s: could not receive data from WAL stream: %s"),
+					progname, PQerrorMessage(conn));
+			return -1;
+		}
+
+		/* Now that we've consumed some input, try again */
+		rawlen = PQgetCopyData(conn, &copybuf, 1);
+		if (rawlen == 0)
+			return 0;
+	}
+	if (rawlen == -1)			/* end-of-streaming or error */
+		return -2;
+	if (rawlen == -2)
+	{
+		fprintf(stderr, _("%s: could not read COPY data: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	/* Return received messages to caller */
+	*buffer = copybuf;
+	return rawlen;
+}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to