Hi,

On 2018-12-03 06:38:43 -0500, Dave Cramer wrote:
> From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
> From: Dave Cramer <davecra...@gmail.com>
> Date: Fri, 30 Nov 2018 18:23:49 -0500
> Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender
>
> ---
>  src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
>  1 file changed, 30 insertions(+), 6 deletions(-)
>
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index 46edb52..93f2648 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
>  	sendTimeLineValidUpto = state->currTLIValidUntil;
>  	sendTimeLineNextTLI = state->nextTLI;
>  
> +	/*
> +	 * If the client sent CopyDone while we were waiting,
> +	 * bail out so we can wind up the decoding session.
> +	 */
> +	if (streamingDoneSending)
> +		return -1;
> +
> +	/* more than one block available */
>  	/* make sure we have enough WAL available */
>  	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
>  
> @@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
>  		 * It's important to do this check after the recomputation of
>  		 * RecentFlushPtr, so we can send all remaining data before shutting
>  		 * down.
> -		 */
> -		if (got_STOPPING)
> +		 *
> +		 * We'll also exit here if the client sent CopyDone because it wants
> +		 * to return to command mode.
> +		 */
> +
> +		if (got_STOPPING || streamingDoneReceiving)
>  			break;
>  
>  		/*
> @@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
>  	}
>  }
>  
> -/* Main loop of walsender process that streams the WAL over Copy messages. */
> +/*
> + * Main loop of walsender process that streams the WAL over Copy messages.
> + *
> + * The send_data callback must enqueue complete CopyData messages to libpq
> + * using pq_putmessage_noblock or similar, since the walsender loop may send
> + * CopyDone then exit and return to command mode in response to a client
> + * CopyDone between calls to send_data.
> + */
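The comment above asks send_data to enqueue only complete CopyData messages. A minimal sketch of that invariant in plain C, assuming a hypothetical fixed-size queue (OutQueue, enqueue_message and ends_on_message_boundary are illustrative names, not libpq's pqcomm.c API): each message is a type byte, an Int32 length that counts itself, and the payload, and it is appended whole or not at all. A CopyDone ('c') appended later can therefore only sit behind whole CopyData ('d') messages.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for libpq's output buffer; not the real API. */
#define OUTBUF_SIZE 256

typedef struct OutQueue
{
	unsigned char buf[OUTBUF_SIZE];
	size_t		len;
} OutQueue;

/*
 * Append one complete protocol message (type byte, Int32 length that counts
 * itself, payload) or nothing at all.  Returns false if it does not fit, so
 * the queue never holds half a message.
 */
static bool
enqueue_message(OutQueue *q, char type, const char *payload, size_t plen)
{
	uint32_t	msglen;

	assert(q != NULL && plen <= UINT32_MAX - 4);
	if (q->len + 5 + plen > OUTBUF_SIZE)
		return false;

	msglen = (uint32_t) (4 + plen);
	q->buf[q->len++] = (unsigned char) type;
	q->buf[q->len++] = (unsigned char) ((msglen >> 24) & 0xFF);
	q->buf[q->len++] = (unsigned char) ((msglen >> 16) & 0xFF);
	q->buf[q->len++] = (unsigned char) ((msglen >> 8) & 0xFF);
	q->buf[q->len++] = (unsigned char) (msglen & 0xFF);
	if (plen > 0)
		memcpy(q->buf + q->len, payload, plen);
	q->len += plen;
	return true;
}

/* True if the queued bytes parse as a sequence of whole messages. */
static bool
ends_on_message_boundary(const OutQueue *q)
{
	size_t		pos = 0;

	while (pos < q->len)
	{
		uint32_t	msglen;

		if (q->len - pos < 5)
			return false;		/* truncated header */
		msglen = ((uint32_t) q->buf[pos + 1] << 24) |
			((uint32_t) q->buf[pos + 2] << 16) |
			((uint32_t) q->buf[pos + 3] << 8) |
			(uint32_t) q->buf[pos + 4];
		if (msglen < 4 || q->len - pos - 1 < msglen)
			return false;		/* truncated payload */
		pos += 1 + msglen;
	}
	return true;
}
```

This only guarantees ordering inside the buffer; whether the buffered data actually reaches the client before the walsender returns to command mode is a separate matter.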
Wait, how is it ok to send CopyDone before all the pending data has been
sent out?

> diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
> index 23466ba..66b6e90 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>  		rb->begin(rb, txn);
>  
>  		iterstate = ReorderBufferIterTXNInit(rb, txn);
> -		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
> +		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
> +			   (rb->continue_decoding_cb == NULL ||
> +				rb->continue_decoding_cb()))
>  		{
>  			Relation	relation = NULL;
>  			Oid			reloid;
> @@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>  		ReorderBufferIterTXNFinish(rb, iterstate);
>  		iterstate = NULL;
>  
> -		/* call commit callback */
> -		rb->commit(rb, txn, commit_lsn);
> +		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
> +		{
> +			/* call commit callback */
> +			rb->commit(rb, txn, commit_lsn);
> +		}

I'm doubtful it's ok to simply stop in the middle of a transaction.

> @@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
>  
>  	CHECK_FOR_INTERRUPTS();
>  
> -	/* Try to flush pending output to the client */
> -	if (pq_flush_if_writable() != 0)
> -		WalSndShutdown();
> -
> -	/* Try taking fast path unless we get too close to walsender timeout. */
> -	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
> -										  wal_sender_timeout / 2) &&
> -		!pq_is_send_pending())
> -	{
> -		return;
> -	}

As somebody else commented on the thread, I'm also doubtful this is ok.
This'll introduce significant additional blocking unless I'm missing
something?
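For reference, the fast path removed above boils down to one predicate. A minimal sketch, assuming TimestampTz is an int64 count of microseconds as in PostgreSQL and that a hypothetical send_pending argument stands in for pq_is_send_pending(); can_take_fast_path is an illustrative name, not a backend function. When the predicate is false, WalSndWriteData falls through to the slow-path loop.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* PostgreSQL's TimestampTz is an int64 count of microseconds. */
typedef int64_t TimestampTz;

/* Same arithmetic as the backend's TimestampTzPlusMilliseconds() macro. */
#define TimestampTzPlusMilliseconds(tz, ms) ((tz) + ((ms) * (int64_t) 1000))

/*
 * Hypothetical helper mirroring the removed check: the write path may return
 * at once only if nothing is left unsent (send_pending stands in for
 * pq_is_send_pending()) and less than half of wal_sender_timeout has passed
 * since the last reply from the client.
 */
static bool
can_take_fast_path(TimestampTz now, TimestampTz last_reply_timestamp,
				   int wal_sender_timeout_ms, bool send_pending)
{
	assert(wal_sender_timeout_ms >= 0);
	return now < TimestampTzPlusMilliseconds(last_reply_timestamp,
											 wal_sender_timeout_ms / 2) &&
		!send_pending;
}
```

With wal_sender_timeout at 60 s, for example, the fast path stays available for up to 30 s after the last client reply, provided nothing is left unsent; with the lines removed, every call goes through the slow-path loop instead.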
>  	/* If we have pending write here, go to slow path */
>  	for (;;)
> @@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc)
>  			break;
>  
>  		/*
> -		 * We only send regular messages to the client for full decoded
> +		 * If we have received CopyDone from the client, sent CopyDone
> +		 * ourselves, it's time to exit streaming.
> +		 */
> +		if (!IsStreamingActive()) {
> +			break;
> +		}

Wrong formatting: per our coding conventions the opening brace goes on its
own line (and a single-statement body doesn't need braces at all).

I wonder if the saner approach here isn't to support query cancellations or
something of that vein, and then handle the error.

Greetings,

Andres Freund
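One way to picture the cancellation-based alternative, as a minimal sketch in plain C: the signal handler only sets a flag, safe points check it, and a cancel unwinds through the error path rather than letting decoding stop quietly partway through a transaction. cancel_pending, check_for_cancel and stream_changes are hypothetical names loosely modelled on QueryCancelPending and CHECK_FOR_INTERRUPTS(), and setjmp/longjmp stands in for ereport(ERROR) and its recovery; none of this is the actual walsender code.

```c
#include <assert.h>
#include <setjmp.h>
#include <signal.h>

/* Hypothetical flag, loosely modelled on QueryCancelPending. */
static volatile sig_atomic_t cancel_pending = 0;

/* Hypothetical stand-in for the backend's error-recovery jump target. */
static jmp_buf error_jmp;

/* What a cancel signal handler would do: only set the flag. */
static void
handle_cancel_signal(int signo)
{
	(void) signo;
	cancel_pending = 1;
}

/* Checked at safe points, in the spirit of CHECK_FOR_INTERRUPTS(). */
static void
check_for_cancel(void)
{
	if (cancel_pending)
	{
		cancel_pending = 0;
		longjmp(error_jmp, 1);	/* "raise" the cancellation error */
	}
}

/*
 * Stream nchanges changes.  A cancel arriving at change cancel_after aborts
 * via the error path, so the caller gets -1 rather than a partial count that
 * looks like a finished transaction.
 */
static int
stream_changes(int nchanges, int cancel_after)
{
	volatile int done = 0;

	assert(nchanges >= 0);
	if (setjmp(error_jmp) != 0)
	{
		/* Error recovery: clean up, then go back to command mode. */
		return -1;
	}

	for (int i = 0; i < nchanges; i++)
	{
		if (i == cancel_after)
			handle_cancel_signal(SIGINT);	/* simulate the cancel arriving */
		check_for_cancel();
		done++;					/* "send" one change */
	}
	return done;
}
```

The point of routing through the error path is that the client receives an error instead of an output stream that simply ends without a commit.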