> > ProcessRepliesIfAny also now executes in WalSdnWriteData. Because during > send data we should also check message from client(client can send > CopyDone, KeepAlive, Terminate). > > Ah, I didn't spot that ProcessRepliesIfAny() is already called from WalSndWriteData in the current codebase.
I agree that it would be useful to check for client-initiated CopyDone there, clear the walsender write queue, close the decoding session and return to command mode. Nothing gets sent until we've formatted the data we want to send into a StringInfo, though. The comments on WalSndPrepareWrite even note that there's no guarantee anything will get done with the data. There should be no need to test whether we're processing a Datum, since we won't call ProcessRepliesIfAny() or WalSndWriteData() while we're doing that. I think all you should need to do is test streamingDoneSending and streamingDoneReceiving in WalSndWriteData() and WalSndWaitForWal(). I outlined how I think WalSndWaitForWal() should be handled earlier. Test streamingDoneReceiving and streamingDoneSending in logical_read_xlog_page and return -1. For WalSndWriteData() it's more complicated because there's data waiting to flush. I don't see any way to remove data from the send buffer in the libpq async API, so you'll have to send whatever's already been passed to libpq for sending using pq_putmessage_noblock(...). This is necessary for protocol integrity too, since you need to send a complete CopyData packet. The length is specified at the start of the CopyData, so once you start sending it you are committed to finishing it. Otherwise the client has no way to know when to stop expecting CopyData and look for the next protocol message. It can wait until its socket read blocks, but that doesn't mean there isn't more of the CopyData in the upstream server's send buffers, buffered in a router, etc. If you want a way to abort in the middle of sending a datum (or in fact an individual logical change) you'd need a much bigger change to the replication protocol, where we chunk up big Datum values into multiple CopyData messages with continuations. That'd be quite nice to have, especially if it let us stream the Datum out without assembling the whole thing in memory in a StringInfo first, but it's a much bigger change. If you just return from WalSndWriteData(), the data is still queued by libpq, and libpq will still finish sending it when you try to send something else. So I don't see any point taking much action in response to CopyDone in WalSndWriteData(). Maybe you could check if the message we're about to send is large and if it is, check for client input before we start sending and bail out before the pq_putmessage_noblock() call. But once we've done that we're committed to finishing sending that message and have to wait until pq_is_send_pending() return false before we can send our own CopyDone and exit. Since WalSndWriteData() must write a whole message from the decoding plugin before it can return, and when it returns we'll return from LogicalDecodingProcessRecord(..) to XLogSendLogical(...), we can just test for streamingDoneReceiving and streamingDoneSending there. I guess if you add a flag or callback to the logical decoding context you could have the logical decoding plugin interrupt processing of an insert/update/delete row change message between processing of individual datums within it and return without calling OutputPluginWrite(...) so the data is never queued for WalSndWriteData(...). This would make a difference if you have wide rows... but don't currently process client writes during output plugin callbacks. You'd have to add a logical decoding API function clients could call to process client replies and set the flag. (They can't call ProcessRepliesIfAny from walsender.c as it's static and not meant to be called from within a decoding plugin anyway). > I want reuse same connection without reopen it, because open new connection takes too long. Fair enough. Though I don't understand why you'd be doing this often enough that you'd care about reopening connections. What is the problem you are trying to solve with this? The underlying reason you need this change? > Is it correct use case or CopyDone it side effect of copy protocol and for complete replication need use always Terminate package and reopen connection? Client-initiated CopyDone for COPY BOTH should be just fine in protocol terms. There's partial support for it in the walsender already. -- Craig Ringer http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services