On Fri, Feb 18, 2011 at 12:10 PM, Robert Haas <robertmh...@gmail.com> wrote: > IMHO, that's so broken as to be useless. > > I would really like to have a solution to this problem, though. > Relying on TCP keepalives is weak.
Agreed. I updated the replication timeout patch that I submitted before. http://archives.postgresql.org/message-id/AANLkTinSvcdAYryNfZqd0wepyh1Pf7YX6Q0KxhZjas6a%40mail.gmail.com Since the patch also implements non-blocking send functions, the timeout works properly even when the send buffer has filled up. > There are two things that I think are pretty clear. If the receiver > has wal_receiver_status_interval=0, then we should ignore > replication_timeout for that connection. The patch does not yet check that wal_receiver_status_interval is set up properly; I'll implement that later. Regards, -- Fujii Masao NIPPON TELEGRAPH AND TELEPHONE CORPORATION NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 2015,2020 **** SET ENABLE_SEQSCAN TO OFF; --- 2015,2042 ---- </para> </listitem> </varlistentry> + + <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout"> + <term><varname>replication_timeout</varname> (<type>integer</type>)</term> + <indexterm> + <primary><varname>replication_timeout</> configuration parameter</primary> + </indexterm> + <listitem> + <para> + Specifies the maximum time, in milliseconds, to wait for a reply + from the standby before terminating replication. This is useful for + the primary server to detect a standby crash or a network outage. + A value of zero (the default) turns this off. This parameter can + only be set in the <filename>postgresql.conf</> file or on the server + command line. + </para> + <para> + To make the timeout work properly, <xref linkend="guc-wal-receiver-status-interval"> + must be enabled on the standby, and its value must be less than the + value of <varname>replication_timeout</>. + </para> + </listitem> + </varlistentry> </variablelist> </sect2> *************** *** 2125,2130 **** SET ENABLE_SEQSCAN TO OFF; --- 2147,2157 ---- the <filename>postgresql.conf</> file or on the server command line. The default value is 10 seconds. </para> + <para> + When <xref linkend="guc-replication-timeout"> is enabled on the primary, + <varname>wal_receiver_status_interval</> must be enabled, and its value + must be less than the value of <varname>replication_timeout</>.
+ </para> </listitem> </varlistentry> *** a/src/backend/libpq/pqcomm.c --- b/src/backend/libpq/pqcomm.c *************** *** 56,61 **** --- 56,63 ---- * pq_putbytes - send bytes to connection (not flushed until pq_flush) * pq_flush - flush pending output * pq_getbyte_if_available - get a byte if available without blocking + * pq_putbytes_if_writable - send bytes to connection if writable without blocking + * pq_flush_if_writable - flush pending output if writable without blocking * * message-level I/O (and old-style-COPY-OUT cruft): * pq_putmessage - send a normal message (suppressed in COPY OUT mode) *************** *** 112,117 **** static char sock_path[MAXPGPATH]; --- 114,120 ---- static char PqSendBuffer[PQ_BUFFER_SIZE]; static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */ + static int PqSendStart; /* Next index to send a byte in PqSendBuffer */ static char PqRecvBuffer[PQ_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ *************** *** 128,133 **** static bool DoingCopyOut; --- 131,137 ---- static void pq_close(int code, Datum arg); static int internal_putbytes(const char *s, size_t len); static int internal_flush(void); + static int internal_flush_if_writable(void); #ifdef HAVE_UNIX_SOCKETS static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName); *************** *** 1153,1158 **** internal_putbytes(const char *s, size_t len) --- 1157,1212 ---- } /* -------------------------------- + * pq_putbytes_if_writable - send bytes to connection (not flushed + * until pq_flush), if writable + * + * Returns the number of bytes written without blocking, or EOF if trouble. 
+ * -------------------------------- + */ + int + pq_putbytes_if_writable(const char *s, size_t len) + { + size_t amount; + size_t nwritten = 0; + + /* Should not be called by old-style COPY OUT */ + Assert(!DoingCopyOut); + /* No-op if reentrant call */ + if (PqCommBusy) + return 0; + PqCommBusy = true; + + while (len > 0) + { + /* If buffer is full, then flush it out */ + if (PqSendPointer >= PQ_BUFFER_SIZE) + { + int r; + + r = internal_flush_if_writable(); + if (r == 0) + break; + if (r == EOF) + { + PqCommBusy = false; + return r; + } + } + amount = PQ_BUFFER_SIZE - PqSendPointer; + if (amount > len) + amount = len; + memcpy(PqSendBuffer + PqSendPointer, s, amount); + PqSendPointer += amount; + s += amount; + len -= amount; + nwritten += amount; + } + + PqCommBusy = false; + return (int) nwritten; + } + + /* -------------------------------- * pq_flush - flush pending output * * returns 0 if OK, EOF if trouble *************** *** 1224,1229 **** internal_flush(void) --- 1278,1411 ---- return 0; } + /* -------------------------------- + * pq_flush_if_writable - flush pending output if writable + * + * Returns 1 if OK, 0 if pending output cannot be written without blocking, + * or EOF if trouble. 
+ * -------------------------------- + */ + int + pq_flush_if_writable(void) + { + int res; + + /* No-op if reentrant call */ + if (PqCommBusy) + return 0; + PqCommBusy = true; + res = internal_flush_if_writable(); + PqCommBusy = false; + return res; + } + + int + internal_flush_if_writable(void) + { + static int last_reported_send_errno = 0; + + char *bufptr = PqSendBuffer + PqSendStart; + char *bufend = PqSendBuffer + PqSendPointer; + + while (bufptr < bufend) + { + int r; + + /* Temporarily put the socket into non-blocking mode */ + #ifdef WIN32 + pgwin32_noblock = 1; + #else + if (!pg_set_noblock(MyProcPort->sock)) + ereport(ERROR, + (errmsg("could not set socket to non-blocking mode: %m"))); + #endif + MyProcPort->noblock = true; + PG_TRY(); + { + r = secure_write(MyProcPort, bufptr, bufend - bufptr); + + if (r < 0) + { + /* + * Ok if no data writable without blocking or interrupted (though + * EINTR really shouldn't happen with a non-blocking socket). + * Report other errors. + */ + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) + r = 0; + else + { + if (errno != last_reported_send_errno) + { + /* + * Careful: an ereport() that tries to write to the + * client would cause recursion to here, leading to + * stack overflow and core dump! This message must + * go *only* to the postmaster log. + * + * If a client disconnects while we're in the midst + * of output, we might write quite a bit of data before + * we get to a safe query abort point. So, suppress + * duplicate log messages. + */ + last_reported_send_errno = errno; + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("could not send data to client: %m"))); + } + + /* + * We drop the buffered data anyway so that processing can + * continue, even though we'll probably quit soon. 
+ */ + PqSendStart = PqSendPointer = 0; + r = EOF; + } + } + else if (r == 0) + { + /* EOF detected */ + r = EOF; + } + } + PG_CATCH(); + { + /* + * The rest of the backend code assumes the socket is in blocking + * mode, so treat failure as FATAL. + */ + #ifdef WIN32 + pgwin32_noblock = 0; + #else + if (!pg_set_block(MyProcPort->sock)) + ereport(FATAL, + (errmsg("could not set socket to blocking mode: %m"))); + #endif + MyProcPort->noblock = false; + PG_RE_THROW(); + } + PG_END_TRY(); + #ifdef WIN32 + pgwin32_noblock = 0; + #else + if (!pg_set_block(MyProcPort->sock)) + ereport(FATAL, + (errmsg("could not set socket to blocking mode: %m"))); + #endif + MyProcPort->noblock = false; + + if (r == 0 || r == EOF) + return r; + + last_reported_send_errno = 0; /* reset after any successful send */ + bufptr += r; + PqSendStart += r; + } + + PqSendStart = PqSendPointer = 0; + return 1; + } + /* -------------------------------- * Message-level I/O routines begin here. *** a/src/backend/port/unix_latch.c --- b/src/backend/port/unix_latch.c *************** *** 193,211 **** DisownLatch(volatile Latch *latch) bool WaitLatch(volatile Latch *latch, long timeout) { ! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; } /* * Like WaitLatch, but will also return when there's data available in ! * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch ! * was set, or 2 if the scoket became readable. */ int ! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) { struct timeval tv, *tvp = NULL; fd_set input_mask; int rc; int result = 0; --- 193,214 ---- bool WaitLatch(volatile Latch *latch, long timeout) { ! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0; } /* * Like WaitLatch, but will also return when there's data available in ! * 'sock' for reading or writing. Returns 0 if timeout was reached, ! * 1 if the latch was set, 2 if the scoket became readable, or 3 if ! * the socket became writable. */ int ! 
WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead, ! bool forWrite, long timeout) { struct timeval tv, *tvp = NULL; fd_set input_mask; + fd_set output_mask; int rc; int result = 0; *************** *** 241,254 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) FD_ZERO(&input_mask); FD_SET(selfpipe_readfd, &input_mask); hifd = selfpipe_readfd; ! if (sock != PGINVALID_SOCKET) { FD_SET(sock, &input_mask); if (sock > hifd) hifd = sock; } ! rc = select(hifd + 1, &input_mask, NULL, NULL, tvp); if (rc < 0) { if (errno == EINTR) --- 244,265 ---- FD_ZERO(&input_mask); FD_SET(selfpipe_readfd, &input_mask); hifd = selfpipe_readfd; ! if (sock != PGINVALID_SOCKET && forRead) { FD_SET(sock, &input_mask); if (sock > hifd) hifd = sock; } ! FD_ZERO(&output_mask); ! if (sock != PGINVALID_SOCKET && forWrite) ! { ! FD_SET(sock, &output_mask); ! if (sock > hifd) ! hifd = sock; ! } ! ! rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp); if (rc < 0) { if (errno == EINTR) *************** *** 263,273 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) result = 0; break; } ! if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask)) { result = 2; break; /* data available in socket */ } } waiting = false; --- 274,291 ---- result = 0; break; } ! if (sock != PGINVALID_SOCKET && forRead && ! FD_ISSET(sock, &input_mask)) { result = 2; break; /* data available in socket */ } + if (sock != PGINVALID_SOCKET && forWrite && + FD_ISSET(sock, &output_mask)) + { + result = 3; + break; /* data writable in socket */ + } } waiting = false; *** a/src/backend/port/win32/socket.c --- b/src/backend/port/win32/socket.c *************** *** 14,20 **** #include "postgres.h" /* ! * Indicate if pgwin32_recv() should operate in non-blocking mode. * * Since the socket emulation layer always sets the actual socket to * non-blocking mode in order to be able to deliver signals, we must --- 14,21 ---- #include "postgres.h" /* ! 
* Indicate if pgwin32_recv() and pgwin32_send() should operate ! * in non-blocking mode. * * Since the socket emulation layer always sets the actual socket to * non-blocking mode in order to be able to deliver signals, we must *************** *** 399,404 **** pgwin32_send(SOCKET s, char *buf, int len, int flags) --- 400,415 ---- return -1; } + if (pgwin32_noblock) + { + /* + * No data sent, and we are in "emulated non-blocking mode", so + * return indicating that we'd block if we were to continue. + */ + errno = EWOULDBLOCK; + return -1; + } + /* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */ if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0) *** a/src/backend/port/win32_latch.c --- b/src/backend/port/win32_latch.c *************** *** 85,95 **** DisownLatch(volatile Latch *latch) bool WaitLatch(volatile Latch *latch, long timeout) { ! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; } int ! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) { DWORD rc; HANDLE events[3]; --- 85,96 ---- bool WaitLatch(volatile Latch *latch, long timeout) { ! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0; } int ! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead, ! bool forWrite, long timeout) { DWORD rc; HANDLE events[3]; *************** *** 103,112 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) events[0] = latchevent; events[1] = pgwin32_signal_event; numevents = 2; ! if (sock != PGINVALID_SOCKET) { sockevent = WSACreateEvent(); ! WSAEventSelect(sock, sockevent, FD_READ); events[numevents++] = sockevent; } --- 104,120 ---- events[0] = latchevent; events[1] = pgwin32_signal_event; numevents = 2; ! if (sock != PGINVALID_SOCKET && (forRead || forWrite)) { + int flags = 0; + + if (forRead) + flags |= FD_READ; + if (forWrite) + flags |= FD_WRITE; + sockevent = WSACreateEvent(); ! 
WSAEventSelect(sock, sockevent, flags); events[numevents++] = sockevent; } *************** *** 139,146 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) pgwin32_dispatch_queued_signals(); else if (rc == WAIT_OBJECT_0 + 2) { Assert(sock != PGINVALID_SOCKET); ! result = 2; break; } else if (rc != WAIT_OBJECT_0) --- 147,165 ---- pgwin32_dispatch_queued_signals(); else if (rc == WAIT_OBJECT_0 + 2) { + WSANETWORKEVENTS resEvents; + Assert(sock != PGINVALID_SOCKET); ! ! ZeroMemory(&resEvents, sizeof(resEvents)); ! if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR) ! ereport(FATAL, ! (errmsg_internal("failed to enumerate network events: %i", (int) GetLastError()))); ! ! if (forRead && resEvents.lNetworkEvents & FD_READ) ! result = 2; ! if (forWrite && resEvents.lNetworkEvents & FD_WRITE) ! result = 3; break; } else if (rc != WAIT_OBJECT_0) *************** *** 148,154 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) } /* Clean up the handle we created for the socket */ ! if (sock != PGINVALID_SOCKET) { WSAEventSelect(sock, sockevent, 0); WSACloseEvent(sockevent); --- 167,173 ---- } /* Clean up the handle we created for the socket */ ! if (sock != PGINVALID_SOCKET && (forRead || forWrite)) { WSAEventSelect(sock, sockevent, 0); WSACloseEvent(sockevent); *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 74,79 **** bool am_walsender = false; /* Am I a walsender process ? */ --- 74,91 ---- /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int WalSndDelay = 1000; /* max sleep time between some actions */ + int replication_timeout = 0; /* maximum time to send one WAL data message */ + + /* + * Buffer for WAL sending + * + * WalSndOutBuffer is a work area in which the output message is constructed. + * It's used in just so we can avoid re-palloc'ing the buffer on each cycle. 
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE. + */ + static char *WalSndOutBuffer; + static int WalSndOutHead; /* head of pending output */ + static int WalSndOutTail; /* tail of pending output */ /* * These variables are used similarly to openLogFile/Id/Seg/Off, *************** *** 95,100 **** static XLogRecPtr sentPtr = {0, 0}; --- 107,117 ---- */ static StringInfoData reply_message; + /* + * Timestamp of the last receipt of the reply from the standby. + */ + static TimestampTz last_reply_timestamp; + /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; volatile sig_atomic_t walsender_shutdown_requested = false; *************** *** 113,119 **** static int WalSndLoop(void); static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); ! static bool XLogSend(char *msgbuf, bool *caughtup); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd * cmd); static void ProcessStandbyMessage(void); --- 130,136 ---- static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); ! static bool XLogSend(bool *caughtup, bool *pending); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd * cmd); static void ProcessStandbyMessage(void); *************** *** 467,472 **** ProcessRepliesIfAny(void) --- 484,490 ---- { unsigned char firstchar; int r; + int received = false; for (;;) { *************** *** 479,487 **** ProcessRepliesIfAny(void) errmsg("unexpected EOF on standby connection"))); proc_exit(0); } ! if (r == 0) { ! /* no data available without blocking */ return; } --- 497,510 ---- errmsg("unexpected EOF on standby connection"))); proc_exit(0); } ! if (r == 0) /* no data available without blocking */ { ! /* ! * Save the last reply timestamp if we've received at least ! * one reply. ! */ ! if (received) ! 
last_reply_timestamp = GetCurrentTimestamp(); return; } *************** *** 493,498 **** ProcessRepliesIfAny(void) --- 516,522 ---- */ case 'd': ProcessStandbyMessage(); + received = true; break; /* *************** *** 669,683 **** ProcessStandbyHSFeedbackMessage(void) static int WalSndLoop(void) { - char *output_message; bool caughtup = false; /* * Allocate buffer that will be used for each output message. We do this * just once to reduce palloc overhead. The buffer must be made large * enough for maximum-sized messages. */ ! output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE); /* * Allocate buffer that will be used for processing reply messages. As --- 693,708 ---- static int WalSndLoop(void) { bool caughtup = false; + bool pending = false; /* * Allocate buffer that will be used for each output message. We do this * just once to reduce palloc overhead. The buffer must be made large * enough for maximum-sized messages. */ ! WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE); ! WalSndOutHead = WalSndOutTail = 0; /* * Allocate buffer that will be used for processing reply messages. As *************** *** 685,690 **** WalSndLoop(void) --- 710,718 ---- */ initStringInfo(&reply_message); + /* Initialize the last reply timestamp */ + last_reply_timestamp = GetCurrentTimestamp(); + /* Loop forever, unless we get an error */ for (;;) { *************** *** 708,717 **** WalSndLoop(void) */ if (walsender_ready_to_stop) { ! if (!XLogSend(output_message, &caughtup)) break; ProcessRepliesIfAny(); ! if (caughtup) walsender_shutdown_requested = true; } --- 736,745 ---- */ if (walsender_ready_to_stop) { ! if (!XLogSend(&caughtup, &pending)) break; ProcessRepliesIfAny(); ! if (caughtup && !pending) walsender_shutdown_requested = true; } *************** *** 726,735 **** WalSndLoop(void) } /* ! * If we had sent all accumulated WAL in last round, nap for the ! * configured time before retrying. */ ! 
if (caughtup) { /* * Even if we wrote all the WAL that was available when we started --- 754,764 ---- } /* ! * If we had sent all accumulated WAL in last round or could not ! * flush pending WAL in output buffer because the socket was not ! * writable, nap for the configured time before retrying. */ ! if (caughtup || pending) { /* * Even if we wrote all the WAL that was available when we started *************** *** 740,764 **** WalSndLoop(void) */ ResetLatch(&MyWalSnd->latch); ! if (!XLogSend(output_message, &caughtup)) break; ! if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested) { /* * XXX: We don't really need the periodic wakeups anymore, * WaitLatchOrSocket should reliably wake up as soon as * something interesting happens. */ /* Sleep */ WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, ! WalSndDelay * 1000L); } } else { /* Attempt to send the log once every loop */ ! if (!XLogSend(output_message, &caughtup)) break; } --- 769,828 ---- */ ResetLatch(&MyWalSnd->latch); ! if (!XLogSend(&caughtup, &pending)) break; ! if ((caughtup || pending) && !got_SIGHUP && !walsender_ready_to_stop && ! !walsender_shutdown_requested) { + TimestampTz finish_time; + long sleeptime; + /* * XXX: We don't really need the periodic wakeups anymore, * WaitLatchOrSocket should reliably wake up as soon as * something interesting happens. */ + /* Reschedule replication timeout */ + if (replication_timeout > 0) + { + long secs; + int usecs; + + finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + replication_timeout); + TimestampDifference(GetCurrentTimestamp(), + finish_time, &secs, &usecs); + sleeptime = secs * 1000 + usecs / 1000; + if (WalSndDelay < sleeptime) + sleeptime = WalSndDelay; + } + else + sleeptime = WalSndDelay; + /* Sleep */ WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, ! true, (WalSndOutTail > 0), ! sleeptime * 1000L); ! ! /* Check for replication timeout */ ! 
if (replication_timeout > 0 && GetCurrentTimestamp() >= finish_time) ! { ! /* ! * Since typically expiration of replication timeout means ! * communication problem, we don't send the error message ! * to the standby. ! */ ! ereport(COMMERROR, ! (errmsg("terminating walsender process due to replication timeout"))); ! break; ! } } } else { /* Attempt to send the log once every loop */ ! if (!XLogSend(&caughtup, &pending)) break; } *************** *** 986,1009 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, * but not yet sent to the client, and send it. * - * msgbuf is a work area in which the output message is constructed. It's - * passed in just so we can avoid re-palloc'ing the buffer on each cycle. - * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE. - * * If there is no unsent WAL remaining, *caughtup is set to true, otherwise * *caughtup is set to false. * * Returns true if OK, false if trouble. */ static bool ! XLogSend(char *msgbuf, bool *caughtup) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; ! XLogRecPtr endptr; Size nbytes; WalDataMessageHeader msghdr; /* * Attempt to send all data that's already been written out and fsync'd to * disk. We cannot go further than what's been written out given the --- 1050,1097 ---- * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, * but not yet sent to the client, and send it. * * If there is no unsent WAL remaining, *caughtup is set to true, otherwise * *caughtup is set to false. * + * If there is pending WAL in output buffer, *pending is set to true, + * otherwise *pending is set to false. + * * Returns true if OK, false if trouble. */ static bool ! XLogSend(bool *caughtup, bool *pending) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; ! 
static XLogRecPtr endptr; Size nbytes; + uint32 n32; + int res; WalDataMessageHeader msghdr; + /* Attempt to flush pending WAL in output buffer */ + if (*pending) + { + if (WalSndOutHead != WalSndOutTail) + { + res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead, + WalSndOutTail - WalSndOutHead); + if (res == EOF) + return false; + WalSndOutHead += res; + if (WalSndOutHead != WalSndOutTail) + return true; + } + + res = pq_flush_if_writable(); + if (res == EOF) + return false; + if (res == 0) + return true; + + goto updt; + } + /* * Attempt to send all data that's already been written out and fsync'd to * disk. We cannot go further than what's been written out given the *************** *** 1072,1084 **** XLogSend(char *msgbuf, bool *caughtup) /* * OK to read and send the slice. */ ! msgbuf[0] = 'w'; /* * Read the log directly into the output buffer to avoid extra memcpy * calls. */ ! XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); /* * We fill the message header last so that the send timestamp is taken as --- 1160,1178 ---- /* * OK to read and send the slice. */ ! WalSndOutBuffer[0] = 'd'; ! WalSndOutBuffer[5] = 'w'; ! WalSndOutHead = 0; ! WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes; ! ! n32 = htonl((uint32) WalSndOutTail - 1); ! memcpy(WalSndOutBuffer + 1, &n32, 4); /* * Read the log directly into the output buffer to avoid extra memcpy * calls. */ ! XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes); /* * We fill the message header last so that the send timestamp is taken as *************** *** 1088,1100 **** XLogSend(char *msgbuf, bool *caughtup) msghdr.walEnd = SendRqstPtr; msghdr.sendTime = GetCurrentTimestamp(); ! memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); ! pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); /* Flush pending output to the client */ ! 
if (pq_flush()) return false; sentPtr = endptr; --- 1182,1215 ---- msghdr.walEnd = SendRqstPtr; msghdr.sendTime = GetCurrentTimestamp(); ! memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader)); ! res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail); ! if (res == EOF) ! return false; ! ! WalSndOutHead = res; ! if (WalSndOutHead != WalSndOutTail) ! { ! *caughtup = false; ! *pending = true; ! return true; ! } /* Flush pending output to the client */ ! res = pq_flush_if_writable(); ! if (res == EOF) return false; + if (res == 0) + { + *caughtup = false; + *pending = true; + return true; + } + + updt: + WalSndOutHead = WalSndOutTail = 0; + *pending = false; sentPtr = endptr; *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 1847,1852 **** static struct config_int ConfigureNamesInt[] = --- 1847,1862 ---- }, { + {"replication_timeout", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("Sets the maximum time to wait for WAL replication."), + NULL, + GUC_UNIT_MS + }, + &replication_timeout, + 0, 0, INT_MAX, NULL, NULL + }, + + { {"commit_delay", PGC_USERSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " "flushing WAL to disk."), *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 191,196 **** --- 191,197 ---- #wal_sender_delay = 1s # walsender cycle time, 1-10000 milliseconds #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed + #replication_timeout = 0 # in milliseconds, 0 is disabled # - Standby Servers - *** a/src/include/libpq/libpq.h --- b/src/include/libpq/libpq.h *************** *** 59,65 **** extern int pq_getbyte(void); --- 59,67 ---- extern int pq_peekbyte(void); extern int pq_getbyte_if_available(unsigned char *c); extern int pq_putbytes(const char *s, size_t len); + extern int 
pq_putbytes_if_writable(const char *s, size_t len); extern int pq_flush(void); + extern int pq_flush_if_writable(void); extern int pq_putmessage(char msgtype, const char *s, size_t len); extern void pq_startcopyout(void); extern void pq_endcopyout(bool errorAbort); *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 70,75 **** extern volatile sig_atomic_t walsender_ready_to_stop; --- 70,76 ---- /* user-settable parameters */ extern int WalSndDelay; extern int max_wal_senders; + extern int replication_timeout; extern int WalSenderMain(void); extern void WalSndSignals(void); *** a/src/include/storage/latch.h --- b/src/include/storage/latch.h *************** *** 40,46 **** extern void OwnLatch(volatile Latch *latch); extern void DisownLatch(volatile Latch *latch); extern bool WaitLatch(volatile Latch *latch, long timeout); extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, ! long timeout); extern void SetLatch(volatile Latch *latch); extern void ResetLatch(volatile Latch *latch); #define TestLatch(latch) (((volatile Latch *) latch)->is_set) --- 40,46 ---- extern void DisownLatch(volatile Latch *latch); extern bool WaitLatch(volatile Latch *latch, long timeout); extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, ! bool forRead, bool forWrite, long timeout); extern void SetLatch(volatile Latch *latch); extern void ResetLatch(volatile Latch *latch); #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers