> ProcessRepliesIfAny also now executes in WalSdnWriteData. Because during
> send data we should also check message from client(client can send
> CopyDone, KeepAlive, Terminate).
Ah, I didn't spot that ProcessRepliesIfAny() is already called from
WalSndWriteData in the current codebase.
I agree that it would be useful to check for client-initiated CopyDone
there, clear the walsender write queue, close the decoding session and
return to command mode.
Nothing gets sent until we've formatted the data we want to send into a
StringInfo, though. The comments on WalSndPrepareWrite even note that
there's no guarantee anything will get done with the data. There should be
no need to test whether we're processing a Datum, since we won't call
ProcessRepliesIfAny() or WalSndWriteData() while we're doing that.
I think all you should need to do is test streamingDoneSending and
streamingDoneReceiving in WalSndWriteData() and WalSndWaitForWal().
I outlined how I think WalSndWaitForWal() should be handled earlier.
and streamingDoneSending in logical_read_xlog_page and return -1.
For WalSndWriteData() it's more complicated because there's data waiting to
flush. I don't see any way to remove data from the send buffer in the libpq
async API, so you'll have to send whatever's already been passed to libpq
for sending using pq_putmessage_noblock(...). This is necessary for
protocol integrity too, since you need to send a complete CopyData packet.
The length is specified at the start of the CopyData, so once you start
sending it you are committed to finishing it. Otherwise the client has no
way to know when to stop expecting CopyData and look for the next protocol
message. It can wait until its socket read blocks, but that doesn't mean
there isn't more of the CopyData in the upstream server's send buffers,
buffered in a router, etc.
If you want a way to abort in the middle of sending a datum (or in fact an
individual logical change) you'd need a much bigger change to the
replication protocol, where we chunk up big Datum values into multiple
CopyData messages with continuations. That'd be quite nice to have,
especially if it let us stream the Datum out without assembling the whole
thing in memory in a StringInfo first, but it's a much bigger change.
If you just return from WalSndWriteData(), the data is still queued by
libpq, and libpq will still finish sending it when you try to send
something else. So I don't see any point taking much action in response to
CopyDone in WalSndWriteData(). Maybe you could check if the message we're
about to send is large and if it is, check for client input before we start
sending and bail out before the pq_putmessage_noblock() call. But once
we've done that we're committed to finishing sending that message and have
to wait until pq_is_send_pending() return false before we can send our own
CopyDone and exit.
Since WalSndWriteData() must write a whole message from the decoding plugin
before it can return, and when it returns we'll return
from LogicalDecodingProcessRecord(..) to XLogSendLogical(...), we can just
test for streamingDoneReceiving and streamingDoneSending there.
I guess if you add a flag or callback to the logical decoding context you
could have the logical decoding plugin interrupt processing of an
insert/update/delete row change message between processing of individual
datums within it and return without calling OutputPluginWrite(...) so the
data is never queued for WalSndWriteData(...). This would make a difference
if you have wide rows... but don't currently process client writes during
output plugin callbacks. You'd have to add a logical decoding API function
clients could call to process client replies and set the flag. (They can't
call ProcessRepliesIfAny from walsender.c as it's static and not meant to
be called from within a decoding plugin anyway).
> I want reuse same connection without reopen it, because open new
connection takes too long.
Fair enough. Though I don't understand why you'd be doing this often enough
that you'd care about reopening connections. What is the problem you are
trying to solve with this? The underlying reason you need this change?
> Is it correct use case or CopyDone it side effect of copy protocol and
for complete replication need use always Terminate package and reopen
Client-initiated CopyDone for COPY BOTH should be just fine in protocol
terms. There's partial support for it in the walsender already.
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services