On 30/05/17 15:44, Petr Jelinek wrote: > On 30/05/17 11:02, Kyotaro HORIGUCHI wrote: >> >> + TimestampTz now = GetCurrentTimestamp(); >> >> I think it is not recommended to read the current time too >> frequently, especially within a loop that hates slowness. (I >> suppose that a loop that can fill up a send queue falls into that > > Yeah that was my main worry for the patch as well, although given that > the loop does tuple processing it might not be very noticeable. >
I realized we actually call GetCurrentTimestamp() there anyway (for the pq_sendint64). So I just modified the patch to use the now variable there instead which means there are no additional GetCurrentTimestamp() calls compared to state before patch now. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
>From c6e9cdb81311371662c580db42a165ab0575d951 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmo...@pjmodos.net> Date: Thu, 25 May 2017 17:35:24 +0200 Subject: [PATCH] Fix walsender timeouts when decoding large transaction The logical slots have fast code path for sending data in order to not impose too high per message overhead. The fast path skips checks for interrupts and timeouts. However the fast path failed to consider the fact that transaction with large number of changes may take very long to be processed and sent to the client. This causes walsender to ignore interrupts for potentially long time but more importantly it will cause walsender being killed due to timeout at the end of such transaction. This commit changes the fast path to also check for interrupts and only allows calling the fast path when last keeplaive check happened less than half of walsender timeout ago, otherwise the slower code path will be taken. --- src/backend/replication/walsender.c | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f845180..278f882 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1160,6 +1160,8 @@ static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write) { + TimestampTz now = GetCurrentTimestamp(); + /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); @@ -1169,23 +1171,28 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, * several releases by streaming physical replication. */ resetStringInfo(&tmpbuf); - pq_sendint64(&tmpbuf, GetCurrentTimestamp()); + pq_sendint64(&tmpbuf, now); memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); - /* fast path */ - /* Try to flush pending output to the client */ - if (pq_flush_if_writable() != 0) - WalSndShutdown(); + /* Try taking fast path unless we get too close to walsender timeout. */ + if (now < TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2)) + { + CHECK_FOR_INTERRUPTS(); - if (!pq_is_send_pending()) - return; + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + if (!pq_is_send_pending()) + return; + } for (;;) { int wakeEvents; long sleeptime; - TimestampTz now; /* * Emergency bailout if postmaster has died. This is to avoid the @@ -1214,18 +1221,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, if (pq_flush_if_writable() != 0) WalSndShutdown(); - /* If we finished clearing the buffered data, we're done here. */ - if (!pq_is_send_pending()) - break; - - now = GetCurrentTimestamp(); - /* die if timeout was reached */ WalSndCheckTimeOut(now); /* Send keepalive if the time has come */ WalSndKeepaliveIfNecessary(now); + /* If we finished clearing the buffered data, we're done here. */ + if (!pq_is_send_pending()) + break; + sleeptime = WalSndComputeSleeptime(now); wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | @@ -1235,6 +1240,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime, WAIT_EVENT_WAL_SENDER_WRITE_DATA); + + /* Update our idea of current time for the next cycle. */ + now = GetCurrentTimestamp(); } /* reactivate latch so WalSndLoop knows to continue */ -- 2.7.4
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers