On Tue, Aug 5, 2014 at 9:04 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Tue, Jul 29, 2014 at 7:07 PM,  <furu...@pm.nttdata.co.jp> wrote:
>> I have improved the patch  by making following changes:
>>
>> 1. Since stream_stop() was redundant, stream_stop() at the time of WAL file 
>> closing was deleted.
>>
>> 2. Change the Flash judging timing for the readability of source code.
>>    I have changed the Flash judging timing , from the continuous message 
>> after receiving to
>>    before the feedbackmassege decision of continue statement after execution.
>
> Thanks for the updated version of the patch!
>
> While reviewing the patch, I found that HandleCopyStream() is still
> long and which decreases the readability of the source code.
> So I feel inclined to refactor the HandleCopyStream() more for better
> readability. What about the attached refactoring patch?

Sorry, I forgot to attached the patch in previous email. So attached.

Regards,

-- 
Fujii Masao
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 31,42 **** static char current_walfile_name[MAXPGPATH] = "";
--- 31,53 ----
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
+ static bool still_sending = true;		/* feedback still needs to be sent? */
+ 
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
  				 char *partial_suffix, XLogRecPtr *stoppos);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ 								XLogRecPtr blockpos, int64 *last_status);
+ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+ 							   XLogRecPtr *blockpos, uint32 timeline,
+ 							   char *basedir, stream_stop_callback stream_stop,
+ 							   char *partial_suffix);
+ static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+ 									   XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+ 									   XLogRecPtr *stoppos);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
***************
*** 740,755 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
  	XLogRecPtr	blockpos = startpos;
! 	bool		still_sending = true;
  
  	while (1)
  	{
  		int			r;
- 		int			xlogoff;
- 		int			bytes_left;
- 		int			bytes_written;
  		int64		now;
- 		int			hdr_len;
  		long		sleeptime;
  
  		/*
--- 751,763 ----
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
  	XLogRecPtr	blockpos = startpos;
! 
! 	still_sending = true;
  
  	while (1)
  	{
  		int			r;
  		int64		now;
  		long		sleeptime;
  
  		/*
***************
*** 818,1015 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			goto error;
  		if (r == -2)
  		{
! 			PGresult   *res = PQgetResult(conn);
! 
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
! 			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
! 				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
! 					goto error;
! 				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
! 				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 					{
! 						fprintf(stderr,
! 								_("%s: could not send copy-end packet: %s"),
! 								progname, PQerrorMessage(conn));
! 						PQclear(res);
! 						goto error;
! 					}
! 					PQclear(res);
! 					res = PQgetResult(conn);
! 				}
! 				still_sending = false;
! 			}
! 			if (copybuf != NULL)
! 				PQfreemem(copybuf);
! 			copybuf = NULL;
! 			*stoppos = blockpos;
! 			return res;
  		}
  
  		/* Check the message type. */
  		if (copybuf[0] == 'k')
  		{
! 			int			pos;
! 			bool		replyRequested;
! 
! 			/*
! 			 * Parse the keepalive message, enclosed in the CopyData message.
! 			 * We just check if the server requested a reply, and ignore the
! 			 * rest.
! 			 */
! 			pos = 1;			/* skip msgtype 'k' */
! 			pos += 8;			/* skip walEnd */
! 			pos += 8;			/* skip sendTime */
! 
! 			if (r < pos + 1)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
  				goto error;
- 			}
- 			replyRequested = copybuf[pos];
- 
- 			/* If the server requested an immediate reply, send one. */
- 			if (replyRequested && still_sending)
- 			{
- 				now = feGetCurrentTimestamp();
- 				if (!sendFeedback(conn, blockpos, now, false))
- 					goto error;
- 				last_status = now;
- 			}
  		}
  		else if (copybuf[0] == 'w')
  		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
! 
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
  				goto error;
- 			}
- 			blockpos = fe_recvint64(&copybuf[1]);
- 
- 			/* Extract WAL location for this block */
- 			xlogoff = blockpos % XLOG_SEG_SIZE;
- 
- 			/*
- 			 * Verify that the initial location in the stream matches where we
- 			 * think we are.
- 			 */
- 			if (walfile == -1)
- 			{
- 				/* No file open yet */
- 				if (xlogoff != 0)
- 				{
- 					fprintf(stderr,
- 							_("%s: received transaction log record for offset %u with no file open\n"),
- 							progname, xlogoff);
- 					goto error;
- 				}
- 			}
- 			else
- 			{
- 				/* More data in existing segment */
- 				/* XXX: store seek value don't reseek all the time */
- 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
- 				{
- 					fprintf(stderr,
- 						  _("%s: got WAL data offset %08x, expected %08x\n"),
- 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
- 					goto error;
- 				}
- 			}
- 
- 			bytes_left = r - hdr_len;
- 			bytes_written = 0;
- 
- 			while (bytes_left)
- 			{
- 				int			bytes_to_write;
- 
- 				/*
- 				 * If crossing a WAL boundary, only write up until we reach
- 				 * XLOG_SEG_SIZE.
- 				 */
- 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
- 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
- 				else
- 					bytes_to_write = bytes_left;
- 
- 				if (walfile == -1)
- 				{
- 					if (!open_walfile(blockpos, timeline,
- 									  basedir, partial_suffix))
- 					{
- 						/* Error logged by open_walfile */
- 						goto error;
- 					}
- 				}
- 
- 				if (write(walfile,
- 						  copybuf + hdr_len + bytes_written,
- 						  bytes_to_write) != bytes_to_write)
- 				{
- 					fprintf(stderr,
- 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
- 							progname, bytes_to_write, current_walfile_name,
- 							strerror(errno));
- 					goto error;
- 				}
- 
- 				/* Write was successful, advance our position */
- 				bytes_written += bytes_to_write;
- 				bytes_left -= bytes_to_write;
- 				blockpos += bytes_to_write;
- 				xlogoff += bytes_to_write;
- 
- 				/* Did we reach the end of a WAL segment? */
- 				if (blockpos % XLOG_SEG_SIZE == 0)
- 				{
- 					if (!close_walfile(basedir, partial_suffix, blockpos))
- 						/* Error message written in close_walfile() */
- 						goto error;
- 
- 					xlogoff = 0;
- 
- 					if (still_sending && stream_stop(blockpos, timeline, true))
- 					{
- 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
- 						{
- 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
- 									progname, PQerrorMessage(conn));
- 							goto error;
- 						}
- 						still_sending = false;
- 						break;	/* ignore the rest of this XLogData packet */
- 					}
- 				}
- 			}
- 			/* No more data left to write, receive next copy packet */
  		}
  		else
  		{
--- 826,851 ----
  			goto error;
  		if (r == -2)
  		{
! 			PGresult	*res = HandleEndOfCopyStream(conn, copybuf, blockpos,
! 													 basedir, partial_suffix, stoppos);
! 			if (res == NULL)
! 				goto error;
! 			else
! 				return res;
  		}
  
  		/* Check the message type. */
  		if (copybuf[0] == 'k')
  		{
! 			if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
! 									 &last_status))
  				goto error;
  		}
  		else if (copybuf[0] == 'w')
  		{
! 			if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
! 									timeline, basedir, stream_stop, partial_suffix))
  				goto error;
  		}
  		else
  		{
***************
*** 1135,1137 **** CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
--- 971,1195 ----
  	*buffer = copybuf;
  	return rawlen;
  }
+ 
+ /*
+  * Process the keepalive message.
+  */
+ static bool
+ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ 					XLogRecPtr blockpos, int64 *last_status)
+ {
+ 	int			pos;
+ 	bool		replyRequested;
+ 	int64		now;
+ 
+ 	/*
+ 	 * Parse the keepalive message, enclosed in the CopyData message.
+ 	 * We just check if the server requested a reply, and ignore the
+ 	 * rest.
+ 	 */
+ 	pos = 1;			/* skip msgtype 'k' */
+ 	pos += 8;			/* skip walEnd */
+ 	pos += 8;			/* skip sendTime */
+ 
+ 	if (len < pos + 1)
+ 	{
+ 		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+ 				progname, len);
+ 		return false;
+ 	}
+ 	replyRequested = copybuf[pos];
+ 
+ 	/* If the server requested an immediate reply, send one. */
+ 	if (replyRequested && still_sending)
+ 	{
+ 		now = feGetCurrentTimestamp();
+ 		if (!sendFeedback(conn, blockpos, now, false))
+ 			return false;
+ 		*last_status = now;
+ 	}
+ 
+ 	return true;
+ }
+ 
+ /*
+  * Process XLogData message.
+  */
+ static bool
+ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+ 				   XLogRecPtr *blockpos, uint32 timeline,
+ 				   char *basedir, stream_stop_callback stream_stop,
+ 				   char *partial_suffix)
+ {
+ 	int			xlogoff;
+ 	int			bytes_left;
+ 	int			bytes_written;
+ 	int			hdr_len;
+ 
+ 	/*
+ 	 * Once we've decided we don't want to receive any more, just
+ 	 * ignore any subsequent XLogData messages.
+ 	 */
+ 	if (!(still_sending))
+ 		return true;
+ 
+ 	/*
+ 	 * Read the header of the XLogData message, enclosed in the
+ 	 * CopyData message. We only need the WAL location field
+ 	 * (dataStart), the rest of the header is ignored.
+ 	 */
+ 	hdr_len = 1;		/* msgtype 'w' */
+ 	hdr_len += 8;		/* dataStart */
+ 	hdr_len += 8;		/* walEnd */
+ 	hdr_len += 8;		/* sendTime */
+ 	if (len < hdr_len)
+ 	{
+ 		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+ 				progname, len);
+ 		return false;
+ 	}
+ 	*blockpos = fe_recvint64(&copybuf[1]);
+ 
+ 	/* Extract WAL location for this block */
+ 	xlogoff = *blockpos % XLOG_SEG_SIZE;
+ 
+ 	/*
+ 	 * Verify that the initial location in the stream matches where we
+ 	 * think we are.
+ 	 */
+ 	if (walfile == -1)
+ 	{
+ 		/* No file open yet */
+ 		if (xlogoff != 0)
+ 		{
+ 			fprintf(stderr,
+ 					_("%s: received transaction log record for offset %u with no file open\n"),
+ 					progname, xlogoff);
+ 			return false;
+ 		}
+ 	}
+ 	else
+ 	{
+ 		/* More data in existing segment */
+ 		/* XXX: store seek value don't reseek all the time */
+ 		if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ 		{
+ 			fprintf(stderr,
+ 					_("%s: got WAL data offset %08x, expected %08x\n"),
+ 					progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ 			return false;
+ 		}
+ 	}
+ 
+ 	bytes_left = len - hdr_len;
+ 	bytes_written = 0;
+ 
+ 	while (bytes_left)
+ 	{
+ 		int			bytes_to_write;
+ 
+ 		/*
+ 		 * If crossing a WAL boundary, only write up until we reach
+ 		 * XLOG_SEG_SIZE.
+ 		 */
+ 		if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+ 			bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+ 		else
+ 			bytes_to_write = bytes_left;
+ 
+ 		if (walfile == -1)
+ 		{
+ 			if (!open_walfile(*blockpos, timeline,
+ 							  basedir, partial_suffix))
+ 			{
+ 				/* Error logged by open_walfile */
+ 				return false;
+ 			}
+ 		}
+ 
+ 		if (write(walfile,
+ 				  copybuf + hdr_len + bytes_written,
+ 				  bytes_to_write) != bytes_to_write)
+ 		{
+ 			fprintf(stderr,
+ 					_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
+ 					progname, bytes_to_write, current_walfile_name,
+ 					strerror(errno));
+ 			return false;
+ 		}
+ 
+ 		/* Write was successful, advance our position */
+ 		bytes_written += bytes_to_write;
+ 		bytes_left -= bytes_to_write;
+ 		*blockpos += bytes_to_write;
+ 		xlogoff += bytes_to_write;
+ 
+ 		/* Did we reach the end of a WAL segment? */
+ 		if (*blockpos % XLOG_SEG_SIZE == 0)
+ 		{
+ 			if (!close_walfile(basedir, partial_suffix, *blockpos))
+ 				/* Error message written in close_walfile() */
+ 				return false;
+ 
+ 			xlogoff = 0;
+ 
+ 			if (still_sending && stream_stop(*blockpos, timeline, true))
+ 			{
+ 				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ 				{
+ 					fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+ 							progname, PQerrorMessage(conn));
+ 					return false;
+ 				}
+ 				still_sending = false;
+ 				return true;	/* ignore the rest of this XLogData packet */
+ 			}
+ 		}
+ 	}
+ 	/* No more data left to write, receive next copy packet */
+ 
+ 	return true;
+ }
+ 
+ /*
+  * Handle end of the copy stream.
+  */
+ static PGresult *
+ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+ 					  XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+ 					  XLogRecPtr *stoppos)
+ {
+ 	PGresult   *res = PQgetResult(conn);
+ 
+ 	/*
+ 	 * The server closed its end of the copy stream.  If we haven't
+ 	 * closed ours already, we need to do so now, unless the server
+ 	 * threw an error, in which case we don't.
+ 	 */
+ 	if (still_sending)
+ 	{
+ 		if (!close_walfile(basedir, partial_suffix, blockpos))
+ 		{
+ 			/* Error message written in close_walfile() */
+ 			PQclear(res);
+ 			return NULL;
+ 		}
+ 		if (PQresultStatus(res) == PGRES_COPY_IN)
+ 		{
+ 			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ 			{
+ 				fprintf(stderr,
+ 						_("%s: could not send copy-end packet: %s"),
+ 						progname, PQerrorMessage(conn));
+ 				PQclear(res);
+ 				return NULL;
+ 			}
+ 			res = PQgetResult(conn);
+ 		}
+ 		still_sending = false;
+ 	}
+ 	if (copybuf != NULL)
+ 		PQfreemem(copybuf);
+ 	*stoppos = blockpos;
+ 	return res;
+ }
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to