On Fri, Feb 18, 2011 at 12:10 PM, Robert Haas <[email protected]> wrote:
> IMHO, that's so broken as to be useless.
>
> I would really like to have a solution to this problem, though.
> Relying on TCP keepalives is weak.
Agreed.
I updated the replication timeout patch which I submitted before.
http://archives.postgresql.org/message-id/AANLkTinSvcdAYryNfZqd0wepyh1Pf7YX6Q0KxhZjas6a%40mail.gmail.com
Since the patch also implements non-blocking send functions,
the timeout can work properly even when the send buffer has
been filled up.
> There are two things that I think are pretty clear. If the receiver
> has wal_receiver_status_interval=0, then we should ignore
> replication_timeout for that connection.
The patch still doesn't check that wal_receiver_status_interval
is set up properly. I'll implement that later.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2015,2020 **** SET ENABLE_SEQSCAN TO OFF;
--- 2015,2042 ----
</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+ <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+ <indexterm>
+ <primary><varname>replication_timeout</> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies the maximum time, in milliseconds, to wait for a reply
+ from the standby before terminating replication. This is useful for
+ the primary server to detect a standby crash or network outage.
+ A value of zero (the default) turns this off. This parameter can
+ only be set in the <filename>postgresql.conf</> file or on the server
+ command line.
+ </para>
+ <para>
+ To make the timeout work properly, <xref linkend="guc-wal-receiver-status-interval">
+ must be enabled on the standby, and its value must be less than the
+ value of <varname>replication_timeout</>.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</sect2>
***************
*** 2125,2130 **** SET ENABLE_SEQSCAN TO OFF;
--- 2147,2157 ----
the <filename>postgresql.conf</> file or on the server command line.
The default value is 10 seconds.
</para>
+ <para>
+ When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+ <varname>wal_receiver_status_interval</> must be enabled, and its value
+ must be less than the value of <varname>replication_timeout</>.
+ </para>
</listitem>
</varlistentry>
*** a/src/backend/libpq/pqcomm.c
--- b/src/backend/libpq/pqcomm.c
***************
*** 56,61 ****
--- 56,63 ----
* pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output
* pq_getbyte_if_available - get a byte if available without blocking
+ * pq_putbytes_if_writable - send bytes to connection if writable without blocking
+ * pq_flush_if_writable - flush pending output if writable without blocking
*
* message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode)
***************
*** 112,117 **** static char sock_path[MAXPGPATH];
--- 114,120 ----
static char PqSendBuffer[PQ_BUFFER_SIZE];
static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
+ static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
static char PqRecvBuffer[PQ_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
***************
*** 128,133 **** static bool DoingCopyOut;
--- 131,137 ----
static void pq_close(int code, Datum arg);
static int internal_putbytes(const char *s, size_t len);
static int internal_flush(void);
+ static int internal_flush_if_writable(void);
#ifdef HAVE_UNIX_SOCKETS
static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
***************
*** 1153,1158 **** internal_putbytes(const char *s, size_t len)
--- 1157,1212 ----
}
/* --------------------------------
+ * pq_putbytes_if_writable - send bytes to connection (not flushed
+ * until pq_flush), if writable
+ *
+ * Returns the number of bytes written without blocking, or EOF if trouble.
+ * --------------------------------
+ */
+ int
+ pq_putbytes_if_writable(const char *s, size_t len)
+ {
+ size_t amount;
+ size_t nwritten = 0;
+
+ /* Should not be called by old-style COPY OUT */
+ Assert(!DoingCopyOut);
+ /* No-op if reentrant call */
+ if (PqCommBusy)
+ return 0;
+ PqCommBusy = true;
+
+ while (len > 0)
+ {
+ /* If buffer is full, then flush it out */
+ if (PqSendPointer >= PQ_BUFFER_SIZE)
+ {
+ int r;
+
+ r = internal_flush_if_writable();
+ if (r == 0)
+ break;
+ if (r == EOF)
+ {
+ PqCommBusy = false;
+ return r;
+ }
+ }
+ amount = PQ_BUFFER_SIZE - PqSendPointer;
+ if (amount > len)
+ amount = len;
+ memcpy(PqSendBuffer + PqSendPointer, s, amount);
+ PqSendPointer += amount;
+ s += amount;
+ len -= amount;
+ nwritten += amount;
+ }
+
+ PqCommBusy = false;
+ return (int) nwritten;
+ }
+
+ /* --------------------------------
* pq_flush - flush pending output
*
* returns 0 if OK, EOF if trouble
***************
*** 1224,1229 **** internal_flush(void)
--- 1278,1411 ----
return 0;
}
+ /* --------------------------------
+ * pq_flush_if_writable - flush pending output if writable
+ *
+ * Returns 1 if OK, 0 if pending output cannot be written without blocking,
+ * or EOF if trouble.
+ * --------------------------------
+ */
+ int
+ pq_flush_if_writable(void)
+ {
+ int res;
+
+ /* No-op if reentrant call */
+ if (PqCommBusy)
+ return 0;
+ PqCommBusy = true;
+ res = internal_flush_if_writable();
+ PqCommBusy = false;
+ return res;
+ }
+
+ int
+ internal_flush_if_writable(void)
+ {
+ static int last_reported_send_errno = 0;
+
+ char *bufptr = PqSendBuffer + PqSendStart;
+ char *bufend = PqSendBuffer + PqSendPointer;
+
+ while (bufptr < bufend)
+ {
+ int r;
+
+ /* Temporarily put the socket into non-blocking mode */
+ #ifdef WIN32
+ pgwin32_noblock = 1;
+ #else
+ if (!pg_set_noblock(MyProcPort->sock))
+ ereport(ERROR,
+ (errmsg("could not set socket to non-blocking mode: %m")));
+ #endif
+ MyProcPort->noblock = true;
+ PG_TRY();
+ {
+ r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+
+ if (r < 0)
+ {
+ /*
+ * Ok if no data writable without blocking or interrupted (though
+ * EINTR really shouldn't happen with a non-blocking socket).
+ * Report other errors.
+ */
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+ r = 0;
+ else
+ {
+ if (errno != last_reported_send_errno)
+ {
+ /*
+ * Careful: an ereport() that tries to write to the
+ * client would cause recursion to here, leading to
+ * stack overflow and core dump! This message must
+ * go *only* to the postmaster log.
+ *
+ * If a client disconnects while we're in the midst
+ * of output, we might write quite a bit of data before
+ * we get to a safe query abort point. So, suppress
+ * duplicate log messages.
+ */
+ last_reported_send_errno = errno;
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("could not send data to client: %m")));
+ }
+
+ /*
+ * We drop the buffered data anyway so that processing can
+ * continue, even though we'll probably quit soon.
+ */
+ PqSendStart = PqSendPointer = 0;
+ r = EOF;
+ }
+ }
+ else if (r == 0)
+ {
+ /* EOF detected */
+ r = EOF;
+ }
+ }
+ PG_CATCH();
+ {
+ /*
+ * The rest of the backend code assumes the socket is in blocking
+ * mode, so treat failure as FATAL.
+ */
+ #ifdef WIN32
+ pgwin32_noblock = 0;
+ #else
+ if (!pg_set_block(MyProcPort->sock))
+ ereport(FATAL,
+ (errmsg("could not set socket to blocking mode: %m")));
+ #endif
+ MyProcPort->noblock = false;
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ #ifdef WIN32
+ pgwin32_noblock = 0;
+ #else
+ if (!pg_set_block(MyProcPort->sock))
+ ereport(FATAL,
+ (errmsg("could not set socket to blocking mode: %m")));
+ #endif
+ MyProcPort->noblock = false;
+
+ if (r == 0 || r == EOF)
+ return r;
+
+ last_reported_send_errno = 0; /* reset after any successful send */
+ bufptr += r;
+ PqSendStart += r;
+ }
+
+ PqSendStart = PqSendPointer = 0;
+ return 1;
+ }
+
/* --------------------------------
* Message-level I/O routines begin here.
*** a/src/backend/port/unix_latch.c
--- b/src/backend/port/unix_latch.c
***************
*** 193,211 **** DisownLatch(volatile Latch *latch)
bool
WaitLatch(volatile Latch *latch, long timeout)
{
! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
}
/*
* Like WaitLatch, but will also return when there's data available in
! * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
! * was set, or 2 if the scoket became readable.
*/
int
! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
{
struct timeval tv, *tvp = NULL;
fd_set input_mask;
int rc;
int result = 0;
--- 193,214 ----
bool
WaitLatch(volatile Latch *latch, long timeout)
{
! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
}
/*
* Like WaitLatch, but will also return when there's data available in
! * 'sock' for reading or writing. Returns 0 if timeout was reached,
! * 1 if the latch was set, 2 if the socket became readable, or 3 if
! * the socket became writable.
*/
int
! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
! bool forWrite, long timeout)
{
struct timeval tv, *tvp = NULL;
fd_set input_mask;
+ fd_set output_mask;
int rc;
int result = 0;
***************
*** 241,254 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
FD_ZERO(&input_mask);
FD_SET(selfpipe_readfd, &input_mask);
hifd = selfpipe_readfd;
! if (sock != PGINVALID_SOCKET)
{
FD_SET(sock, &input_mask);
if (sock > hifd)
hifd = sock;
}
! rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
if (rc < 0)
{
if (errno == EINTR)
--- 244,265 ----
FD_ZERO(&input_mask);
FD_SET(selfpipe_readfd, &input_mask);
hifd = selfpipe_readfd;
! if (sock != PGINVALID_SOCKET && forRead)
{
FD_SET(sock, &input_mask);
if (sock > hifd)
hifd = sock;
}
! FD_ZERO(&output_mask);
! if (sock != PGINVALID_SOCKET && forWrite)
! {
! FD_SET(sock, &output_mask);
! if (sock > hifd)
! hifd = sock;
! }
!
! rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
if (rc < 0)
{
if (errno == EINTR)
***************
*** 263,273 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
result = 0;
break;
}
! if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
{
result = 2;
break; /* data available in socket */
}
}
waiting = false;
--- 274,291 ----
result = 0;
break;
}
! if (sock != PGINVALID_SOCKET && forRead &&
! FD_ISSET(sock, &input_mask))
{
result = 2;
break; /* data available in socket */
}
+ if (sock != PGINVALID_SOCKET && forWrite &&
+ FD_ISSET(sock, &output_mask))
+ {
+ result = 3;
+ break; /* data writable in socket */
+ }
}
waiting = false;
*** a/src/backend/port/win32/socket.c
--- b/src/backend/port/win32/socket.c
***************
*** 14,20 ****
#include "postgres.h"
/*
! * Indicate if pgwin32_recv() should operate in non-blocking mode.
*
* Since the socket emulation layer always sets the actual socket to
* non-blocking mode in order to be able to deliver signals, we must
--- 14,21 ----
#include "postgres.h"
/*
! * Indicate if pgwin32_recv() and pgwin32_send() should operate
! * in non-blocking mode.
*
* Since the socket emulation layer always sets the actual socket to
* non-blocking mode in order to be able to deliver signals, we must
***************
*** 399,404 **** pgwin32_send(SOCKET s, char *buf, int len, int flags)
--- 400,415 ----
return -1;
}
+ if (pgwin32_noblock)
+ {
+ /*
+ * No data sent, and we are in "emulated non-blocking mode", so
+ * return indicating that we'd block if we were to continue.
+ */
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+
/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
*** a/src/backend/port/win32_latch.c
--- b/src/backend/port/win32_latch.c
***************
*** 85,95 **** DisownLatch(volatile Latch *latch)
bool
WaitLatch(volatile Latch *latch, long timeout)
{
! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
}
int
! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
{
DWORD rc;
HANDLE events[3];
--- 85,96 ----
bool
WaitLatch(volatile Latch *latch, long timeout)
{
! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
}
int
! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
! bool forWrite, long timeout)
{
DWORD rc;
HANDLE events[3];
***************
*** 103,112 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
events[0] = latchevent;
events[1] = pgwin32_signal_event;
numevents = 2;
! if (sock != PGINVALID_SOCKET)
{
sockevent = WSACreateEvent();
! WSAEventSelect(sock, sockevent, FD_READ);
events[numevents++] = sockevent;
}
--- 104,120 ----
events[0] = latchevent;
events[1] = pgwin32_signal_event;
numevents = 2;
! if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{
+ int flags = 0;
+
+ if (forRead)
+ flags |= FD_READ;
+ if (forWrite)
+ flags |= FD_WRITE;
+
sockevent = WSACreateEvent();
! WSAEventSelect(sock, sockevent, flags);
events[numevents++] = sockevent;
}
***************
*** 139,146 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
pgwin32_dispatch_queued_signals();
else if (rc == WAIT_OBJECT_0 + 2)
{
Assert(sock != PGINVALID_SOCKET);
! result = 2;
break;
}
else if (rc != WAIT_OBJECT_0)
--- 147,165 ----
pgwin32_dispatch_queued_signals();
else if (rc == WAIT_OBJECT_0 + 2)
{
+ WSANETWORKEVENTS resEvents;
+
Assert(sock != PGINVALID_SOCKET);
!
! ZeroMemory(&resEvents, sizeof(resEvents));
! if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
! ereport(FATAL,
! (errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
!
! if (forRead && resEvents.lNetworkEvents & FD_READ)
! result = 2;
! if (forWrite && resEvents.lNetworkEvents & FD_WRITE)
! result = 3;
break;
}
else if (rc != WAIT_OBJECT_0)
***************
*** 148,154 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
}
/* Clean up the handle we created for the socket */
! if (sock != PGINVALID_SOCKET)
{
WSAEventSelect(sock, sockevent, 0);
WSACloseEvent(sockevent);
--- 167,173 ----
}
/* Clean up the handle we created for the socket */
! if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{
WSAEventSelect(sock, sockevent, 0);
WSACloseEvent(sockevent);
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 74,79 **** bool am_walsender = false; /* Am I a walsender process ? */
--- 74,91 ----
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 1000; /* max sleep time between some actions */
+ int replication_timeout = 0; /* maximum time to send one WAL data message */
+
+ /*
+ * Buffer for WAL sending
+ *
+ * WalSndOutBuffer is a work area in which the output message is constructed.
+ * It's used in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ */
+ static char *WalSndOutBuffer;
+ static int WalSndOutHead; /* head of pending output */
+ static int WalSndOutTail; /* tail of pending output */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
***************
*** 95,100 **** static XLogRecPtr sentPtr = {0, 0};
--- 107,117 ----
*/
static StringInfoData reply_message;
+ /*
+ * Timestamp of the last receipt of the reply from the standby.
+ */
+ static TimestampTz last_reply_timestamp;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t walsender_shutdown_requested = false;
***************
*** 113,119 **** static int WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
! static bool XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
static void ProcessStandbyMessage(void);
--- 130,136 ----
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
! static bool XLogSend(bool *caughtup, bool *pending);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
static void ProcessStandbyMessage(void);
***************
*** 467,472 **** ProcessRepliesIfAny(void)
--- 484,490 ----
{
unsigned char firstchar;
int r;
+ int received = false;
for (;;)
{
***************
*** 479,487 **** ProcessRepliesIfAny(void)
errmsg("unexpected EOF on standby connection")));
proc_exit(0);
}
! if (r == 0)
{
! /* no data available without blocking */
return;
}
--- 497,510 ----
errmsg("unexpected EOF on standby connection")));
proc_exit(0);
}
! if (r == 0) /* no data available without blocking */
{
! /*
! * Save the last reply timestamp if we've received at least
! * one reply.
! */
! if (received)
! last_reply_timestamp = GetCurrentTimestamp();
return;
}
***************
*** 493,498 **** ProcessRepliesIfAny(void)
--- 516,522 ----
*/
case 'd':
ProcessStandbyMessage();
+ received = true;
break;
/*
***************
*** 669,683 **** ProcessStandbyHSFeedbackMessage(void)
static int
WalSndLoop(void)
{
- char *output_message;
bool caughtup = false;
/*
* Allocate buffer that will be used for each output message. We do this
* just once to reduce palloc overhead. The buffer must be made large
* enough for maximum-sized messages.
*/
! output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
/*
* Allocate buffer that will be used for processing reply messages. As
--- 693,708 ----
static int
WalSndLoop(void)
{
bool caughtup = false;
+ bool pending = false;
/*
* Allocate buffer that will be used for each output message. We do this
* just once to reduce palloc overhead. The buffer must be made large
* enough for maximum-sized messages.
*/
! WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
! WalSndOutHead = WalSndOutTail = 0;
/*
* Allocate buffer that will be used for processing reply messages. As
***************
*** 685,690 **** WalSndLoop(void)
--- 710,718 ----
*/
initStringInfo(&reply_message);
+ /* Initialize the last reply timestamp */
+ last_reply_timestamp = GetCurrentTimestamp();
+
/* Loop forever, unless we get an error */
for (;;)
{
***************
*** 708,717 **** WalSndLoop(void)
*/
if (walsender_ready_to_stop)
{
! if (!XLogSend(output_message, &caughtup))
break;
ProcessRepliesIfAny();
! if (caughtup)
walsender_shutdown_requested = true;
}
--- 736,745 ----
*/
if (walsender_ready_to_stop)
{
! if (!XLogSend(&caughtup, &pending))
break;
ProcessRepliesIfAny();
! if (caughtup && !pending)
walsender_shutdown_requested = true;
}
***************
*** 726,735 **** WalSndLoop(void)
}
/*
! * If we had sent all accumulated WAL in last round, nap for the
! * configured time before retrying.
*/
! if (caughtup)
{
/*
* Even if we wrote all the WAL that was available when we started
--- 754,764 ----
}
/*
! * If we had sent all accumulated WAL in last round or could not
! * flush pending WAL in output buffer because the socket was not
! * writable, nap for the configured time before retrying.
*/
! if (caughtup || pending)
{
/*
* Even if we wrote all the WAL that was available when we started
***************
*** 740,764 **** WalSndLoop(void)
*/
ResetLatch(&MyWalSnd->latch);
! if (!XLogSend(output_message, &caughtup))
break;
! if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
{
/*
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
/* Sleep */
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
! WalSndDelay * 1000L);
}
}
else
{
/* Attempt to send the log once every loop */
! if (!XLogSend(output_message, &caughtup))
break;
}
--- 769,828 ----
*/
ResetLatch(&MyWalSnd->latch);
! if (!XLogSend(&caughtup, &pending))
break;
! if ((caughtup || pending) && !got_SIGHUP && !walsender_ready_to_stop &&
! !walsender_shutdown_requested)
{
+ TimestampTz finish_time;
+ long sleeptime;
+
/*
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
+ /* Reschedule replication timeout */
+ if (replication_timeout > 0)
+ {
+ long secs;
+ int usecs;
+
+ finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ replication_timeout);
+ TimestampDifference(GetCurrentTimestamp(),
+ finish_time, &secs, &usecs);
+ sleeptime = secs * 1000 + usecs / 1000;
+ if (WalSndDelay < sleeptime)
+ sleeptime = WalSndDelay;
+ }
+ else
+ sleeptime = WalSndDelay;
+
/* Sleep */
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
! true, (WalSndOutTail > 0),
! sleeptime * 1000L);
!
! /* Check for replication timeout */
! if (replication_timeout > 0 && GetCurrentTimestamp() >= finish_time)
! {
! /*
! * Since typically expiration of replication timeout means
! * communication problem, we don't send the error message
! * to the standby.
! */
! ereport(COMMERROR,
! (errmsg("terminating walsender process due to replication timeout")));
! break;
! }
}
}
else
{
/* Attempt to send the log once every loop */
! if (!XLogSend(&caughtup, &pending))
break;
}
***************
*** 986,1009 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and send it.
*
- * msgbuf is a work area in which the output message is constructed. It's
- * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
- * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
- *
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
*
* Returns true if OK, false if trouble.
*/
static bool
! XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
! XLogRecPtr endptr;
Size nbytes;
WalDataMessageHeader msghdr;
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
--- 1050,1097 ----
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and send it.
*
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
*
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
+ *
* Returns true if OK, false if trouble.
*/
static bool
! XLogSend(bool *caughtup, bool *pending)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
! static XLogRecPtr endptr;
Size nbytes;
+ uint32 n32;
+ int res;
WalDataMessageHeader msghdr;
+ /* Attempt to flush pending WAL in output buffer */
+ if (*pending)
+ {
+ if (WalSndOutHead != WalSndOutTail)
+ {
+ res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+ WalSndOutTail - WalSndOutHead);
+ if (res == EOF)
+ return false;
+ WalSndOutHead += res;
+ if (WalSndOutHead != WalSndOutTail)
+ return true;
+ }
+
+ res = pq_flush_if_writable();
+ if (res == EOF)
+ return false;
+ if (res == 0)
+ return true;
+
+ goto updt;
+ }
+
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
***************
*** 1072,1084 **** XLogSend(char *msgbuf, bool *caughtup)
/*
* OK to read and send the slice.
*/
! msgbuf[0] = 'w';
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
! XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
/*
* We fill the message header last so that the send timestamp is taken as
--- 1160,1178 ----
/*
* OK to read and send the slice.
*/
! WalSndOutBuffer[0] = 'd';
! WalSndOutBuffer[5] = 'w';
! WalSndOutHead = 0;
! WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
!
! n32 = htonl((uint32) WalSndOutTail - 1);
! memcpy(WalSndOutBuffer + 1, &n32, 4);
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
! XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
/*
* We fill the message header last so that the send timestamp is taken as
***************
*** 1088,1100 **** XLogSend(char *msgbuf, bool *caughtup)
msghdr.walEnd = SendRqstPtr;
msghdr.sendTime = GetCurrentTimestamp();
! memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
! pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
/* Flush pending output to the client */
! if (pq_flush())
return false;
sentPtr = endptr;
--- 1182,1215 ----
msghdr.walEnd = SendRqstPtr;
msghdr.sendTime = GetCurrentTimestamp();
! memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
! res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
! if (res == EOF)
! return false;
!
! WalSndOutHead = res;
! if (WalSndOutHead != WalSndOutTail)
! {
! *caughtup = false;
! *pending = true;
! return true;
! }
/* Flush pending output to the client */
! res = pq_flush_if_writable();
! if (res == EOF)
return false;
+ if (res == 0)
+ {
+ *caughtup = false;
+ *pending = true;
+ return true;
+ }
+
+ updt:
+ WalSndOutHead = WalSndOutTail = 0;
+ *pending = false;
sentPtr = endptr;
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1847,1852 **** static struct config_int ConfigureNamesInt[] =
--- 1847,1862 ----
},
{
+ {"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+ gettext_noop("Sets the maximum time to wait for WAL replication."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &replication_timeout,
+ 0, 0, INT_MAX, NULL, NULL
+ },
+
+ {
{"commit_delay", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the delay in microseconds between transaction commit and "
"flushing WAL to disk."),
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 191,196 ****
--- 191,197 ----
#wal_sender_delay = 1s # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
+ #replication_timeout = 0 # in milliseconds, 0 is disabled
# - Standby Servers -
*** a/src/include/libpq/libpq.h
--- b/src/include/libpq/libpq.h
***************
*** 59,65 **** extern int pq_getbyte(void);
--- 59,67 ----
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+ extern int pq_putbytes_if_writable(const char *s, size_t len);
extern int pq_flush(void);
+ extern int pq_flush_if_writable(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len);
extern void pq_startcopyout(void);
extern void pq_endcopyout(bool errorAbort);
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 70,75 **** extern volatile sig_atomic_t walsender_ready_to_stop;
--- 70,76 ----
/* user-settable parameters */
extern int WalSndDelay;
extern int max_wal_senders;
+ extern int replication_timeout;
extern int WalSenderMain(void);
extern void WalSndSignals(void);
*** a/src/include/storage/latch.h
--- b/src/include/storage/latch.h
***************
*** 40,46 **** extern void OwnLatch(volatile Latch *latch);
extern void DisownLatch(volatile Latch *latch);
extern bool WaitLatch(volatile Latch *latch, long timeout);
extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
! long timeout);
extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch);
#define TestLatch(latch) (((volatile Latch *) latch)->is_set)
--- 40,46 ----
extern void DisownLatch(volatile Latch *latch);
extern bool WaitLatch(volatile Latch *latch, long timeout);
extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
! bool forRead, bool forWrite, long timeout);
extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch);
#define TestLatch(latch) (((volatile Latch *) latch)->is_set)
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers