*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 106,111 **** PostgreSQL documentation
--- 106,127 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-F <replaceable>interval_seconds</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable>interval_seconds</replaceable></option></term>
+       <listitem>
+        <para>
+         How often should <application>pg_receivexlog</application> issue sync
+         commands to ensure the received WAL file is safely flushed to disk. 
+         Specifying an interval of <literal>-1</literal> issuing fsyncs at 
+         every consecutive data. The value zero issuing fsyncs at WAL file close.
+         Also not specifying an interval,issuing fsyncs at WAL file close.
+         In this case, data may be lost in the event of a crash.
+         The default value is zero.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 371,377 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 371,377 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,41 **** static char *basedir = NULL;
--- 36,42 ----
  static int	verbose = 0;
  static int	noloop = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+ static int	fsync_interval = 0; /* 0 = default */
  static volatile bool time_to_abort = false;
  
  
***************
*** 62,67 **** usage(void)
--- 63,72 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -F  --fsync-interval=INTERVAL\n"
+ 			 "                         frequency of syncs to the transaction log files (in seconds)\n"
+ 			 "                         The value -1 issuing fsyncs at every consecutive data\n"
+ 			 "                         (default: file close only)\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 335,341 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", fsync_interval);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 365,371 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"fsync-interval", required_argument, NULL, 'F'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 395,401 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 442,456 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'F':
+ 				fsync_interval = atoi(optarg) * 1000;
+ 				if (fsync_interval < -1000)
+ 				{
+ 					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 30,46 **** static int	walfile = -1;
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
- 
  /*
   * Open a new WAL file in the specified directory.
   *
--- 30,46 ----
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+ static int64 output_last_fsync = -1;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos,int fsync_interval);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
  /*
   * Open a new WAL file in the specified directory.
   *
***************
*** 187,193 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
  				progname, current_walfile_name, partial_suffix);
! 
  	lastFlushPosition = pos;
  	return true;
  }
--- 187,193 ----
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
  				progname, current_walfile_name, partial_suffix);
! 	output_last_fsync = feGetCurrentTimestamp();
  	lastFlushPosition = pos;
  	return true;
  }
***************
*** 419,431 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 419,434 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * fsync_interval controls how often we flush to the received
+  * WAL file, in milliseconds.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int fsync_interval)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 570,576 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 573,579 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, fsync_interval);
  		if (res == NULL)
  			goto error;
  
***************
*** 731,737 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 734,740 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int fsync_interval)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 747,752 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 750,757 ----
  		int64		now;
  		int			hdr_len;
  		long		sleeptime;
+ 		int64		message_target = 0;
+ 		int64		fsync_target = 0;
  
  		/*
  		 * Check if we should continue streaming, or abort at this point.
***************
*** 780,796 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				goto error;
  			last_status = now;
  		}
! 
! 		/*
! 		 * Compute how long send/receive loops should sleep
! 		 */
! 		if (standby_message_timeout && still_sending)
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
--- 785,813 ----
  				goto error;
  			last_status = now;
  		}
! 		
! 		/* Compute when we need to wakeup to send a keepalive message. */
! 		if (standby_message_timeout)
! 			message_target = last_status + (standby_message_timeout - 1) *
! 				((int64) 1000);
! 
! 		/* Compute when we need to wakeup to fsync the output file. */
! 		if (fsync_interval > 0 && lastFlushPosition < blockpos)
! 			fsync_target = output_last_fsync + (fsync_interval - 1) *
! 				((int64) 1000);
! 
! 		/* Now compute when to wakeup. Compute how long send/receive loops should sleep*/
! 		if (still_sending && (message_target > 0 || fsync_target > 0))
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = message_target;
! 
! 			if (targettime == 0 || (fsync_target > 0 && fsync_target < targettime))
! 				targettime = fsync_target;
! 
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
***************
*** 808,1016 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		if (r == 0)
! 			continue;
! 		if (r == -1)
! 			goto error;
! 		if (r == -2)
  		{
! 			PGresult   *res = PQgetResult(conn);
  
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
  			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
  				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
  					goto error;
  				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
  				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  					{
  						fprintf(stderr,
! 								_("%s: could not send copy-end packet: %s"),
! 								progname, PQerrorMessage(conn));
! 						PQclear(res);
  						goto error;
  					}
- 					res = PQgetResult(conn);
  				}
- 				still_sending = false;
- 			}
- 			if (copybuf != NULL)
- 				PQfreemem(copybuf);
- 			copybuf = NULL;
- 			*stoppos = blockpos;
- 			return res;
- 		}
  
! 		/* Check the message type. */
! 		if (copybuf[0] == 'k')
! 		{
! 			int			pos;
! 			bool		replyRequested;
  
! 			/*
! 			 * Parse the keepalive message, enclosed in the CopyData message.
! 			 * We just check if the server requested a reply, and ignore the
! 			 * rest.
! 			 */
! 			pos = 1;			/* skip msgtype 'k' */
! 			pos += 8;			/* skip walEnd */
! 			pos += 8;			/* skip sendTime */
  
! 			if (r < pos + 1)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			replyRequested = copybuf[pos];
  
! 			/* If the server requested an immediate reply, send one. */
! 			if (replyRequested && still_sending)
! 			{
! 				now = feGetCurrentTimestamp();
! 				if (!sendFeedback(conn, blockpos, now, false))
! 					goto error;
! 				last_status = now;
! 			}
! 		}
! 		else if (copybuf[0] == 'w')
! 		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
  
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			blockpos = fe_recvint64(&copybuf[1]);
  
! 			/* Extract WAL location for this block */
! 			xlogoff = blockpos % XLOG_SEG_SIZE;
  
! 			/*
! 			 * Verify that the initial location in the stream matches where we
! 			 * think we are.
! 			 */
! 			if (walfile == -1)
! 			{
! 				/* No file open yet */
! 				if (xlogoff != 0)
! 				{
! 					fprintf(stderr,
! 							_("%s: received transaction log record for offset %u with no file open\n"),
! 							progname, xlogoff);
! 					goto error;
  				}
  			}
  			else
  			{
! 				/* More data in existing segment */
! 				/* XXX: store seek value don't reseek all the time */
! 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
  				{
! 					fprintf(stderr,
! 						  _("%s: got WAL data offset %08x, expected %08x\n"),
! 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
  					goto error;
  				}
  			}
! 
! 			bytes_left = r - hdr_len;
! 			bytes_written = 0;
! 
! 			while (bytes_left)
  			{
! 				int			bytes_to_write;
! 
! 				/*
! 				 * If crossing a WAL boundary, only write up until we reach
! 				 * XLOG_SEG_SIZE.
! 				 */
! 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 				else
! 					bytes_to_write = bytes_left;
! 
! 				if (walfile == -1)
  				{
! 					if (!open_walfile(blockpos, timeline,
! 									  basedir, partial_suffix))
  					{
! 						/* Error logged by open_walfile */
  						goto error;
  					}
  				}
  
! 				if (write(walfile,
! 						  copybuf + hdr_len + bytes_written,
! 						  bytes_to_write) != bytes_to_write)
  				{
! 					fprintf(stderr,
! 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 							progname, bytes_to_write, current_walfile_name,
! 							strerror(errno));
  					goto error;
  				}
! 
! 				/* Write was successful, advance our position */
! 				bytes_written += bytes_to_write;
! 				bytes_left -= bytes_to_write;
! 				blockpos += bytes_to_write;
! 				xlogoff += bytes_to_write;
! 
! 				/* Did we reach the end of a WAL segment? */
! 				if (blockpos % XLOG_SEG_SIZE == 0)
  				{
! 					if (!close_walfile(basedir, partial_suffix, blockpos))
! 						/* Error message written in close_walfile() */
! 						goto error;
! 
! 					xlogoff = 0;
! 
! 					if (still_sending && stream_stop(blockpos, timeline, false))
  					{
! 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 						{
! 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 									progname, PQerrorMessage(conn));
! 							goto error;
! 						}
! 						still_sending = false;
! 						break;	/* ignore the rest of this XLogData packet */
  					}
  				}
  			}
! 			/* No more data left to write, receive next copy packet */
! 		}
! 		else
! 		{
! 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 					progname, copybuf[0]);
! 			goto error;
  		}
  	}
  
--- 825,1072 ----
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		while(r > 0)
  		{
! 			/* Check the message type. */
! 			if (copybuf[0] == 'k')
! 			{
! 				int			pos;
! 				bool		replyRequested;
  
! 				/*
! 				 * Parse the keepalive message, enclosed in the CopyData message.
! 				 * We just check if the server requested a reply, and ignore the
! 				 * rest.
! 				 */
! 				pos = 1;			/* skip msgtype 'k' */
! 				pos += 8;			/* skip walEnd */
! 				pos += 8;			/* skip sendTime */
! 
! 				if (r < pos + 1)
! 				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
! 					goto error;
! 				}
! 				replyRequested = copybuf[pos];
! 
! 				/* If the server requested an immediate reply, send one. */
! 				if (replyRequested && still_sending)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!sendFeedback(conn, blockpos, now, false))
! 						goto error;
! 					last_status = now;
! 				}
! 			}
! 			else if (copybuf[0] == 'w')
  			{
! 				/*
! 				 * Once we've decided we don't want to receive any more, just
! 				 * ignore any subsequent XLogData messages.
! 				 */
! 				if (!still_sending)
! 					break;
! 
! 				/*
! 				 * Read the header of the XLogData message, enclosed in the
! 				 * CopyData message. We only need the WAL location field
! 				 * (dataStart), the rest of the header is ignored.
! 				 */
! 				hdr_len = 1;		/* msgtype 'w' */
! 				hdr_len += 8;		/* dataStart */
! 				hdr_len += 8;		/* walEnd */
! 				hdr_len += 8;		/* sendTime */
! 				if (r < hdr_len)
  				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
  					goto error;
  				}
! 				blockpos = fe_recvint64(&copybuf[1]);
! 
! 				/* Extract WAL location for this block */
! 				xlogoff = blockpos % XLOG_SEG_SIZE;
! 
! 				/*
! 				 * Verify that the initial location in the stream matches where we
! 				 * think we are.
! 				 */
! 				if (walfile == -1)
  				{
! 					/* No file open yet */
! 					if (xlogoff != 0)
  					{
  						fprintf(stderr,
! 								_("%s: received transaction log record for offset %u with no file open\n"),
! 								progname, xlogoff);
! 						goto error;
! 					}
! 				}
! 				else
! 				{
! 					/* More data in existing segment */
! 					/* XXX: store seek value don't reseek all the time */
! 					if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 					{
! 						fprintf(stderr,
! 							  _("%s: got WAL data offset %08x, expected %08x\n"),
! 						   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
  						goto error;
  					}
  				}
  
! 				bytes_left = r - hdr_len;
! 				bytes_written = 0;
  
! 				while (bytes_left)
! 				{
! 					int			bytes_to_write;
! 
! 					/*
! 					 * If crossing a WAL boundary, only write up until we reach
! 					 * XLOG_SEG_SIZE.
! 					 */
! 					if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 						bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 					else
! 						bytes_to_write = bytes_left;
! 
! 					if (walfile == -1)
! 					{
! 						if (!open_walfile(blockpos, timeline,
! 										  basedir, partial_suffix))
! 						{
! 							/* Error logged by open_walfile */
! 							goto error;
! 						}
! 					}
  
! 					if (write(walfile,
! 							  copybuf + hdr_len + bytes_written,
! 							  bytes_to_write) != bytes_to_write)
! 					{
! 						fprintf(stderr,
! 								_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 								progname, bytes_to_write, current_walfile_name,
! 								strerror(errno));
! 						goto error;
! 					}
  
! 					/* Write was successful, advance our position */
! 					bytes_written += bytes_to_write;
! 					bytes_left -= bytes_to_write;
! 					blockpos += bytes_to_write;
! 					xlogoff += bytes_to_write;
  
! 					/* Did we reach the end of a WAL segment? */
! 					if (blockpos % XLOG_SEG_SIZE == 0)
! 					{
! 						if (!close_walfile(basedir, partial_suffix, blockpos))
! 							/* Error message written in close_walfile() */
! 							goto error;
  
! 						xlogoff = 0;
  
! 						if (still_sending && stream_stop(blockpos, timeline, false))
! 						{
! 							if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 							{
! 								fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 										progname, PQerrorMessage(conn));
! 								goto error;
! 							}
! 							still_sending = false;
! 							break;	/* ignore the rest of this XLogData packet */
! 						}
! 					}
  				}
+ 				/* No more data left to write, receive next copy packet */
  			}
  			else
  			{
! 				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 								progname, copybuf[0]);
! 				goto error;
! 			}
! 			if (still_sending && stream_stop(blockpos, timeline, false))
! 			{
! 				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  				{
! 					fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 							progname, PQerrorMessage(conn));
  					goto error;
  				}
+ 				still_sending = false;
+ 				break;  /* ignore the rest of this XLogData packet */
  			}
! 			r = CopyStreamReceive(conn, 0, &copybuf);
! 		}
! 		if (r == 0)
! 		{
! 			/* --fsync-interval argument has been specified */
! 			if (fsync_interval != 0)
  			{
! 				 /* interval has been specified */
! 				if (fsync_interval > 0)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!feTimestampDifferenceExceeds(output_last_fsync, now, fsync_interval))
! 						continue;
! 					output_last_fsync = now;
! 				}
! 				/* check the need for flush */
! 				if (walfile != -1 && lastFlushPosition < blockpos)
  				{
! 					if (fsync(walfile) != 0)
  					{
! 						fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! 										progname, current_walfile_name, strerror(errno));
  						goto error;
  					}
+ 					lastFlushPosition = blockpos;
  				}
+ 			}
+ 			continue;
+ 		}
+ 		if (r == -1)
+ 			goto error;
+ 		if (r == -2)
+ 		{
+ 			PGresult   *res = PQgetResult(conn);
  
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
! 			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
  				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
  					goto error;
  				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
  				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  					{
! 						fprintf(stderr,
! 							_("%s: could not send copy-end packet: %s"),
! 							progname, PQerrorMessage(conn));
! 						PQclear(res);
! 						goto error;
  					}
+ 					res = PQgetResult(conn);
  				}
+ 				still_sending = false;
  			}
! 			if (copybuf != NULL)
! 				PQfreemem(copybuf);
! 			copybuf = NULL;
! 			*stoppos = blockpos;
! 			return res;
  		}
  	}
  
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int fsync_interval);
