Having gone through the patch in more detail now, I think it's in pretty
good shape. I'm happy with the overall design, except that I haven't
been able to make up my mind whether walreceiver should indeed be a
stand-alone program, as discussed, or a postmaster child process, as in
the patch you submitted. Putting that question aside for a moment,
here are some minor things, in no particular order:

- The async API in PQgetXLogData is quite different from the other
commands. It's close to the PQgetCopyData() API, but doesn't return
a malloc'd buffer like PQgetCopyData() does. I presume that's to optimize
away the extra memcpy step? I don't think that's really necessary: I
don't recall any complaints about that in PQgetCopyData(), and if it
does become an issue, it could be optimized away by mallocing the buffer
first and reading directly into it.

- Can we avoid sprinkling XLogStreamingAllowed() calls into the places
where we check whether WAL-logging is required (nbtsort.c, copy.c etc.)?
I think we need a new macro to encapsulate (XLogArchivingActive() ||
XLogStreamingAllowed()).

- Is O_DIRECT ever a good idea in walreceiver? If the writes really are
direct and don't get cached, the startup process will have to read the
WAL back from disk.

- Can we replace read/write_conninfo with just a long-enough field in
shared memory? That would be simpler. (This is moot if we go with the
stand-alone walreceiver program and pass it as a command-line argument.)

- walreceiver shouldn't die on a connection error just to be restarted by
the startup process. Can we add error handling a la bgwriter and have a
retry loop within walreceiver? (Again, if we go with a stand-alone
walreceiver program, it's probably better to keep the startup process
responsible for restarting walreceiver, as it is now.)

- pq_wait in the backend waits until you can read or write at least 1
byte. There is no guarantee that you can then send or read the whole
message without blocking; we'd have to put the socket in non-blocking
mode for that. I'm not sure what the implications of this are.

- We should include system_identifier somewhere in the replication
startup handshake. Otherwise you can connect to a server from a different
system and have logs shipped, if the two happen to be at roughly the same
point in WAL. Replay will almost certainly fail, but we should error out
earlier.

- I know I said we should have just asynchronous replication at first,
but looking ahead, how would you do synchronous replication? What kind of
signaling would be needed between walreceiver and the startup process for
that?

- 'replication' shouldn't be a real database.
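
For the XLogStreamingAllowed() point above, a minimal sketch of what the combined check could look like. XLogArchivingActive() and XLogStreamingAllowed() are stubbed out here so the fragment compiles on its own (in the backend they depend on configuration), and the macro name XLogNeededByArchiveOrStandby() and the helper must_wal_log_bulk_operation() are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Stubs standing in for the real checks, which are driven by configuration
 * settings in the backend. Values here are for illustration only.
 */
static bool archive_mode_on = false;
static bool streaming_on = true;

static bool XLogArchivingActive(void) { return archive_mode_on; }
static bool XLogStreamingAllowed(void) { return streaming_on; }

/*
 * Hypothetical name: WAL has to be generated whenever either archiving or
 * streaming to a standby could need it.
 */
#define XLogNeededByArchiveOrStandby() \
	(XLogArchivingActive() || XLogStreamingAllowed())

/*
 * What a call site such as nbtsort.c or copy.c would do: test the single
 * macro instead of spelling out both conditions.
 */
static bool
must_wal_log_bulk_operation(void)
{
	return XLogNeededByArchiveOrStandby();
}
```

Call sites would then test only the one macro, so any later change to the conditions lives in a single place.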

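On the pq_wait point: a sketch of writing a whole message to a socket in non-blocking mode, looping over partial writes and waiting with poll() for POLLOUT whenever the kernel buffer is full. This is generic POSIX code, not the backend's own communication layer; set_nonblocking() and write_all_nonblocking() are hypothetical helpers, and a real walsender would also have to keep reading replies and checking for signals while it waits:

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

/* Put fd into non-blocking mode; returns 0 on success, -1 on error. */
static int
set_nonblocking(int fd)
{
	int			flags = fcntl(fd, F_GETFL, 0);

	if (flags < 0)
		return -1;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Write all of buf to a non-blocking descriptor. Readiness only promises
 * that at least one byte can be written, so keep looping until everything
 * has gone out. Returns 0 on success, -1 on error (errno set).
 */
static int
write_all_nonblocking(int fd, const char *buf, size_t len)
{
	assert(fd >= 0);

	while (len > 0)
	{
		ssize_t		n = write(fd, buf, len);

		if (n > 0)
		{
			/* Partial write: advance past what the kernel accepted. */
			buf += n;
			len -= (size_t) n;
			continue;
		}
		if (n < 0 && errno == EINTR)
			continue;
		if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
		{
			struct pollfd pfd = {.fd = fd, .events = POLLOUT};

			/* Buffer full: sleep until the descriptor is writable again. */
			if (poll(&pfd, 1, -1) < 0 && errno != EINTR)
				return -1;
			continue;
		}
		return -1;				/* hard error, or an unexpected 0 */
	}
	return 0;
}
```

Once the socket is non-blocking, every other read and write on it must also be prepared to see EAGAIN.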

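For the system_identifier point, a sketch of the check the standby could make as soon as it receives the primary's reply to the startup packet. The ReplicationHandshake struct and check_primary_identity() are hypothetical; the only assumption about the real system is that each side can read its own 64-bit system identifier from pg_control:

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical reply sent by the primary during the replication handshake. */
typedef struct
{
	uint64_t	system_identifier;	/* primary's identifier from pg_control */
	/* ... other startup fields omitted ... */
} ReplicationHandshake;

/*
 * Return true if the standby may stream from this primary. On a mismatch,
 * fill errbuf with a message and return false, before any WAL is shipped.
 */
static bool
check_primary_identity(const ReplicationHandshake *reply,
					   uint64_t local_sysid,
					   char *errbuf, size_t errlen)
{
	assert(reply != NULL && errbuf != NULL && errlen > 0);

	if (reply->system_identifier != local_sysid)
	{
		snprintf(errbuf, errlen,
				 "primary system identifier %" PRIu64
				 " does not match standby system identifier %" PRIu64,
				 reply->system_identifier, local_sysid);
		return false;
	}
	errbuf[0] = '\0';
	return true;
}
```

Failing here, with a clear message, is much easier to diagnose than a replay failure later on.
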
I found the paging logic in walsender confusing, and didn't like the
idea that walsender needs to set the XLOGSTREAM_END_SEG flag. Surely
walreceiver knows how to split the WAL into files without such a flag. I
reworked that logic, and I think it's easier to understand now. I kept the
support for the flag in libpq and the protocol for now, but it should be
removed too, or repurposed to indicate that pg_switch_xlog() was done in
the master. I've pushed that to the 'replication-orig' branch in my git
repository; attached is the same change as a diff against your SR_0914.patch.
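
To illustrate why walreceiver doesn't need XLOGSTREAM_END_SEG: given only the starting WAL position and the length of each message, the receiver can compute segment boundaries itself. A sketch, assuming positions are flattened into a single 64-bit byte offset (the patch itself uses the two-part XLogRecPtr {xlogid, xrecoff}); SegChunk and split_by_segment() are hypothetical names:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* One piece of a received message that fits inside a single segment file. */
typedef struct
{
	uint64_t	segno;				/* which segment file to write to */
	uint64_t	offset;				/* byte offset within that file */
	size_t		len;				/* number of bytes to write there */
	bool		completes_segment;	/* this piece fills the file to its end */
} SegChunk;

/*
 * Split a write of 'nbytes' starting at WAL position 'pos' into pieces that
 * never cross a segment boundary. Stores at most 'maxchunks' entries in
 * 'out' and returns how many it stored; nbytes / seg_size + 2 entries are
 * always enough to cover all of the data.
 */
static size_t
split_by_segment(uint64_t pos, size_t nbytes, uint64_t seg_size,
				 SegChunk *out, size_t maxchunks)
{
	size_t		n = 0;

	assert(seg_size > 0);

	while (nbytes > 0 && n < maxchunks)
	{
		uint64_t	startoff = pos % seg_size;
		uint64_t	room = seg_size - startoff;
		size_t		len = (nbytes > room) ? (size_t) room : nbytes;

		out[n].segno = pos / seg_size;
		out[n].offset = startoff;
		out[n].len = len;
		out[n].completes_segment = (startoff + len == seg_size);
		n++;

		/* The next piece starts right where this one ended. */
		pos += len;
		nbytes -= len;
	}
	return n;
}
```

For each chunk the receiver would open segment segno, seek to offset and write len bytes; a chunk with completes_segment set marks the point where the segment can be fsynced and handed to the archiver.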

I need a break from this patch, so I'll take a closer look at Simon's
hot standby now. Meanwhile, can you work on the above items and submit a
new version, please?

-- 
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/transam/recovery.conf.sample
--- b/src/backend/access/transam/recovery.conf.sample
***************
*** 2,10 ****
  # PostgreSQL recovery config file
  # -------------------------------
  #
! # Edit this file to provide the parameters that PostgreSQL
! # needs to perform an archive recovery of a database, or
! # a log-streaming replication.
  #
  # If "recovery.conf" is present in the PostgreSQL data directory, it is
  # read on postmaster startup.  After successful recovery, it is renamed
--- 2,10 ----
  # PostgreSQL recovery config file
  # -------------------------------
  #
! # Edit this file to provide the parameters that PostgreSQL needs to
! # perform an archive recovery of a database, or to act as a log-streaming
! # replication standby.
  #
  # If "recovery.conf" is present in the PostgreSQL data directory, it is
  # read on postmaster startup.  After successful recovery, it is renamed
***************
*** 83,89 ****
  #---------------------------------------------------------------------------
  #
  # When standby_mode is enabled, the PostgreSQL server will work as
! # the standby. It tries to connect to the primary according to the
  # connection settings primary_conninfo, and receives XLOG records
  # continuously.
  #
--- 83,89 ----
  #---------------------------------------------------------------------------
  #
  # When standby_mode is enabled, the PostgreSQL server will work as
! # a standby. It tries to connect to the primary according to the
  # connection settings primary_conninfo, and receives XLOG records
  # continuously.
  #
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 2645,2653 **** XLogFileClose(void)
  	 * WAL segment files will not be re-read in normal operation, so we advise
  	 * the OS to release any cached pages.	But do not do so if WAL archiving
  	 * or streaming is active, because archiver and walsender process could use
! 	 * the cache to read the WAL segment, respectively.  Also, don't bother
! 	 * with it if we are using O_DIRECT, since the kernel is presumably not
! 	 * caching in that case.
  	 */
  #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
  	if (!XLogArchivingActive() && !WalSndInProgress() &&
--- 2645,2653 ----
  	 * WAL segment files will not be re-read in normal operation, so we advise
  	 * the OS to release any cached pages.	But do not do so if WAL archiving
  	 * or streaming is active, because archiver and walsender process could use
! 	 * the cache to read the WAL segment.  Also, don't bother with it if we
! 	 * are using O_DIRECT, since the kernel is presumably not caching in that
! 	 * case.
  	 */
  #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
  	if (!XLogArchivingActive() && !WalSndInProgress() &&
***************
*** 3481,3487 **** FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
  						startlsn.xlogid, startlsn.xrecoff)));
  	}
  
! 	return ReadRecord(RecPtr, emode);	
  }
  
  /*
--- 3481,3487 ----
  						startlsn.xlogid, startlsn.xrecoff)));
  	}
  
! 	return ReadRecord(RecPtr, emode);
  }
  
  /*
***************
*** 5284,5290 **** exitStreamingRecovery(void)
  	 */
  	ShutdownWalRcv();
  
! 	/* We are no longer in streaming recovery state */		
  	InStreamingRecovery = false;
  
  	ereport(LOG,
--- 5284,5290 ----
  	 */
  	ShutdownWalRcv();
  
! 	/* We are no longer in streaming recovery state */
  	InStreamingRecovery = false;
  
  	ereport(LOG,
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 289,295 **** typedef enum
  	PM_WAIT_BACKENDS,			/* waiting for live backends to exit */
  	PM_SHUTDOWN,				/* waiting for bgwriter to do shutdown ckpt */
  	PM_SHUTDOWN_2,				/* waiting for archiver to finish */
- 	PM_SHUTDOWN_3,				/* waiting for walsenders to finish */
  	PM_WAIT_DEAD_END,			/* waiting for dead_end children to exit */
  	PM_NO_CHILDREN				/* all important children have exited */
  } PMState;
--- 289,294 ----
***************
*** 1640,1646 **** retry1:
  	if (proto == XLOG_STREAMING_CODE && !am_walsender)
  	{
  		am_walsender = true;
! 		/* No packets other than regular one should not follow */
  		return ProcessStartupPacket(port, SSLdone);
  	}
  
--- 1639,1645 ----
  	if (proto == XLOG_STREAMING_CODE && !am_walsender)
  	{
  		am_walsender = true;
! 		/* No packets other than regular one should follow */
  		return ProcessStartupPacket(port, SSLdone);
  	}
  
***************
*** 2404,2420 **** reaper(SIGNAL_ARGS)
  				 */
  				Assert(Shutdown > NoShutdown);
  
! 				if (PgArchPID != 0)
  				{
  					/* Waken archiver for the last time */
! 					signal_child(PgArchPID, SIGUSR2);
! 					pmState = PM_SHUTDOWN_2;
! 				}
! 				else if (WalSndInProgress()) 
! 				{
  					/* Waken walsenders for the last time */
  					SignalWalSenders(SIGUSR2);
! 					pmState = PM_SHUTDOWN_3;
  				}
  				else
  					pmState = PM_WAIT_DEAD_END;
--- 2403,2418 ----
  				 */
  				Assert(Shutdown > NoShutdown);
  
! 				if (PgArchPID != 0 || WalSndInProgress())
  				{
  					/* Waken archiver for the last time */
! 					if (PgArchPID != 0)
! 						signal_child(PgArchPID, SIGUSR2);
! 
  					/* Waken walsenders for the last time */
  					SignalWalSenders(SIGUSR2);
! 
! 					pmState = PM_SHUTDOWN_2;
  				}
  				else
  					pmState = PM_WAIT_DEAD_END;
***************
*** 2499,2510 **** reaper(SIGNAL_ARGS)
  				 ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
  				  WalRcvInProgress())))
  				PgArchPID = pgarch_start();
! 			else if (pmState == PM_SHUTDOWN_2 && WalSndInProgress())
! 			{
! 				SignalWalSenders(SIGUSR2);
! 				pmState = PM_SHUTDOWN_3;
! 			}
! 			else
  				pmState = PM_WAIT_DEAD_END;
  			continue;
  		}
--- 2497,2503 ----
  				 ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
  				  WalRcvInProgress())))
  				PgArchPID = pgarch_start();
! 			else if (pmState == PM_SHUTDOWN_2 && !WalSndInProgress())
  				pmState = PM_WAIT_DEAD_END;
  			continue;
  		}
***************
*** 2611,2618 **** CleanupBackend(int pid,
  				 * advance to the next shutdown step.
  				 */
  				if (bp->child_type == BACKEND_TYPE_WALSND &&
! 					pmState == PM_SHUTDOWN_3 &&
! 					!WalSndInProgress())
  					pmState = PM_WAIT_DEAD_END;
  			}
  			DLRemove(curr);
--- 2604,2611 ----
  				 * advance to the next shutdown step.
  				 */
  				if (bp->child_type == BACKEND_TYPE_WALSND &&
! 					pmState == PM_SHUTDOWN_2 &&
! 					!WalSndInProgress() && PgArchPID == 0)
  					pmState = PM_WAIT_DEAD_END;
  			}
  			DLRemove(curr);
*** a/src/backend/postmaster/walreceiver.c
--- b/src/backend/postmaster/walreceiver.c
***************
*** 100,108 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS);
  static void WalRcvLoop(void);
  static void	InitWalRcv(void);
  static void	WalRcvKill(int code, Datum arg);
! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced);
! static void XLogWalRcvFlush(XLogRecPtr recptr);
! static void WritePhysicalXLog(char *from, Size nbytes, int startoff);
  static char *read_conninfo_file(void);
  
  /* Main entry point for walreceiver process */
--- 100,107 ----
  static void WalRcvLoop(void);
  static void	InitWalRcv(void);
  static void	WalRcvKill(int code, Datum arg);
! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr);
! static void XLogWalRcvFlush(void);
  static char *read_conninfo_file(void);
  
  /* Main entry point for walreceiver process */
***************
*** 228,235 **** WalRcvLoop(void)
  	/* Loop until end-of-streaming or error */
  	for (;;)
  	{
- 		bool	fsynced = false;
- 
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
  		 * necessity for manual cleanup of all postmaster children.
--- 227,232 ----
***************
*** 298,304 **** WalRcvLoop(void)
  		 * can recover all transactions from the primary).
  		 */
  
! 		XLogWalRcvWrite(buf, len, recptr, &fsynced);
  
  		/*
  		 * The logs in the XLogData message were written successfully,
--- 295,301 ----
  		 * can recover all transactions from the primary).
  		 */
  
! 		XLogWalRcvWrite(buf, len, recptr);
  
  		/*
  		 * The logs in the XLogData message were written successfully,
***************
*** 307,357 **** WalRcvLoop(void)
  		PQmarkConsumed(streamConn);
  
  		/*
! 		 * If fsync is not requested or was already done, we send a "success"
! 		 * to the primary before issuing fsync for end-of-segment.
  		 */
! 		if (fsynced || !fsync_requested)
! 		{
! 			if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
! 								(int) fsynced) == -1)
! 				ereport(FATAL,
! 						(errmsg("could not send a message to the primary: %s",
! 								PQerrorMessage(streamConn))));
! 		}
! 
! 		/*
! 		 * If we just wrote the whole last page of a logfile segment but
! 		 * had not fsynced it yet, fsync the segment immediately.  This
! 		 * avoids having to go back and re-open prior segments when an
! 		 * fsync request comes along later.
! 		 *
! 		 * Of course, if asked to fsync but not, do so.
! 		 */
! 		if (!fsynced && (fsync_requested || finishing_seg))
! 		{
! 			XLogWalRcvFlush(recptr);
! 
! 			if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
! 								1) == -1)
! 				ereport(FATAL,
! 						(errmsg("could not send a message to the primary: %s",
! 								PQerrorMessage(streamConn))));
! 
! 			/*
! 			 * If the segment is ready to copy to archival storage,
! 			 * notify the archiver so.
! 			 */
! 			if (finishing_seg && XLogArchivingActive())
! 				XLogArchiveNotifySeg(recvId, recvSeg);
! 
! 			/*
! 			 * XXX: Should we signal bgwriter to start a restartpoint
! 			 * if we've consumed too much xlog since the last one, like
! 			 * in normal processing? But this is not worth doing unless
! 			 * a restartpoint can be created independently from a
! 			 * checkpoint record.
! 			 */
! 		}
  	}
  
  	if (len == -1)	/* end-of-streaming */
--- 304,314 ----
  		PQmarkConsumed(streamConn);
  
  		/*
! 		 * If the primary requested us to fsync, do so now and send
! 		 * an acknowledgement.
  		 */
! 		if (fsync_requested)
! 			XLogWalRcvFlush();
  	}
  
  	if (len == -1)	/* end-of-streaming */
***************
*** 511,589 **** WalRcvInProgress(void)
   * fsynced is set to true if the log was fsyned by O_DIRECT.
   */
  static void
! XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
  {
  	int		startoff;
! 	int		endoff;
  
! 	START_CRIT_SECTION();
  
! 	if (!XLByteInPrevSeg(recptr, recvId, recvSeg))
  	{
! 		bool	use_existent;
  
! 		/*
! 		 * XLOG segment files will be re-read in recovery operation soon,
! 		 * so we don't need to advise the OS to release any cache page.
! 		 */
! 		if (recvFile >= 0 && close(recvFile))
  			ereport(PANIC,
  					(errcode_for_file_access(),
! 					 errmsg("could not close log file %u, segment %u: %m",
! 							recvId, recvSeg)));
! 		recvFile = -1;
! 
! 		/* Create/use new log file */
! 		XLByteToPrevSeg(recptr, recvId, recvSeg);
! 		use_existent = true;
! 		recvFile = XLogFileInit(recvId, recvSeg,
! 								  &use_existent, true);
! 		recvOff = 0;
! 	}
  
! 	/* Make sure we have the current logfile open */
! 	if (recvFile < 0)
! 	{
! 		XLByteToPrevSeg(recptr, recvId, recvSeg);
! 		recvFile = XLogFileOpen(recvId, recvSeg);
! 		recvOff = 0;
! 	}
  
! 	/* Calculate the start/end file offset of the received logs */
! 	endoff = recptr.xrecoff % XLogSegSize;
! 	startoff = ((endoff == 0) ? XLogSegSize : endoff) - len;
  
  	/*
! 	 * Re-zero the page so that bytes beyond what we've written will look
! 	 * like zeroes and not valid XLOG records. Only end page which we are
! 	 * writing need to be zeroed. Of course, we can skip zeroing the pages
! 	 * full of the XLOG records. Save the end position of the already zeroed
! 	 * area at the variable ZeroedRecPtr, and avoid zeroing the same page
! 	 * two or more times.
  	 *
  	 * This must precede the writing of the actual logs. Otherwise, a crash
! 	 * before re-zeroing would cause a corrupted page.
  	 */
! 	if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0)
  	{
  		int		zlen;
  
! 		zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ;
! 		WritePhysicalXLog(ZeroedBuffer, zlen, endoff);
  		ZeroedRecPtr = recptr;
  		ZeroedRecPtr.xrecoff += zlen;
- 	}
  
! 	/* Write out the logs */
! 	WritePhysicalXLog(buf, len, startoff);
! 	LogstreamResult.Send	= recptr;
! 	LogstreamResult.Write	= recptr;
! 
! 	if (sync_method == SYNC_METHOD_OPEN ||
! 		sync_method == SYNC_METHOD_OPEN_DSYNC)
! 	{
! 		LogstreamResult.Flush = recptr;
! 		*fsynced = true;		/* logs were already fsynced */
  	}
  
  	/* Update shared-memory status */
--- 468,623 ----
   * fsynced is set to true if the log was fsyned by O_DIRECT.
   */
  static void
! XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
  {
  	int		startoff;
! 	int		byteswritten;
  
! 	START_CRIT_SECTION(); /* XXX: Why? */
  
! 	while (nbytes > 0)
  	{
! 		int		segbytes;
! 		uint32	tmp;
  
! 		if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
! 		{
! 			bool	use_existent;
! 
! 			/*
! 			 * XLOG segment files will be re-read in recovery operation soon,
! 			 * so we don't need to advise the OS to release any cache page.
! 			 */
! 			if (recvFile >= 0)
! 			{
! 				/*
! 				 * fsync() before we switch to next file. We would otherwise
! 				 * have to reopen this file to fsync it later
! 				 */
! 				XLogWalRcvFlush();
! 				if (close(recvFile) != 0)
! 					ereport(PANIC,
! 							(errcode_for_file_access(),
! 							 errmsg("could not close log file %u, segment %u: %m",
! 									recvId, recvSeg)));
! 			}
! 			recvFile = -1;
! 
! 			/* Create/use new log file */
! 			XLByteToSeg(recptr, recvId, recvSeg);
! 			use_existent = true;
! 			recvFile = XLogFileInit(recvId, recvSeg,
! 									&use_existent, true);
! 			recvOff = 0;
! 		}
! 
! 		/* Calculate the start offset of the received logs */
! 		startoff = recptr.xrecoff % XLogSegSize;
! 
! 		if (startoff + nbytes > XLOG_SEG_SIZE)
! 			segbytes = XLOG_SEG_SIZE - startoff;
! 		else
! 			segbytes = nbytes;
! 
! 		/* Need to seek in the file? */
! 		if (recvOff != startoff)
! 		{
! 			if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
! 				ereport(PANIC,
! 						(errcode_for_file_access(),
! 						 errmsg("could not seek in log file %u, "
! 								"segment %u to offset %u: %m",
! 								recvId, recvSeg, startoff)));
! 			recvOff = startoff;
! 		}
! 
! 		/* OK to write the logs */
! 		errno = 0;
! 
! 		byteswritten = write(recvFile, buf, segbytes);
! 		if (byteswritten <= 0)
! 		{
! 			/* if write didn't set errno, assume no disk space */
! 			if (errno == 0)
! 				errno = ENOSPC;
  			ereport(PANIC,
  					(errcode_for_file_access(),
! 					 errmsg("could not write to log file %u, segment %u "
! 							"at offset %u, length %lu: %m",
! 							recvId, recvSeg,
! 							recvOff, (unsigned long) segbytes)));
! 		}
  
! 		/* Update state for write */
! 		tmp = recptr.xrecoff + byteswritten;
! 		if (tmp < recptr.xrecoff)
! 			recptr.xlogid++; /* overflow */
! 		recptr.xrecoff = tmp;
  
! 		recvOff += byteswritten;
! 		nbytes -= byteswritten;
! 		buf += byteswritten;
! 
! 		LogstreamResult.Send	= recptr;
! 		LogstreamResult.Write	= recptr;
! 
! 		if (sync_method == SYNC_METHOD_OPEN ||
! 			sync_method == SYNC_METHOD_OPEN_DSYNC)
! 		{
! 			LogstreamResult.Flush = recptr;
! 		}
! 
! 		/*
! 		 * If the segment is ready to copy to archival storage,
! 		 * notify the archiver.
! 		 */
! 		if ((recptr.xrecoff % XLOG_SEG_SIZE == 0) && XLogArchivingActive())
! 			XLogArchiveNotifySeg(recvId, recvSeg);
! 
! 		/*
! 		 * XXX: Should we signal bgwriter to start a restartpoint
! 		 * if we've consumed too much xlog since the last one, like
! 		 * in normal processing? But this is not worth doing unless
! 		 * a restartpoint can be created independently from a
! 		 * checkpoint record.
! 		 */
! 	}
  
  	/*
! 	 * Zero the rest of the last page we wrote to, so that bytes beyond what
! 	 * we've written will look like zeroes and not valid XLOG records. Save
! 	 * the end position of the already zeroed area at the variable
! 	 * ZeroedRecPtr, and avoid zeroing the same page two or more times.
  	 *
  	 * This must precede the writing of the actual logs. Otherwise, a crash
! 	 * before re-zeroing would cause a corrupted page. XXX: that's not really
! 	 * an issue; a hard crash could leave the page half-flushed anyway, and we
! 	 * have CRCs to protect against that, so this zeroing business isn't
! 	 * absolutely necessary.
  	 */
! 	if (XLByteLT(ZeroedRecPtr, recptr) && recptr.xrecoff % XLOG_BLCKSZ != 0)
  	{
  		int		zlen;
  
! 		zlen = XLOG_BLCKSZ - recptr.xrecoff % XLOG_BLCKSZ;
! 		errno = 0;
! 		byteswritten = write(recvFile, ZeroedBuffer, zlen);
! 		if (byteswritten != zlen)
! 		{
! 			/* if write didn't set errno, assume no disk space */
! 			if (errno == 0)
! 				errno = ENOSPC;
! 			ereport(PANIC,
! 					(errcode_for_file_access(),
! 					 errmsg("could not write to log file %u, segment %u "
! 							"at offset %u, length %lu: %m",
! 							recvId, recvSeg,
! 							recvOff, (unsigned long) zlen)));
! 		}
  		ZeroedRecPtr = recptr;
  		ZeroedRecPtr.xrecoff += zlen;
  
! 		recvOff += byteswritten;
  	}
  
  	/* Update shared-memory status */
***************
*** 594,600 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
  		SpinLockAcquire(&walrcv->mutex);
  		XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
  		XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
! 		if (*fsynced)
  			XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
  		SpinLockRelease(&walrcv->mutex);
  	}
--- 628,635 ----
  		SpinLockAcquire(&walrcv->mutex);
  		XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
  		XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
! 		if (sync_method == SYNC_METHOD_OPEN ||
! 			sync_method == SYNC_METHOD_OPEN_DSYNC)
  			XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
  		SpinLockRelease(&walrcv->mutex);
  	}
***************
*** 607,666 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
  
  /* Flush the log to disk */
  static void
! XLogWalRcvFlush(XLogRecPtr recptr)
  {
! 	START_CRIT_SECTION();
! 
! 	issue_xlog_fsync(recvFile, recvId, recvSeg);
! 
! 	LogstreamResult.Flush = recptr;
! 
! 	/* Update shared-memory status */
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalRcvData *walrcv = WalRcv;
  
  		SpinLockAcquire(&walrcv->mutex);
  		XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
  		SpinLockRelease(&walrcv->mutex);
- 	}
- 
- 	END_CRIT_SECTION();
- }
  
! /* Physical write to the given logs */
! static void
! WritePhysicalXLog(char *from, Size nbytes, int startoff)
! {
! 	/* Need to seek in the file? */
! 	if (recvOff != startoff)
! 	{
! 		if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
! 			ereport(PANIC,
! 					(errcode_for_file_access(),
! 					 errmsg("could not seek in log file %u, "
! 							"segment %u to offset %u: %m",
! 							recvId, recvSeg, startoff)));
! 		recvOff = startoff;
! 	}
  
! 	/* OK to write the logs */
! 	errno = 0;
! 	if (write(recvFile, from, nbytes) != nbytes)
! 	{
! 		/* if write didn't set errno, assume no disk space */
! 		if (errno == 0)
! 			errno = ENOSPC;
! 		ereport(PANIC,
! 				(errcode_for_file_access(),
! 				 errmsg("could not write to log file %u, segment %u "
! 						"at offset %u, length %lu: %m",
! 						recvId, recvSeg,
! 						recvOff, (unsigned long) nbytes)));
  	}
- 
- 	/* Update state for write */
- 	recvOff += nbytes;
  }
  
  /*
--- 642,674 ----
  
  /* Flush the log to disk */
  static void
! XLogWalRcvFlush(void)
  {
! 	if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
  	{
  		/* use volatile pointer to prevent code rearrangement */
  		volatile WalRcvData *walrcv = WalRcv;
  
+ 		START_CRIT_SECTION();
+ 
+ 		issue_xlog_fsync(recvFile, recvId, recvSeg);
+ 
+ 		LogstreamResult.Flush = LogstreamResult.Write;
+ 
+ 		/* Update shared-memory status */
  		SpinLockAcquire(&walrcv->mutex);
  		XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
  		SpinLockRelease(&walrcv->mutex);
  
! 		END_CRIT_SECTION();
  
! 		/* Let the primary know */
! 		if (PQputXLogRecPtr(streamConn, LogstreamResult.Flush.xlogid,
! 							LogstreamResult.Flush.xrecoff, 1) == -1)
! 			ereport(FATAL,
! 					(errmsg("could not send a message to the primary: %s",
! 							PQerrorMessage(streamConn))));
  	}
  }
  
  /*
*** a/src/backend/postmaster/walsender.c
--- b/src/backend/postmaster/walsender.c
***************
*** 113,122 **** static void WalSndQuickDieHandler(SIGNAL_ARGS);
  static int	WalSndLoop(void);
  static void	InitWalSnd(void);
  static void	WalSndKill(int code, Datum arg);
! static void XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr);
  static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg);
  static bool ProcessStreamMsgs(PendingMessage inMsg);
  
  /* Main entry point for walsender process */
  int
  WalSenderMain(void)
--- 113,127 ----
  static int	WalSndLoop(void);
  static void	InitWalSnd(void);
  static void	WalSndKill(int code, Datum arg);
! static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg);
  static bool ProcessStreamMsgs(PendingMessage inMsg);
  
+ /*
+  * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
+  */
+ #define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
+ 
  /* Main entry point for walsender process */
  int
  WalSenderMain(void)
***************
*** 382,400 **** WalSndKill(int code, Datum arg)
  }
  
  /*
!  * Read the log into buffer.
!  *
!  * startoff is the file offset where we start reading the log from; nbytes is
!  * the number of bytes which needs to be read; recptr is the last byte + 1 to
!  * read.
   */
  void
! XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
  {
  	char path[MAXPGPATH];
! 
! 	/* Don't cross a segment boundary */
! 	Assert(startoff + nbytes <= XLogSegSize);
  
  #ifdef REPLICATION_DEBUG
  	if (REPLICATION_DEBUG_ENABLED)
--- 387,399 ----
  }
  
  /*
!  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
   */
  void
! XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  {
  	char path[MAXPGPATH];
! 	uint32 startoff;
  
  #ifdef REPLICATION_DEBUG
  	if (REPLICATION_DEBUG_ENABLED)
***************
*** 404,464 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
  			 LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
  #endif
  
! 	if (!XLByteInPrevSeg(recptr, sendId, sendSeg))
  	{
! 		/* Switch to another logfile segment */
! 		if (sendFile >= 0)
! 			close(sendFile);
  
! 		XLByteToPrevSeg(recptr, sendId, sendSeg);
! 		XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
  
! 		sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
! 		if (sendFile < 0)
! 			ereport(FATAL,
! 					(errcode_for_file_access(),
! 					 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
! 							path, sendId, sendSeg)));
! 		sendOff = 0;
! 	}
  
! 	/* Make sure we have the current logfile open */
! 	if (sendFile < 0)
! 	{
! 		XLByteToPrevSeg(recptr, sendId, sendSeg);
! 		XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
  
! 		sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
! 		if (sendFile < 0)
! 			ereport(FATAL,
! 					(errcode_for_file_access(),
! 					 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
! 							path, sendId, sendSeg)));
! 		sendOff = 0;
! 	}
  
! 	/* Need to seek in the file? */
! 	if (sendOff != startoff)
! 	{
! 		if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
  			ereport(FATAL,
  					(errcode_for_file_access(),
! 					 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
! 							sendId, sendSeg, startoff)));
! 		sendOff = startoff;
! 	}
! 
! 	if (read(sendFile, buf, nbytes) != nbytes)
! 	{
! 		ereport(FATAL,
! 				(errcode_for_file_access(),
! 				 errmsg("could not read from log file %u, segment %u, offset %u, "
! 						"length %lu: %m",
! 						sendId, sendSeg, sendOff, (unsigned long) nbytes)));
  	}
- 
- 	/* Update state for read */
- 	sendOff += nbytes;
  }
  
  /*
--- 403,469 ----
  			 LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
  #endif
  
! 	while (nbytes > 0)
  	{
! 		int segbytes;
! 		int readbytes;
! 		uint32 tmp;
  
! 		startoff = recptr.xrecoff % XLOG_SEG_SIZE;
  
! 		if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
! 		{
! 			/* Switch to another logfile segment */
! 			if (sendFile >= 0)
! 				close(sendFile);
! 
! 			XLByteToSeg(recptr, sendId, sendSeg);
! 			XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
! 
! 			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
! 			if (sendFile < 0)
! 				ereport(FATAL, /* XXX: Why FATAL? */
! 						(errcode_for_file_access(),
! 						 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
! 								path, sendId, sendSeg)));
! 			sendOff = 0;
! 		}
  
! 		/* Need to seek in the file? */
! 		if (sendOff != startoff)
! 		{
! 			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
! 				ereport(FATAL,
! 						(errcode_for_file_access(),
! 						 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
! 								sendId, sendSeg, startoff)));
! 			sendOff = startoff;
! 		}
  
! 		/* How many bytes are within this segment? */
! 		if (nbytes > (XLOG_SEG_SIZE - startoff))
! 			segbytes = XLOG_SEG_SIZE - startoff;
! 		else
! 			segbytes = nbytes;
  
! 		readbytes = read(sendFile, buf, segbytes);
! 		if (readbytes <= 0)
  			ereport(FATAL,
  					(errcode_for_file_access(),
! 					 errmsg("could not read from log file %u, segment %u, offset %u, "
! 							"length %lu: %m",
! 							sendId, sendSeg, sendOff, (unsigned long) segbytes)));
! 
! 		/* Update state for read */
! 		tmp = recptr.xrecoff + readbytes;
! 		if (tmp < recptr.xrecoff)
! 			recptr.xlogid++; /* overflow */
! 		recptr.xrecoff = tmp;
! 
! 		sendOff += readbytes;
! 		nbytes -= readbytes;
! 		buf += readbytes;
  	}
  }
  
  /*
***************
*** 469,488 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
  static bool
  XLogSend(PendingMessage inMsg, PendingMessage outMsg)
  {
- 	bool		ispartialpage;
- 	bool		last_iteration;
- 	bool		finishing_seg;
- 	int			nmsgs;
- 	int			npages;
  	int			res;
- 	uint32		startpos;
- 	uint32		startoff;
- 	uint32		endpos;
  	XLogRecPtr	SendRqstPtr;
  
  	/*
! 	 * Invalid position means that XLOG streaming is not started yet,
! 	 * so we do nothing here.
  	 */
  	if (XLogRecPtrIsInvalid(LogstreamResult.Send))
  		return true;
--- 474,486 ----
  static bool
  XLogSend(PendingMessage inMsg, PendingMessage outMsg)
  {
  	int			res;
  	XLogRecPtr	SendRqstPtr;
  
  	/*
! 	 * Invalid position means that we have not yet received the initial
! 	 * XLogRecPtr message from the slave that indicates where to start the
! 	 * streaming.
  	 */
  	if (XLogRecPtrIsInvalid(LogstreamResult.Send))
  		return true;
***************
*** 490,495 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg)
--- 488,497 ----
  	/* Attempt to send all the records which were written to the disk */
  	SendRqstPtr = GetWriteRecPtr();
  
+ 	/* Quick exit if nothing to do */
+ 	if (!XLByteLT(LogstreamResult.Send, SendRqstPtr))
+ 		return true;
+ 
  #ifdef REPLICATION_DEBUG
  	if (REPLICATION_DEBUG_ENABLED)
  		elog(LOG, "xlog send request %X/%X; send %X/%X; write %X/%X",
***************
*** 520,631 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg)
  	 * sending in the last page. We must initialize all of them to
  	 * keep the compiler quiet.
  	 */
- 	nmsgs = 0;
- 	npages = 0;
- 	startpos = 0;
- 	startoff = 0;
- 	endpos = XLOG_BLCKSZ;
  
  	while (XLByteLT(LogstreamResult.Send, SendRqstPtr))
  	{
  		/*
! 		 * Advance LogstreamResult.Send to end of current page. If this
! 		 * is a first loop iteration (i.e., in the case where npages is 0),
! 		 * it might indicate a halfway position or cross a logid boundary,
! 		 * so alignment is needed. Otherwise, since it's guaranteed that
! 		 * LogstreamResult.Send indicates end of previous page and we have
! 		 * not crossed a logid boundary yet in this loop iteration,
! 		 * we have only to increment it by XLOG_BLCKSZ bytes.
  		 */
! 		if (npages == 0)
! 		{
! 			startpos = LogstreamResult.Send.xrecoff % XLOG_BLCKSZ;
! 			startoff = LogstreamResult.Send.xrecoff % XLogSegSize - startpos;
  
! 			LogstreamResult.Send.xrecoff += XLOG_BLCKSZ - startpos;
! 			if (LogstreamResult.Send.xrecoff > XLogFileSize)
! 			{
! 				LogstreamResult.Send.xlogid++;
! 				LogstreamResult.Send.xrecoff %= XLogFileSize;
! 			}
! 		}
! 		else
! 			LogstreamResult.Send.xrecoff += XLOG_BLCKSZ;
! 		ispartialpage = XLByteLT(SendRqstPtr, LogstreamResult.Send);
  
! 		npages++;
  
  		/*
! 		 * Read and send the set if this will be the last loop iteration,
! 		 * or if the number of pages in the set is larger than
! 		 * MaxPagesPerXLogData, or if we are at the end of the logfile
! 		 * segment.
  		 */
- 		last_iteration = !XLByteLT(LogstreamResult.Send, SendRqstPtr);
- 		if (last_iteration)
- 		{
- 			endpos = SendRqstPtr.xrecoff % XLOG_BLCKSZ;
- 			if (endpos == 0)
- 				endpos = XLOG_BLCKSZ;
- 		}
- 
- 		finishing_seg = !ispartialpage &&
- 			(startoff + npages * XLOG_BLCKSZ) >= XLogSegSize;
  
! 		/* Only asked to send a partial page */
! 		if (ispartialpage)
! 			LogstreamResult.Send = SendRqstPtr;
  
! 		if (last_iteration ||
! 			npages >= MaxPagesPerXLogData ||
! 			finishing_seg)
  		{
! 			Size	nbytes;
! 			uint8	flags = 0;
! 
! 			if (finishing_seg)
! 				flags |= XLOGSTREAM_END_SEG;
! 
! 			/*
! 			 * XXX: Should we request the standby to fsync the log if the
! 			 * current set might include a shutdown checkpoint record?
! 			 */
! 
! 			/* OK to read and send the log */
! 			pq_beginasyncmsg(outMsg, 'w');
! 			pq_sendint(outMsg->buf, flags, 1);
! 			pq_sendint(outMsg->buf, LogstreamResult.Send.xlogid, 4);
! 			pq_sendint(outMsg->buf, LogstreamResult.Send.xrecoff, 4);
! 
! 			nbytes = (npages - 1) * (Size) XLOG_BLCKSZ - startpos + endpos;
! 
! 			/*
! 			 * Read the log into the output buffer directly to prevent
! 			 * extra memcpy calls.
! 			 */
! 			XLogRead(BufferGetStringInfo(outMsg->buf, nbytes),
! 					 startoff + startpos, nbytes, LogstreamResult.Send);
  
! 			res = pq_endasyncmsg(outMsg);
! 			if (res < 0)
! 				return false;
! 			if (res == 0)
! 				break;
  
! 			/*
! 			 * Stop sending the log for another job (e.g., checking for
! 			 * interrupts) periodically.
! 			 */
! 			if (++nmsgs > MaxMsgsPerXLogSend)
! 			{
! 				pending_xlog_send = true;
! 				break;
! 			}
! 
! 			npages = 0;
! 		}
  
! 		if (ispartialpage)
  			break;
  	}
  
--- 522,588 ----
  	 * sending in the last page. We must initialize all of them to
  	 * keep the compiler quiet.
  	 */
  
  	while (XLByteLT(LogstreamResult.Send, SendRqstPtr))
  	{
+ 		XLogRecPtr startptr;
+ 		XLogRecPtr endptr;
+ 		Size	nbytes;
+ 		uint8	flags = 0;
+ 
  		/*
! 		 * Figure out how much to send in one message. If there are fewer than
! 		 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
! 		 * MAX_SEND_SIZE bytes, rounded down to a page boundary for efficiency.
  		 */
! 		startptr = LogstreamResult.Send;
! 		endptr = startptr;
! 		endptr.xrecoff += MAX_SEND_SIZE;
! 		if (endptr.xrecoff < startptr.xrecoff)
! 			endptr.xlogid++; /* xrecoff overflowed */
  
! 		/* round down to page boundary */
! 		endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
  
! 		if (XLByteLT(SendRqstPtr, endptr))
! 			endptr = SendRqstPtr;
  
  		/*
! 		 * XXX: Should we request the standby to fsync the log if the
! 		 * current set might include a shutdown checkpoint record?
! 		 *
! 		 * Heikki: Well, we don't do that with other checkpoints, so I don't
! 		 * see why we should at a shutdown checkpoint. However, perhaps
! 		 * walreceiver should do an fsync whenever the connection is lost,
! 		 * whatever the reason (e.g. the master has been shut down)?
  		 */
  
! 		/* OK to read and send the log */
! 		pq_beginasyncmsg(outMsg, 'w');
! 		pq_sendint(outMsg->buf, flags, 1);
! 		pq_sendint(outMsg->buf, startptr.xlogid, 4);
! 		pq_sendint(outMsg->buf, startptr.xrecoff, 4);
  
! 		if (endptr.xlogid != startptr.xlogid)
  		{
! 			Assert(endptr.xlogid == startptr.xlogid + 1);
! 			nbytes = (0xffffffff - startptr.xrecoff) + endptr.xrecoff + 1;
! 		}
! 		else
! 			nbytes = endptr.xrecoff - startptr.xrecoff;
  
! 		LogstreamResult.Send = endptr;
  
! 		/*
! 		 * Read the log into the output buffer directly to prevent
! 		 * extra memcpy calls.
! 		 */
! 		XLogRead(BufferGetStringInfo(outMsg->buf, nbytes), startptr, nbytes);
  
! 		res = pq_endasyncmsg(outMsg);
! 		if (res < 0)
! 			return false;
! 		if (res == 0)
  			break;
  	}
  
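The chunking arithmetic in the new XLogSend loop can be checked outside the backend. The following is a minimal standalone sketch, not code from the patch: next_chunk(), send_all() and xl_byte_lt() are hypothetical helpers (the last one stands in for XLByteLT), XLOG_BLCKSZ uses the default 8 kB page, and the MAX_SEND_SIZE value of 16 pages is an assumption, since its definition isn't shown in this excerpt. Like the new code, it assumes xrecoff simply wraps at 2^32; the old code removed in this hunk wraps at XLogFileSize instead, which the sketch ignores.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for the patch's two-part WAL position. */
typedef struct
{
	uint32_t	xlogid;		/* logical log file number */
	uint32_t	xrecoff;	/* byte offset within that log file */
} XLogRecPtr;

/* Assumed values: default WAL page size and a hypothetical 16-page cap. */
#define XLOG_BLCKSZ		8192u
#define MAX_SEND_SIZE	(XLOG_BLCKSZ * 16u)

/* Hypothetical equivalent of the XLByteLT macro: is a < b? */
static int
xl_byte_lt(XLogRecPtr a, XLogRecPtr b)
{
	return a.xlogid < b.xlogid ||
		(a.xlogid == b.xlogid && a.xrecoff < b.xrecoff);
}

/*
 * Pick the end of the next message starting at 'start', capped at 'request'.
 * Stores the end position in *end and returns the number of bytes covered.
 * The caller must ensure start < request.
 */
static size_t
next_chunk(XLogRecPtr start, XLogRecPtr request, XLogRecPtr *end)
{
	XLogRecPtr	e = start;

	e.xrecoff += MAX_SEND_SIZE;		/* unsigned, may wrap past 2^32 */
	if (e.xrecoff < start.xrecoff)
		e.xlogid++;					/* carry the wraparound into xlogid */

	e.xrecoff -= e.xrecoff % XLOG_BLCKSZ;	/* round down to a page boundary */

	if (xl_byte_lt(request, e))
		e = request;				/* never send past the request */

	assert(xl_byte_lt(start, e));	/* every message must make progress */
	*end = e;

	/* 32-bit unsigned subtraction also handles a wrap into the next xlogid. */
	return (size_t) (uint32_t) (e.xrecoff - start.xrecoff);
}

/* Mimic the XLogSend loop (minus the network I/O): count messages and bytes. */
static int
send_all(XLogRecPtr sent, XLogRecPtr request, size_t *total)
{
	int			nmsgs = 0;

	*total = 0;
	while (xl_byte_lt(sent, request))
	{
		XLogRecPtr	end;

		*total += next_chunk(sent, request, &end);
		sent = end;
		nmsgs++;
	}
	return nmsgs;
}
```

With these values, sending from 0/0 up to 0/6000A (3 * MAX_SEND_SIZE + 10 bytes) takes four messages: three full 128 kB chunks and a 10-byte tail. Computing the byte count as an unsigned 32-bit difference of the xrecoff fields keeps it correct when a chunk crosses into the next xlogid.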
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
