On Wed, Jan 20, 2016 at 7:57 AM, Craig Ringer <cr...@2ndquadrant.com> wrote:

> On 15 January 2016 at 16:30, Shulgin, Oleksandr <
> oleksandr.shul...@zalando.de> wrote:
>> I'd like to propose generic functions (probably in an extension, or in
>> core if not possible otherwise) to facilitate streaming existing data from
>> the database *in the same format* that one would get if these would be the
>> changes decoded by a logical decoding plugin.
> So effectively produce synthetic logical decoding callbacks to run a bunch
> of fake INSERTs, presumably with a fake xid etc?


>> The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT
>> command of the replication protocol to get a consistent snapshot of the
>> database, then start listening to new changes on the slot.
> My impression is that you want to avoid the current step of "synchronize
> database initial contents" when using logical decoding for replication.

Yes, but...

> But I guess you're looking to then populate that empty schema in-band via
> logical decoding, rather than having to do a --data-only dump or use COPY.
> Right?
> That won't help you for schema; presumably you'd still do a pg_dump
> --schema-only | pg_restore for that.
> Just like when restoring a --data-only dump or using COPY you'd have to
> disable FKs during sync, but that's pretty much unavoidable.

All of this implies another *postgres* database on the receiving side,
which is not necessarily the case for my research.

>> The way this initial export phase is implemented there is by providing a
>> SQL-callable set returning function which is using SPI to run "SELECT *
>> FROM mytable" behind the scenes and runs the resulting tuples through the
>> INSERT callback of the logical decoding plugin, which lives in the same
>> loadable module as this SQL function.
> o_O
> What about the reorder buffer, the logical decoding memory context, etc?

As shown by the POC patch, it is rather straightforward to achieve.
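
For readers following along, here is a toy Python model of the export path
described above. It is not the POC patch or Bottled Water's code, which would
be C using SPI and the plugin's change callback. ToyJsonPlugin, its flattened
change_cb signature and the fake xid of 0 are hypothetical, chosen only to
show existing rows being pushed through the same INSERT path that decoded
changes would take.

```python
import json
from typing import Any, Dict, Iterable, Iterator

Row = Dict[str, Any]


class ToyJsonPlugin:
    """Hypothetical stand-in for an output plugin; only INSERT is modeled."""

    def change_cb(self, xid: int, relation: str, new_row: Row) -> str:
        # A real change callback receives a ReorderBufferTXN and a
        # ReorderBufferChange; here they are flattened to plain arguments.
        return json.dumps(
            {"op": "insert", "xid": xid, "table": relation, "row": new_row},
            sort_keys=True,
        )


def stream_relation(plugin: ToyJsonPlugin, relation: str,
                    rows: Iterable[Row], fake_xid: int = 0) -> Iterator[str]:
    """Feed already-existing rows through the plugin's INSERT path."""
    for row in rows:
        # Every row is presented as if it were a freshly decoded INSERT.
        yield plugin.change_cb(fake_xid, relation, row)


if __name__ == "__main__":
    existing = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
    for line in stream_relation(ToyJsonPlugin(), "mytable", existing):
        print(line)
```

The consumer sees the same output format for the initial rows and for later
changes, at the cost of the callback being handed a transaction id that
never existed.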

>> Bottled Water logical decoding plugin uses binary protocol based on Avro
>> data serialization library.  As an experiment I was adding support for JSON
>> output format to it, and for that I had to re-implement the aforementioned
>> SRF to export initial data to convert tuples to JSON instead.
> Have you taken a look at what's been done with pglogical and
> pglogical_output?
> We've got extensible protocol support there, and if Avro offers compelling
> benefits over the current binary serialization I'm certainly interested in
> hearing about it.

This is what I'm going to benchmark.  With the generic function I can just
create two slots: one for pglogical and another one for BottledWater/Avro
and see which one performs better when forced to stream some TB worth of
INSERTs through the change callback.

>> What do you say?
> Interesting idea. As outlined I think it sounds pretty fragile though; I
> really, really don't like the idea of lying to the insert callback by
> passing it a fake insert with (presumably) fake reorder buffer txn, etc.

Fair enough.  However, for performance testing it might not be that bad,
even if none of it lands in the actual API.

> What we've done in pglogical is take a --schema-only dump then, on the
> downstream, attach to the exported snapshot and use COPY ... TO STDOUT over
> a libpq connection to the upstream to feed that to COPY ... FROM STDIN on
> another libpq connection to "ourselves", i.e. the downstream. Unless Petr
> changed it to use COPY innards directly on the downstream; I know he talked
> about it but haven't checked if he did. Anyway, either way it's not pretty
> and requires a sideband non-replication connection to sync initial state.
> The upside is that it can be relatively easily parallelized for faster sync
> using multiple connections.

I've also measured that to have a baseline for comparing it to decoding

> To what extent are you setting up a true logical decoding context here?

It is done in exactly the same way as pg_logical_slot_get/peek_changes() do it.

> Where does the xact info come from? The commit record? etc.


> You're presumably not forming a reorder buffer then decoding it since it
> could create a massive tempfile on disk, so are you just dummying this info
> up?

In my experience, it doesn't.  We know it's going to be a "committed xact",
so we don't really need to queue the changes up before we see a "commit"

> Or hoping the plugin won't look at it?

Pretty much. :-)

> The functionality is good and I think that for the SQL level you'd have to
> use SET TRANSACTION SNAPSHOT as you show. But I think it should really be
> usable from the replication protocol too - and should try to keep the state
> as close to that of a normal decoding session as possible. We'd at least
> need a new walsender protocol command equivalent that took the snapshot
> identifier, relation info and the other decoding params instead of a slot
> name. Or, ideally, a variant on START_REPLICATION ... LOGICAL ... that
> omits SLOT and instead takes TABLES as an argument, with a list of
> relation(s) to sync. Unlike normal START_REPLICATION ... LOGICAL ... it'd
> return to walsender protocol mode on completion, like the phys rep protocol
> does when it's time for a timeline switch.

I've had similar thoughts.

Another consideration is that we might introduce modes for acquiring the
slot: Exclusive and Shared access (can be implemented with LWLocks?), so
that peek_changes() and stream_relation() could acquire the slot in Shared
access mode, thus allowing parallel queries, while START_REPLICATION and
get_changes() would require Exclusive access.
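
A toy, single-process Python model of the rule I have in mind. ToySlot and
SlotBusy are made-up names and nothing here actually locks anything; a real
implementation would keep this state in shared memory behind an LWLock or
similar. It only spells out which combinations of access modes are allowed.

```python
class SlotBusy(Exception):
    """Raised when the requested access mode conflicts with current holders."""


class ToySlot:
    """Hypothetical slot that tracks shared and exclusive holders."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.shared_holders = 0
        self.exclusive_held = False

    def acquire(self, exclusive: bool) -> None:
        # An exclusive holder blocks everybody else.
        if self.exclusive_held:
            raise SlotBusy(f"slot {self.name!r} is held exclusively")
        if exclusive:
            # Exclusive access also requires that no shared holders remain.
            if self.shared_holders:
                raise SlotBusy(f"slot {self.name!r} has shared holders")
            self.exclusive_held = True
        else:
            self.shared_holders += 1

    def release(self, exclusive: bool) -> None:
        if exclusive:
            self.exclusive_held = False
        else:
            self.shared_holders -= 1
```

In this model peek_changes() and stream_relation() would call
acquire(exclusive=False), while START_REPLICATION and get_changes() would
call acquire(exclusive=True).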

> Rather than lie to the insert callback I'd really rather define a new
> logical decoding callback for copying initial records. It doesn't get any
> xact info (since it's not useful/relevant) or a full reorder buffer. No
> ReorderBufferChange is passed; instead we pass something like a
> ReorderBufferSync that contains the new tuple ReorderBufferTupleBuf, origin
> id, origin lsn and commit timestamp (if known) and the RelFileNode
> affected. The LogicalDecodingContext that's set up for the callback gets
> ctx->reorder = NULL .  There's no ReorderBufferTxn argument and none is
> defined.
> Since it's a new callback the plugin knows the rules, knows it's getting
> initial state data to sync over, etc. It doesn't have to try to guess if
> it's seeing a real insert and act differently with respect to xact identity
> etc.
> Obviously that's 9.6 material at the soonest, and only 9.6 if it could be
> done ... well, right about now. So that won't meet your immediate needs,
> but I think the same is true of the interface you propose above.

That can be a good approach going forward, yes.
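
To check that I understand the shape of it, a toy Python model of such a
dedicated callback follows. None of these names exist anywhere:
ToySyncRecord loosely mirrors the ReorderBufferSync you describe (with a
plain relation name standing in for the RelFileNode), and sync_cb is a
hypothetical callback that receives no transaction argument at all.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional


@dataclass(frozen=True)
class ToySyncRecord:
    """Hypothetical payload for the initial-sync callback."""
    relation: str                      # stand-in for the RelFileNode
    new_tuple: Dict[str, Any]
    origin_id: int = 0
    origin_lsn: Optional[int] = None
    commit_ts: Optional[float] = None  # only if known


class ToyPlugin:
    """Plugin with separate paths for decoded inserts and synced rows."""

    def __init__(self) -> None:
        self.out: List[str] = []

    def change_cb(self, xid: int, relation: str, new_tuple: Dict[str, Any]) -> None:
        self.out.append(f"INSERT xid={xid} {relation} {new_tuple}")

    def sync_cb(self, record: ToySyncRecord) -> None:
        # No xid here: the plugin knows this is pre-existing data.
        self.out.append(f"SYNC {record.relation} {record.new_tuple}")


def sync_relation(plugin: Any, relation: str,
                  rows: Iterable[Dict[str, Any]]) -> int:
    """Send existing rows to the plugin's sync callback; return the count."""
    sync_cb = getattr(plugin, "sync_cb", None)
    if sync_cb is None:
        # Plugins that do not implement the extension are rejected up front.
        raise NotImplementedError("plugin does not support initial sync")
    count = 0
    for row in rows:
        sync_cb(ToySyncRecord(relation=relation, new_tuple=dict(row)))
        count += 1
    return count
```

The plugin no longer has to guess whether an insert is real, and a plugin
without the callback fails early instead of being fed fake transactions.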

> What I suggest doing in the mean time is specifying a new callback function
> interface for tuple copies as described above, to be implemented by logical
> decoding modules that support this extension. In each decoding plugin we
> then define a SQL-callable function with 'internal' return type that
> returns a pointer to the callback so you can obtain the hook function
> address via a fmgr call via pg_proc. The callback would expect a state much
> like I describe above and we'd use a SQL-callable function like what you
> outlined to set up a fake logical decoding state for it, complete with
> decoding context etc. Probably copying & pasting a moderately painful
> amount of the logical decoding guts into an ext in the process :( since I
> don't think you can easily set up much of the decoding state using the
> decoding backend code without having a slot to use. Still, that'd let us
> prototype this and prove the idea for inclusion in 9.7 (?) in-core while
> retaining the capability via an extension for earlier versions.
> You'd have to do much of the same hoop jumping to call an arbitrary output
> plugin's insert callback directly, if not more.
> Alternately, you could just use COPY ;)

Thanks for the thoughtful reply!  I'm going to experiment with my toy code
a bit more, while keeping in mind what a more workable approach could look
like.

