On 08/01/18 08:02, Simon Riggs wrote:
> On 31 December 2017 at 10:44, Petr Jelinek <petr.jeli...@2ndquadrant.com>
> wrote:
>
>> Attached is patch which adds ability to do fast-forwarding while
>> decoding. That means wal is consumed as fast as possible and changes are
>> not given to output plugin for sending. The implementation is less
>> invasive than I originally thought it would be. Most of it is just
>> additional filter condition in places where we would normally filter out
>> changes because we don't yet have full snapshot.
>
> Looks good.
>

Thanks.

> The precise definition of "slot advance" or "fast forward" isn't
> documented in the patch. If we advance past everything, why is there
> not just one test in LogicalDecodingProcessRecord() to say if
> (ctx->fast_forward)? Why put it in multiple decoding subroutines?
>

Because we still need to track transactions (otherwise the slot's restart
position will not move forward) and mark transactions which made DDL
changes so that historical snapshots are built. Otherwise, if we moved the
slot forward and then started real decoding from that position, we'd have
a wrong view of the catalogs. We'd have to write a different version of
LogicalDecodingProcessRecord() and duplicate some of the code from the
Decode* functions there, which seems like it would be harder to maintain.

I might be inclined to do that if the current approach meant adding a new
branch to every Decode* function, but since there is already a branch for
filtering actions during the initial snapshot build, I think it's better
to just extend that.

> If ctx->fast_forward is set it might throw off other ops, so it would
> be useful to see some Asserts elsewhere to make sure we understand and
> avoid breakage

Hmm, I think the only places where this can be an issue and can also be
checked using Assert are the cb wrappers in logical.c which call the
output plugin (the output plugin is not supposed to be called when
fast-forwarding), so I added an Assert to each of them.

> In pg_replication_slot_advance() the moveto variable is set to
> PG_GETARG_LSN(1) and then unconditionally overwritten before it is
> used for anything. Why?
>

Eh, the Min was missing; it should be used for clamping, not assigned
unconditionally. Fixed, and added a regression test for this.

Updated version attached.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
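
For readers who want the filtering argument in isolation, here is a minimal standalone sketch. It is not the patch code: decode_ctx, snap_state and decode_heap_record() are hypothetical stand-ins for LogicalDecodingContext, the snapshot builder state and the Decode* functions. It only illustrates that transaction tracking always runs, while change decoding is skipped both before a full snapshot exists and when fast-forwarding.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the snapshot builder states. */
typedef enum
{
	SNAP_START,
	SNAP_FULL_SNAPSHOT,
	SNAP_CONSISTENT
} snap_state;

/* Hypothetical stand-in for LogicalDecodingContext. */
typedef struct
{
	bool		fast_forward;		/* set when only advancing the slot */
	snap_state	state;				/* how far the snapshot build got */
	int			xids_tracked;		/* counts transaction bookkeeping calls */
	int			changes_decoded;	/* counts changes actually decoded */
} decode_ctx;

/* Hypothetical analogue of a Decode* function handling one record. */
static void
decode_heap_record(decode_ctx *ctx)
{
	/* Transaction tracking happens unconditionally. */
	ctx->xids_tracked++;

	/* Same branch serves "no full snapshot yet" and "fast-forwarding". */
	if (ctx->state < SNAP_FULL_SNAPSHOT || ctx->fast_forward)
		return;

	/* Only here would the change be queued for the output plugin. */
	ctx->changes_decoded++;
}
```

Calling decode_heap_record() on a context with fast_forward set increments only xids_tracked, which is the reason a single early exit in LogicalDecodingProcessRecord() would not be enough.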
From 00b7a3e4abcba36e326dcb4d0c388d44446e43c2 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Fri, 29 Dec 2017 09:46:35 +0100
Subject: [PATCH] Add pg_replication_slot_advance() function

Advances replication slot to specified position. Works both with logical
and physical slots.

This also adds fast forward mode for logical decoding which is used by
the pg_replication_slot_advance() function as well as slot creation.
---
 contrib/test_decoding/expected/slot.out        |  30 ++++
 contrib/test_decoding/sql/slot.sql             |  15 ++
 doc/src/sgml/func.sgml                         |  19 +++
 src/backend/replication/logical/decode.c       |  41 +++--
 src/backend/replication/logical/logical.c      |  30 +++-
 src/backend/replication/logical/logicalfuncs.c |   1 +
 src/backend/replication/slotfuncs.c            | 200 +++++++++++++++++++++++++
 src/backend/replication/walsender.c            |   1 +
 src/include/catalog/pg_proc.h                  |   2 +
 src/include/replication/logical.h              |   8 +
 10 files changed, 330 insertions(+), 17 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 9f5f8a9b76..21e9d56f73 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -92,6 +92,36 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'in
  COMMIT
 (3 rows)
 
+INSERT INTO replication_example(somedata, text) VALUES (1, 4);
+INSERT INTO replication_example(somedata, text) VALUES (1, 5);
+SELECT pg_current_wal_lsn() AS wal_lsn \gset
+INSERT INTO replication_example(somedata, text) VALUES (1, 6);
+SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
+    slot_name     
+------------------
+ regression_slot2
+(1 row)
+
+SELECT :'wal_lsn' = :'end_lsn';
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                                  data                                                   
+---------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.replication_example: INSERT: id[integer]:6 somedata[integer]:1 text[character varying]:'6'
+ COMMIT
+(3 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
 DROP TABLE replication_example;
 -- error
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index fa9561f54e..706340c1d8 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -45,6 +45,21 @@ INSERT INTO replication_example(somedata, text) VALUES (1, 3);
 SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
+INSERT INTO replication_example(somedata, text) VALUES (1, 4);
+INSERT INTO replication_example(somedata, text) VALUES (1, 5);
+
+SELECT pg_current_wal_lsn() AS wal_lsn \gset
+
+INSERT INTO replication_example(somedata, text) VALUES (1, 6);
+
+SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
+
+SELECT :'wal_lsn' = :'end_lsn';
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
 DROP TABLE replication_example;
 
 -- error
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 2428434030..487c7ff750 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19155,6 +19155,25 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        </entry>
       </row>
 
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_replication_slot_advance</primary>
+        </indexterm>
+        <literal><function>pg_replication_slot_advance(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>end_lsn</parameter> <type>pg_lsn</type>)
+        <type>bool</type>
+       </entry>
+       <entry>
+        Advances the current confirmed position of a replication slot named
+        <parameter>slot_name</parameter>. The slot will not be moved backwards,
+        and it will not be moved beyond the current insert location. Returns
+        name of the slot and real position to which it was advanced to.
+       </entry>
+      </row>
+
       <row>
        <entry id="pg-replication-origin-create">
         <indexterm>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 537eba7875..bf02e1cbc6 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -332,8 +332,10 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				xl_invalidations *invalidations =
 				(xl_invalidations *) XLogRecGetData(r);
 
-				ReorderBufferImmediateInvalidation(
-												   ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+				if (!ctx->fast_forward)
+					ReorderBufferImmediateInvalidation(ctx->reorder,
+													   invalidations->nmsgs,
+													   invalidations->msgs);
 			}
 			break;
 		default:
@@ -353,14 +355,19 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
-	/* no point in doing anything yet */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
 		return;
 
 	switch (info)
 	{
 		case XLOG_HEAP2_MULTI_INSERT:
-			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+			if (!ctx->fast_forward &&
+				SnapBuildProcessChange(builder, xid, buf->origptr))
 				DecodeMultiInsert(ctx, buf);
 			break;
 		case XLOG_HEAP2_NEW_CID:
@@ -408,8 +415,12 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
-	/* no point in doing anything yet */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding data changes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
 		return;
 
 	switch (info)
@@ -501,8 +512,12 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
 
-	/* No point in doing anything yet. */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding messages.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
 		return;
 
 	message = (xl_logical_message *) XLogRecGetData(r);
@@ -554,8 +569,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 */
 	if (parsed->nmsgs > 0)
 	{
-		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
-									  parsed->nmsgs, parsed->msgs);
+		if (!ctx->fast_forward)
+			ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+										  parsed->nmsgs, parsed->msgs);
 		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
 	}
 
@@ -574,6 +590,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 *	  are restarting or if we haven't assembled a consistent snapshot yet.
 	 * 2) The transaction happened in another database.
 	 * 3) The output plugin is not interested in the origin.
+	 * 4) We are doing fast-forwarding
 	 *
 	 * We can't just use ReorderBufferAbort() here, because we need to execute
 	 * the transaction's invalidations.  This currently won't be needed if
@@ -589,7 +606,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 */
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
 		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
-		FilterByOrigin(ctx, origin_id))
+		ctx->fast_forward || FilterByOrigin(ctx, origin_id))
 	{
 		for (i = 0; i < parsed->nsubxacts; i++)
 		{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2fc9d7d70f..7637efc32e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -115,6 +115,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogRecPtr start_lsn,
 					   TransactionId xmin_horizon,
 					   bool need_full_snapshot,
+					   bool fast_forward,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
@@ -140,7 +141,8 @@ StartupDecodingContext(List *output_plugin_options,
 	 * (re-)load output plugins, so we detect a bad (removed) output plugin
 	 * now.
 	 */
-	LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+	if (!fast_forward)
+		LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
 
 	/*
 	 * Now that the slot's xmin has been set, we can announce ourselves as a
@@ -191,6 +193,8 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->output_plugin_options = output_plugin_options;
 
+	ctx->fast_forward = fast_forward;
+
 	MemoryContextSwitchTo(old_context);
 
 	return ctx;
@@ -303,8 +307,9 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 need_full_snapshot, read_page, prepare_write,
-								 do_write, update_progress);
+								 need_full_snapshot, true,
+								 read_page, prepare_write, do_write,
+								 update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -342,6 +347,7 @@ CreateInitDecodingContext(char *plugin,
 LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
+					  bool fast_forward,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
@@ -395,8 +401,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 read_page, prepare_write, do_write,
-								 update_progress);
+								 fast_forward, read_page, prepare_write,
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -573,6 +579,8 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
+	Assert(!ctx->fast_forward);
+
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
 	state.callback_name = "startup";
@@ -598,6 +606,8 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
+	Assert(!ctx->fast_forward);
+
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
 	state.callback_name = "shutdown";
@@ -629,6 +639,8 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
+	Assert(!ctx->fast_forward);
+
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
 	state.callback_name = "begin";
@@ -658,6 +670,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
+	Assert(!ctx->fast_forward);
+
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
 	state.callback_name = "commit";
@@ -687,6 +701,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
+	Assert(!ctx->fast_forward);
+
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
 	state.callback_name = "change";
@@ -721,6 +737,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	ErrorContextCallback errcallback;
 	bool		ret;
 
+	Assert(!ctx->fast_forward);
+
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
 	state.callback_name = "filter_by_origin";
@@ -751,6 +769,8 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
 
+	Assert(!ctx->fast_forward);
+
 	if (ctx->callbacks.message_cb == NULL)
 		return;
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 9aab6e71b2..54c25f1f5b 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -251,6 +251,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* restart at slot's confirmed_flush */
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									options,
+									false,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
 									LogicalOutputWrite, NULL);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index b02df593e9..93d2e20f76 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -17,11 +17,14 @@
 #include "miscadmin.h"
 
 #include "access/htup_details.h"
+#include "replication/decode.h"
 #include "replication/slot.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/pg_lsn.h"
+#include "utils/resowner.h"
 
 static void
 check_permissions(void)
@@ -312,3 +315,200 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 	return (Datum) 0;
 }
+
+/*
+ * Helper function for advancing physical replication slot forward.
+ */
+static XLogRecPtr
+pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+{
+	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	if (MyReplicationSlot->data.restart_lsn < moveto)
+	{
+		MyReplicationSlot->data.restart_lsn = moveto;
+		retlsn = moveto;
+	}
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
+	return retlsn;
+}
+
+/*
+ * Helper function for advancing logical replication slot forward.
+ */
+static XLogRecPtr
+pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+{
+	LogicalDecodingContext *ctx;
+	ResourceOwner old_resowner = CurrentResourceOwner;
+	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+
+	PG_TRY();
+	{
+		/* restart at slot's confirmed_flush */
+		ctx = CreateDecodingContext(InvalidXLogRecPtr,
+									NIL,
+									true,
+									logical_read_local_xlog_page,
+									NULL, NULL, NULL);
+
+		CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
+												   "logical decoding");
+
+		/* invalidate non-timetravel entries */
+		InvalidateSystemCaches();
+
+		/* Decode until we run out of records */
+		while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
+			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+		{
+			XLogRecord *record;
+			char	   *errm = NULL;
+
+			record = XLogReadRecord(ctx->reader, startlsn, &errm);
+			if (errm)
+				elog(ERROR, "%s", errm);
+
+			/*
+			 * Now that we've set up the xlog reader state, subsequent calls
+			 * pass InvalidXLogRecPtr to say "continue from last record"
+			 */
+			startlsn = InvalidXLogRecPtr;
+
+			/*
+			 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
+			 * store the description into our tuplestore.
+			 */
+			if (record != NULL)
+				LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+			/* check limits */
+			if (moveto <= ctx->reader->EndRecPtr)
+				break;
+
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		CurrentResourceOwner = old_resowner;
+
+		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+		{
+			LogicalConfirmReceivedLocation(moveto);
+
+			/*
+			 * If only the confirmed_flush_lsn has changed the slot won't get
+			 * marked as dirty by the above. Callers on the walsender
+			 * interface are expected to keep track of their own progress and
+			 * don't need it written out. But SQL-interface users cannot
+			 * specify their own start positions and it's harder for them to
+			 * keep track of their progress, so we should make more of an
+			 * effort to save it for them.
+			 *
+			 * Dirty the slot so it's written out at the next checkpoint.
+			 * We'll still lose its position on crash, as documented, but it's
+			 * better than always losing the position even on clean restart.
+			 */
+			ReplicationSlotMarkDirty();
+		}
+
+		retlsn = MyReplicationSlot->data.confirmed_flush;
+
+		/* free context, call shutdown callback */
+		FreeDecodingContext(ctx);
+
+		InvalidateSystemCaches();
+	}
+	PG_CATCH();
+	{
+		/* clear all timetravel entries */
+		InvalidateSystemCaches();
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return retlsn;
+}
+
+/*
+ * SQL function for moving the position in a replication slot.
+ */
+Datum
+pg_replication_slot_advance(PG_FUNCTION_ARGS)
+{
+	Name		slotname = PG_GETARG_NAME(0);
+	XLogRecPtr	moveto = PG_GETARG_LSN(1);
+	XLogRecPtr	endlsn;
+	XLogRecPtr	startlsn;
+	TupleDesc	tupdesc;
+	Datum		values[2];
+	bool		nulls[2];
+	HeapTuple	tuple;
+	Datum		result;
+
+	Assert(!MyReplicationSlot);
+
+	check_permissions();
+
+	if (XLogRecPtrIsInvalid(moveto))
+		ereport(ERROR,
+				(errmsg("invalid target wal lsn")));
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/*
+	 * We can't move slot past what's been flushed/replayed so clamp the
+	 * target position accordingly.
+	 */
+	if (!RecoveryInProgress())
+		moveto = Min(moveto, GetFlushRecPtr());
+	else
+		moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
+
+	/* Acquire the slot so we "own" it */
+	ReplicationSlotAcquire(NameStr(*slotname), true);
+
+	startlsn = MyReplicationSlot->data.confirmed_flush;
+	if (moveto < startlsn)
+	{
+		ReplicationSlotRelease();
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot move slot to %X/%X, minimum is %X/%X",
+						(uint32) (moveto >> 32), (uint32) moveto,
+						(uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
+						(uint32) (MyReplicationSlot->data.confirmed_flush))));
+	}
+
+	if (OidIsValid(MyReplicationSlot->data.database))
+		endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+	else
+		endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+
+	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+	nulls[0] = false;
+
+	/* Update the on disk state when lsn was updated. */
+	if (XLogRecPtrIsInvalid(endlsn))
+	{
+		ReplicationSlotMarkDirty();
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+		ReplicationSlotSave();
+	}
+
+	ReplicationSlotRelease();
+
+	/* Return the reached position. */
+	values[1] = LSNGetDatum(endlsn);
+	nulls[1] = false;
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	result = HeapTupleGetDatum(tuple);
+
+	PG_RETURN_DATUM(result);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8bef3fbdaf..130ecd5559 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1075,6 +1075,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * to be shipped from that position.
 	 */
 	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
+												 false,
 												 logical_read_xlog_page,
 												 WalSndPrepareWrite,
 												 WalSndWriteData,
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 298e0ae2f0..f01648c961 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5357,6 +5357,8 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
 DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
 DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3878 ( pg_replication_slot_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 3220" "{19,3220,19,3220}" "{i,i,o,o}" "{slot_name,upto_lsn,slot_name,end_lsn}" _null_ _null_ pg_replication_slot_advance _null_ _null_ _null_ ));
+DESCR("advance logical replication slot");
 DATA(insert OID = 3577 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ ));
 DESCR("emit a textual logical decoding message");
 DATA(insert OID = 3578 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ ));
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index d9059e1cca..619c5f4d73 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -45,6 +45,13 @@ typedef struct LogicalDecodingContext
 	struct ReorderBuffer *reorder;
 	struct SnapBuild *snapshot_builder;
 
+	/*
+	 * Marks the logical decoding context as fast forward decoding one.
+	 * Such a context does not have plugin loaded so most of the following
+	 * properties are unused.
+	 */
+	bool		fast_forward;
+
 	OutputPluginCallbacks callbacks;
 	OutputPluginOptions options;
 
@@ -97,6 +104,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
+					  bool fast_forward,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-- 
2.14.1
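
The target clamping done in pg_replication_slot_advance() can be illustrated on its own. The following is only a sketch under stated assumptions, not code from the patch: lsn_t, INVALID_LSN and clamp_advance_target() are hypothetical names, a plain 64-bit integer stands in for XLogRecPtr, and a backwards move is reported by returning INVALID_LSN where the real function raises an error.

```c
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr; 0 plays the role of InvalidXLogRecPtr. */
typedef uint64_t lsn_t;
#define INVALID_LSN ((lsn_t) 0)

/*
 * Hypothetical helper: compute where a slot advance would end up.
 *
 * requested - position asked for by the caller
 * flushed   - how far WAL has been flushed (or replayed, during recovery)
 * confirmed - the slot's current confirmed position
 */
static lsn_t
clamp_advance_target(lsn_t requested, lsn_t flushed, lsn_t confirmed)
{
	/* Never move past what has been flushed/replayed: take the minimum. */
	lsn_t		target = (requested < flushed) ? requested : flushed;

	/* Never move backwards; the sketch signals this with INVALID_LSN. */
	if (target < confirmed)
		return INVALID_LSN;

	return target;
}
```

With flushed = 300 and confirmed = 100, a request for 500 is clamped to 300, a request for 200 is honoured as is, and a request for 50 is rejected. Assigning the flush position unconditionally instead of taking the minimum would turn the second case into 300 as well, which is the behaviour the regression test above guards against.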