On 15 January 2016 at 16:30, Shulgin, Oleksandr <
oleksandr.shul...@zalando.de> wrote:

> I'd like to propose generic functions (probably in an extension, or in
> core if not possible otherwise) to facilitate streaming existing data from
> the database *in the same format* that one would get if these would be the
> changes decoded by a logical decoding plugin.

So effectively produce synthetic logical decoding callbacks to run a bunch
of fake INSERTs, presumably with a fake xid etc?

> The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT
> command of the replication protocol to get a consistent snapshot of the
> database, then start listening to new changes on the slot.

My impression is that you want to avoid the current step of "synchronize
database initial contents" when using logical decoding for replication. But
I guess you're looking to then populate that empty schema in-band via
logical decoding, rather than having to do a --data-only dump or use COPY.

That won't help you for schema; presumably you'd still do a pg_dump
--schema-only | pg_restore for that.

Just like when restoring a --data-only dump or using COPY, you'd have to
disable FKs during sync, but that's pretty much unavoidable.

> The way this initial export phase is implemented there is by providing a
> SQL-callable set returning function which is using SPI to run "SELECT *
> FROM mytable" behind the scenes and runs the resulting tuples through the
> INSERT callback of the logical decoding plugin, which lives in the same
> loadable module as this SQL function.


What about the reorder buffer, the logical decoding memory context, etc?

> Bottled Water logical decoding plugin uses binary protocol based on Avro
> data serialization library.  As an experiment I was adding support for JSON
> output format to it, and for that I had to re-implement the aforementioned
> SRF to export initial data to convert tuples to JSON instead.

Have you taken a look at what's been done with pglogical and

We've got extensible protocol support there, and if Avro offers compelling
benefits over the current binary serialization I'm certainly interested in
hearing about it.

> What do you say?

Interesting idea. As outlined I think it sounds pretty fragile though; I
really, really don't like the idea of lying to the insert callback by
passing it a fake insert with (presumably) fake reorder buffer txn, etc.

What we've done in pglogical is take a --schema-only dump, then, on the
downstream, attach to the exported snapshot and use COPY ... TO STDOUT over
a libpq connection to the upstream, and feed that to COPY ... FROM STDIN on
another libpq connection to "ourselves", i.e. the downstream. Unless Petr
changed it to use COPY innards directly on the downstream; I know he talked
about it but haven't checked if he did. Anyway, either way it's not pretty
and requires a sideband non-replication connection to sync initial state.
The upside is that it can be relatively easily parallelized for faster sync
using multiple connections.
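
To make the COPY-based sync a bit more concrete, here is a rough sketch of
the statements one sync worker would issue for a single table. It is not
pglogical's actual code. The helper name build_table_sync_sql, the buffer
layout and the example snapshot identifier are all made up for
illustration, and it assumes the table name is already a safely quoted
identifier. The one hard requirement it encodes is that SET TRANSACTION
SNAPSHOT must run first in a REPEATABLE READ (or SERIALIZABLE) transaction.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Illustration only: builds the statements a sync worker would send for one
 * table. "upstream" gets the export side, "downstream" the import side.
 * Assumes qualified_table is already a safely quoted identifier.
 * Returns 0 on success, -1 on bad input or if a buffer is too small.
 */
static int
build_table_sync_sql(char *upstream, size_t upstream_len,
                     char *downstream, size_t downstream_len,
                     const char *snapshot_id, const char *qualified_table)
{
    int n;

    assert(upstream != NULL && downstream != NULL);

    /* Refuse anything that could break out of the quoted snapshot literal. */
    if (strchr(snapshot_id, '\'') != NULL)
        return -1;

    /* The snapshot must be adopted before any query runs in the xact. */
    n = snprintf(upstream, upstream_len,
                 "BEGIN ISOLATION LEVEL REPEATABLE READ;\n"
                 "SET TRANSACTION SNAPSHOT '%s';\n"
                 "COPY %s TO STDOUT;\n"
                 "COMMIT;\n",
                 snapshot_id, qualified_table);
    if (n < 0 || (size_t) n >= upstream_len)
        return -1;

    /* The downstream side just consumes the COPY stream as-is. */
    n = snprintf(downstream, downstream_len,
                 "COPY %s FROM STDIN;\n", qualified_table);
    if (n < 0 || (size_t) n >= downstream_len)
        return -1;

    return 0;
}
```

Each worker would run this once per table on its own pair of connections,
which is where the easy parallelism comes from.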

To what extent are you setting up a true logical decoding context here?
Where does the xact info come from? The commit record? etc. You're
presumably not forming a reorder buffer then decoding it since it could
create a massive tempfile on disk, so are you just dummying this info up?
Or hoping the plugin won't look at it?

The functionality is good and I think that for the SQL level you'd have to
use SET TRANSACTION SNAPSHOT as you show. But I think it should really be
usable from the replication protocol too - and should try to keep the state
as close to that of a normal decoding session as possible. We'd at least
need a new walsender protocol command equivalent that took the snapshot
identifier, relation info and the other decoding params instead of a slot
name. Or, ideally, a variant on START_REPLICATION ... LOGICAL ... that
omits SLOT and instead takes TABLES as an argument, with a list of
relation(s) to sync. Unlike normal START_REPLICATION ... LOGICAL ... it'd
return to walsender protocol mode on completion, like the phys rep protocol
does when it's time for a timeline switch.

Rather than lie to the insert callback I'd really rather define a new
logical decoding callback for copying initial records. It doesn't get any
xact info (since it's not useful/relevant) or a full reorder buffer. No
ReorderBufferChange is passed; instead we pass something like a
ReorderBufferSync that contains the new tuple ReorderBufferTupleBuf, origin
id, origin lsn and commit timestamp (if known) and the RelFileNode
affected. The LogicalDecodingContext that's set up for the callback gets
ctx->reorder = NULL .  There's no ReorderBufferTxn argument and none is

Since it's a new callback the plugin knows the rules, knows it's getting
initial state data to sync over, etc. It doesn't have to try to guess if
it's seeing a real insert and act differently with respect to xact identity
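
As a very rough sketch of the shape such a callback could take: everything
below is hypothetical, nothing like it exists in PostgreSQL today. The
Sketch* types are stand-ins so the fragment compiles without the server
headers, and the field names are only my guess at what a ReorderBufferSync
might carry.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for PostgreSQL types so the sketch compiles on its own. */
typedef struct SketchRelFileNode
{
    uint32_t    spcNode, dbNode, relNode;
} SketchRelFileNode;

typedef struct SketchTupleBuf       /* stands in for ReorderBufferTupleBuf */
{
    const char *data;
    size_t      len;
} SketchTupleBuf;

/* Hypothetical "ReorderBufferSync": one pre-existing row, no xact info. */
typedef struct SketchSync
{
    SketchRelFileNode relnode;
    SketchTupleBuf   *newtuple;
    uint16_t          origin_id;
    uint64_t          origin_lsn;
    int64_t           commit_time;  /* 0 when not known */
} SketchSync;

typedef struct SketchDecodingContext
{
    void       *reorder;            /* NULL for the whole initial sync */
    void       *output_plugin_private;
} SketchDecodingContext;

/* Hypothetical callback type; note there is no transaction argument. */
typedef void (*SketchSyncCB) (SketchDecodingContext *ctx,
                              const SketchSync *row);

/* Example plugin callback: just counts the rows it is asked to emit. */
static void
count_rows_cb(SketchDecodingContext *ctx, const SketchSync *row)
{
    size_t     *count = ctx->output_plugin_private;

    assert(ctx->reorder == NULL);   /* the plugin can rely on this rule */
    assert(row->newtuple != NULL);
    (*count)++;
}

/* Driver: hands each existing tuple of one relation to the callback. */
static size_t
sketch_sync_relation(SketchDecodingContext *ctx, SketchRelFileNode rel,
                     SketchTupleBuf *tuples, size_t ntuples, SketchSyncCB cb)
{
    size_t      i;

    ctx->reorder = NULL;
    for (i = 0; i < ntuples; i++)
    {
        SketchSync  row;

        row.relnode = rel;
        row.newtuple = &tuples[i];
        row.origin_id = 0;
        row.origin_lsn = 0;
        row.commit_time = 0;
        cb(ctx, &row);
    }
    return ntuples;
}
```

The point of the separate type is that the plugin never has to inspect a
transaction it was not given.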

Obviously that's 9.6 material at the soonest, and only 9.6 if it could be
done ... well, right about now. So that won't meet your immediate needs,
but I think the same is true of the interface you propose above.

What I suggest doing in the meantime is specifying a new callback function
interface for tuple copies as described above, to be implemented by logical
decoding modules that support this extension. In each decoding plugin we
then define a SQL-callable function with 'internal' return type that
returns a pointer to the callback so you can obtain the hook function
address via a fmgr call via pg_proc. The callback would expect a state much
like I describe above and we'd use a SQL-callable function like what you
outlined to set up a fake logical decoding state for it, complete with
decoding context etc. Probably copying & pasting a moderately painful
amount of the logical decoding guts into an ext in the process :( since I
don't think you can easily set up much of the decoding state using the
decoding backend code without having a slot to use. Still, that'd let us
prototype this and prove the idea for inclusion in 9.7 (?) in-core while
retaining the capability via an extension for earlier versions.
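
The pointer hand-off itself is simple. Here is a standalone mock of it, with
SketchDatum standing in for Datum and all function names invented for the
example. In a real extension the lookup would go through fmgr and the
function would be declared with an 'internal' return type, none of which is
shown here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uintptr_t SketchDatum;      /* stand-in for Datum */

/* Hypothetical tuple-copy callback signature, simplified to void pointers. */
typedef void (*SketchSyncCB) (void *ctx, const void *row);

/* The plugin's callback; here it only counts the rows it is handed. */
static void
my_plugin_sync_cb(void *ctx, const void *row)
{
    int        *rows_seen = ctx;

    (void) row;
    (*rows_seen)++;
}

/* Stand-in for the plugin's SQL-callable "give me your callback" function. */
static SketchDatum
my_plugin_get_sync_cb(void)
{
    return (SketchDatum) my_plugin_sync_cb;
}

/* What the helper extension would do with the value it gets back. */
static SketchSyncCB
sketch_resolve_sync_cb(SketchDatum d)
{
    SketchSyncCB cb = (SketchSyncCB) d;

    assert(cb != NULL);
    return cb;
}
```

The helper extension then only needs to know the callback signature, not
which plugin it is talking to.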

You'd have to do much of the same hoop jumping to call an arbitrary output
plugin's insert callback directly, if not more.

Alternately, you could just use COPY ;)

 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

