Hi,

On 2018-12-03 06:38:43 -0500, Dave Cramer wrote:
> From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
> From: Dave Cramer <davecra...@gmail.com>
> Date: Fri, 30 Nov 2018 18:23:49 -0500
> Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender
> 
> ---
>  src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
>  1 file changed, 30 insertions(+), 6 deletions(-)
> 
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index 46edb52..93f2648 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
>       sendTimeLineValidUpto = state->currTLIValidUntil;
>       sendTimeLineNextTLI = state->nextTLI;
>  
> +     /*
> +     * If the client sent CopyDone while we were waiting,
> +     * bail out so we can wind up the decoding session.
> +     */
> +     if (streamingDoneSending)
> +             return -1;
> +
> +      /* more than one block available */
>       /* make sure we have enough WAL available */
>       flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
>  
> @@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
>                * It's important to do this check after the recomputation of
>                * RecentFlushPtr, so we can send all remaining data before shutting
>                * down.
> -              */
> -             if (got_STOPPING)
> +              *
> +              * We'll also exit here if the client sent CopyDone because it wants
> +              * to return to command mode.
> +             */
> +
> +             if (got_STOPPING || streamingDoneReceiving)
>                       break;
>  
>               /*
> @@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
>       }
>  }
>  
> -/* Main loop of walsender process that streams the WAL over Copy messages. */
> +/*
> + * Main loop of walsender process that streams the WAL over Copy messages.
> + *
> + * The send_data callback must enqueue complete CopyData messages to libpq
> + * using pq_putmessage_noblock or similar, since the walsender loop may send
> + * CopyDone then exit and return to command mode in response to a client
> + * CopyDone between calls to send_data.
> + */

Wait, how is it ok to send CopyDone before all the pending data has been
sent out?
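A toy model of the invariant the question is getting at: messages leave in the order they were enqueued, so any CopyData still pending has to reach the client ahead of the CopyDone, and the stream should only count as finished once everything, CopyDone included, has been drained. All `toy_*` names are hypothetical and this is not libpq's pqcomm.c; only the framing (a type byte, `'d'` for CopyData and `'c'` for CopyDone, followed by an Int32 length that counts itself) comes from the frontend/backend protocol.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Hypothetical toy connection: "pending" holds bytes enqueued but not yet
 * written to the socket, "wire" holds what the client has received.
 * Not libpq's pqcomm.c, just a model of its in-order delivery.
 */
typedef struct
{
    unsigned char pending[256];
    size_t        npending;
    unsigned char wire[256];
    size_t        nwire;
} ToyConn;

/* Enqueue one message: type byte, Int32 length (counting itself), body. */
static int
toy_enqueue(ToyConn *c, char type, const void *body, uint32_t bodylen)
{
    uint32_t    len = bodylen + 4;

    if (c->npending + 1 + len > sizeof(c->pending))
        return -1;
    c->pending[c->npending++] = (unsigned char) type;
    for (int shift = 24; shift >= 0; shift -= 8)
        c->pending[c->npending++] = (unsigned char) (len >> shift);
    if (bodylen > 0)
        memcpy(c->pending + c->npending, body, bodylen);
    c->npending += bodylen;
    return 0;
}

/* Simulate a partial non-blocking write; returns bytes moved to the wire. */
static size_t
toy_write_some(ToyConn *c, size_t max)
{
    size_t      n = c->npending < max ? c->npending : max;

    if (c->nwire + n > sizeof(c->wire))
        n = sizeof(c->wire) - c->nwire;
    memcpy(c->wire + c->nwire, c->pending, n);
    c->nwire += n;
    memmove(c->pending, c->pending + n, c->npending - n);
    c->npending -= n;
    return n;
}

/*
 * End the COPY stream: CopyDone ('c', length 4, no body) is queued behind
 * any partially written CopyData, and everything is drained before the
 * caller may treat the stream as finished and return to command mode.
 */
static int
toy_end_copy(ToyConn *c)
{
    if (toy_enqueue(c, 'c', NULL, 0) != 0)
        return -1;
    while (c->npending > 0)
    {
        /* small chunks stand in for repeated partial writes */
        if (toy_write_some(c, 3) == 0)
            return -1;
    }
    return 0;
}
```

With a 3-byte CopyData of which only 2 bytes have hit the wire, `toy_end_copy()` leaves the client with the complete CopyData (8 bytes) followed by the 5-byte CopyDone, never a CopyDone in the middle of a message.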



> diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
> index 23466ba..66b6e90 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>               rb->begin(rb, txn);
>  
>               iterstate = ReorderBufferIterTXNInit(rb, txn);
> -             while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
> +             while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
> +                        (rb->continue_decoding_cb == NULL ||
> +                             rb->continue_decoding_cb()))
>               {
>                       Relation        relation = NULL;
>                       Oid                     reloid;

> @@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>               ReorderBufferIterTXNFinish(rb, iterstate);
>               iterstate = NULL;
>  
> -             /* call commit callback */
> -             rb->commit(rb, txn, commit_lsn);
> +             if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
> +             {
> +                     /* call commit callback */
> +                     rb->commit(rb, txn, commit_lsn);
> +             }


I'm doubtful it's ok to simply stop in the middle of a transaction.



> @@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
>  
>       CHECK_FOR_INTERRUPTS();
>  
> -     /* Try to flush pending output to the client */
> -     if (pq_flush_if_writable() != 0)
> -             WalSndShutdown();
> -
> -     /* Try taking fast path unless we get too close to walsender timeout. */
> -     if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
> -                                                                               wal_sender_timeout / 2) &&
> -             !pq_is_send_pending())
> -     {
> -             return;
> -     }

As somebody else commented on the thread, I'm also doubtful this is
ok. Unless I'm missing something, this will introduce significant
additional blocking.
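For reference, the early return the patch removes boils down to the predicate below; without it, every call falls through to the `for (;;)` slow-path loop, which is presumably where the extra blocking would come from. This is a sketch only: `toy_can_take_fast_path` and the millisecond timestamp type are hypothetical, whereas the real code uses `TimestampTz` (microseconds) with `TimestampTzPlusMilliseconds()` and `pq_is_send_pending()`.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical millisecond timestamps, standing in for TimestampTz. */
typedef int64_t ToyTimestampMs;

/*
 * Toy version of the removed fast-path test in WalSndWriteData():
 * return early only while we are less than halfway to the timeout
 * and nothing is left in the output buffer.
 */
static bool
toy_can_take_fast_path(ToyTimestampMs now, ToyTimestampMs last_reply,
                       int timeout_ms, bool send_pending)
{
    return now < last_reply + timeout_ms / 2 && !send_pending;
}
```

With a 60 s timeout, a write 29.999 s after the last reply with an empty buffer still takes the fast path; at 30 s, or whenever output is pending, it does not.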



>       /* If we have pending write here, go to slow path */
>       for (;;)
> @@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc)
>                       break;
>  
>               /*
> -              * We only send regular messages to the client for full decoded
> +              * If we have received CopyDone from the client, sent CopyDone
> +              * ourselves, it's time to exit streaming.
> +              */
> +             if (!IsStreamingActive()) {
> +                     break;
> +             }

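Wrong formatting: in PostgreSQL style the opening brace goes on its own
line, and a single-statement body like this one usually gets no braces
at all.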


I wonder if the saner approach here isn't to support query cancellations
or something in that vein, and then handle the error.
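A rough sketch of the cancel-then-error shape: the signal handler only sets a flag, a `CHECK_FOR_INTERRUPTS()`-like check turns the flag into an error, and the error unwinds to a recovery point that abandons the stream and returns as if back in command mode. All `toy_*` names are hypothetical. PostgreSQL itself does this with `sigsetjmp()`/`siglongjmp()` through its elog machinery, and a client normally requests cancellation by sending a CancelRequest over a separate connection, which the postmaster delivers to the target backend as SIGINT; here `raise()` merely simulates that.

```c
#include <setjmp.h>
#include <signal.h>

/* Hypothetical analogue of QueryCancelPending. */
static volatile sig_atomic_t toy_cancel_pending = 0;

/* Hypothetical error-recovery point, like the backend's sigsetjmp target. */
static jmp_buf toy_error_ctx;

static void
toy_cancel_handler(int signo)
{
    (void) signo;
    toy_cancel_pending = 1;     /* only set a flag; act on it later */
}

/* Analogue of CHECK_FOR_INTERRUPTS(): turn a pending cancel into an "error". */
static void
toy_check_for_interrupts(void)
{
    if (toy_cancel_pending)
    {
        toy_cancel_pending = 0;
        longjmp(toy_error_ctx, 1);
    }
}

/*
 * Stream up to nrecords records, simulating a cancel request just before
 * record number cancel_at (pass -1 for no cancel).  Returns the number of
 * records sent, or -1 if the handler could not be installed.
 */
static int
toy_run_streaming(int nrecords, int cancel_at)
{
    volatile int sent = 0;      /* volatile: read again after longjmp */

    if (signal(SIGINT, toy_cancel_handler) == SIG_ERR)
        return -1;

    if (setjmp(toy_error_ctx) != 0)
    {
        /* error path: stream abandoned, back to "command mode" */
        signal(SIGINT, SIG_DFL);
        return sent;
    }

    for (int i = 0; i < nrecords; i++)
    {
        if (i == cancel_at)
            raise(SIGINT);      /* stands in for the client's cancel */
        toy_check_for_interrupts();
        sent++;                 /* "send" one record */
    }

    signal(SIGINT, SIG_DFL);
    return sent;
}
```

The appeal of this design is that the stop point is decided by the existing error-handling path rather than by new continue-callbacks threaded through the decoding code.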

Greetings,

Andres Freund
