> Fair enough. Though I don't understand why you'd be doing this often
> enough that you'd care about reopening connections. What is the problem you
> are trying to solve with this? The underlying reason you need this change?

The first reason is a clear API in pgjdbc. The second is the ability to roll
back quickly to one of the previous LSNs and replay from it.

My use case for the second reason: I have a logical decoding extension that
emits only key-value pairs, with no information about the operation type
(insert, update, delete). For a delete, for example, the key is the table's
primary key and the value is null. A receiver connects through pgjdbc over
the replication protocol, and a consumer groups the changes into batches and
sends them to Kafka. If a problem occurs during delivery to Kafka, I have to
stop the current replication, go back to the last successfully delivered LSN,
and resend all messages from that LSN. I think this operation can be quite
common, but reopening the connection, or being unable to stop replication,
will increase latency.

> Anyway, here's a draft patch that does the parts other than the reorder
> buffer processing stop. It passes 'make check' and the src/test/recovery
> tests, but I haven't written anything to verify the client-initiated abort
> handling. You have test code for this and I'd be interested to see the
> results.

What about the keepalive packets that arrive when we have already sent and
received CopyDone? Are they really necessary?

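For reference, the CopyData payloads in the traces in this message look like
primary keepalive messages: a 'k' tag (107), an 8-byte WAL end position, an
8-byte server clock in microseconds since 2000-01-01, and a one-byte
reply-requested flag. Here is a minimal Python sketch of decoding one such
array. parse_keepalive is a hypothetical helper, not part of pgjdbc, and it
assumes each logged array is one complete payload printed as signed Java
bytes.

```python
import struct
from datetime import datetime, timedelta, timezone

# PostgreSQL timestamps count microseconds from 2000-01-01 00:00:00 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def parse_keepalive(signed_bytes):
    """Decode a keepalive payload logged as signed Java bytes (hypothetical helper)."""
    # The driver log prints Java bytes, which are signed; map them to 0..255.
    raw = bytes(b & 0xFF for b in signed_bytes)
    # Layout: 'k' tag, int64 WAL end, int64 server clock, 1-byte reply flag.
    tag, wal_end, clock_us, reply = struct.unpack(">cQqB", raw)
    if tag != b"k":
        raise ValueError("not a keepalive message")
    # Format the WAL position the way PostgreSQL prints an LSN: high/low in hex.
    lsn = "%X/%X" % (wal_end >> 32, wal_end & 0xFFFFFFFF)
    server_clock = PG_EPOCH + timedelta(microseconds=clock_us)
    return lsn, server_clock, bool(reply)
```

Applied to the first array in the "Before" trace, this gives WAL end
0/7287AC00, the same LSN that was passed to START_REPLICATION, with the
reply-requested flag unset.
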
I measured the current fix without a long transaction: I only start
replication and stop it, with no active transactions on PostgreSQL, and I
got the following results:

*Before:*

12:35:31.403 (2) FE=> StartReplication(query: START_REPLICATION SLOT pgjdbc_logical_replication_slot LOGICAL 0/7287AC00 ("include-xids" 'false', "skip-empty-xacts" 'true'))
12:35:31.403 (2) FE=> Query(CopyStart)
12:35:31.404 (2) <=BE CopyBothResponse
12:35:31.406 (2) FE=> StopReplication
12:35:31.406 (2) FE=> CopyDone
12:35:31.406 (2) <=BE CopyData
12:35:31.407 (2) [107, 0, 0, 0, 0, 114, -121, -84, 0, 0, 1, -43, 120, 106, 53, -85, 21, 0]
12:35:31.407 (2) <=BE CopyDone
12:35:31.407 (2) <=BE CopyData
12:35:31.407 (2) [107, 0, 0, 0, 0, 114, -121, -84, 0, 0, 1, -43, 120, 106, 53, -76, 29, 0]
12:35:52.120 (2) <=BE CommandStatus(COPY 0)
12:35:52.120 (2) <=BE CommandStatus(SELECT)
12:35:52.120 (2) <=BE ReadyForQuery(I)
12:35:52.126 (2) FE=> Terminate

*Timing:*

Start and stopping time: 20729ms
Stopping time: 20720ms

*After:*

14:30:02.050 (2) FE=> StartReplication(query: START_REPLICATION SLOT pgjdbc_logical_replication_slot LOGICAL 0/728A06C0 ("include-xids" 'false', "skip-empty-xacts" 'true'))
14:30:02.050 (2) FE=> Query(CopyStart)
14:30:02.051 (2) <=BE CopyBothResponse
14:30:02.056 (2) FE=> StopReplication
14:30:02.057 (2) FE=> CopyDone
14:30:02.057 (2) <=BE CopyData
14:30:02.057 (2) [107, 0, 0, 0, 0, 114, -118, 6, -64, 0, 1, -43, 122, 3, -69, 107, 76, 0]
14:30:02.058 (2) <=BE CopyDone
14:30:02.058 (2) <=BE CommandStatus(COPY 0)
14:30:02.058 (2) <=BE CommandStatus(SELECT)
14:30:02.058 (2) <=BE ReadyForQuery(I)
14:30:02.068 (2) FE=> StandbyStatusUpdate(received: 0/728A06C0, flushed: 0/0, applied: 0/0, clock: Tue May 10 14:30:02 MSK 2016)

*Timing:*

Start and stopping time: 27ms
Stopping time: 11ms

> So aborting processing and returning between individual changes in
> ReorderBufferCommit(...) seems very reasonable.
> I agree that some kind of
> callback is needed because the walsender uses static globals to control its
> send/receive stop logic. I don't like the naming "is_active"; maybe reverse
> the sense and call it "stop_decoding_cb" ? Or "continue_decoding_cb"?
> Unsure.

continue_decoding_cb sounds good.

> I think it's worth making the next step, where you allow reorder buffer
> commit processing to be interrupted, into a separate patch on top of this
> one. They're two separate changes IMO.

Should we continue in the current thread or start a new one? I am interested
in both patches, for my solution and for the pgjdbc driver.

2016-05-10 5:49 GMT+03:00 Craig Ringer <cr...@2ndquadrant.com>:

> On 10 May 2016 at 09:50, Craig Ringer <cr...@2ndquadrant.com> wrote:
>
>> I outlined how I think WalSndWaitForWal() should be handled earlier.
>> Test streamingDoneReceiving and streamingDoneSending in
>> logical_read_xlog_page and return -1.
>
> OK, so thinking about this some more, I see why you've added the callback
> within the reorder buffer code. You want to stop processing of a
> transaction after we've decoded the commit and are looping over the changes
> within ReorderBufferCommit(...), which doesn't know anything about the
> walsender. So we could loop for a long time within WalSndLoop ->
> XLogSendLogical -> LogicalDecodingProcessRecord if the record is a commit,
> as we process each change and send it to the client.
>
> So aborting processing and returning between individual changes in
> ReorderBufferCommit(...) seems very reasonable. I agree that some kind of
> callback is needed because the walsender uses static globals to control its
> send/receive stop logic. I don't like the naming "is_active"; maybe reverse
> the sense and call it "stop_decoding_cb" ? Or "continue_decoding_cb"?
> Unsure.
>
> Anyway, here's a draft patch that does the parts other than the reorder
> buffer processing stop.
> It passes 'make check' and the src/test/recovery
> tests, but I haven't written anything to verify the client-initiated abort
> handling. You have test code for this and I'd be interested to see the
> results.
>
> This patch doesn't attempt to allow decoding to be aborted during
> processing of an xlog record, including a commit. So it'll still attempt to
> send whole transactions. But it should allow clients to send CopyDone when
> we're waiting for new WAL to be generated and return to command mode then.
>
> I think it's worth making the next step, where you allow reorder buffer
> commit processing to be interrupted, into a separate patch on top of this
> one. They're two separate changes IMO.
>
> --
> Craig Ringer http://www.2ndQuadrant.com/
> PostgreSQL Development, 24x7 Support, Training & Services