On Fri, Jan 15, 2016 at 12:09 PM, Shulgin, Oleksandr < oleksandr.shul...@zalando.de> wrote:
> On Fri, Jan 15, 2016 at 11:08 AM, Simon Riggs <si...@2ndquadrant.com> > wrote: > >> On 15 January 2016 at 08:30, Shulgin, Oleksandr < >> oleksandr.shul...@zalando.de> wrote: >> >> >>> I'd like to propose generic functions (probably in an extension, or in >>> core if not possible otherwise) to facilitate streaming existing data from >>> the database *in the same format* that one would get if these would be the >>> changes decoded by a logical decoding plugin. >>> >>> The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT >>> command of the replication protocol to get a consistent snapshot of the >>> database, then start listening to new changes on the slot. >>> >> >> It sounds like this is already possible. >> > > Totally, that's how it was supposed to be used anyway. What is missing > IMO is retrieving the initial snapshot in the same format as that the later > changes will arrive. > POC patch attached. Findings: 1) Needs an actual slot for all the decode machinery to work (code depends on MyReplicationSlot being set). 2) Requires a core patch. 3) Currently only supports textual output, adding binary is trivial. Acquiring a slot means this cannot be run in parallel from multiple backends. Any ideas on how to overcome this (except for opening multiple slots with the same LSN)? To obtain a consistent snapshot, the client still needs to take care of preserving and setting transaction snapshot properly. -- Alex
From f800b8c387eb17f4eb005b38b78585f1f165b0d3 Mon Sep 17 00:00:00 2001 From: Oleksandr Shulgin <oleksandr.shul...@zalando.de> Date: Fri, 15 Jan 2016 17:30:04 +0100 Subject: [PATCH] POC: pg_logical_slot_stream_relation --- src/backend/catalog/system_views.sql | 9 + src/backend/replication/logical/logicalfuncs.c | 337 +++++++++++++++++++++--- src/backend/replication/logical/reorderbuffer.c | 6 +- src/include/catalog/pg_proc.h | 2 + src/include/replication/logicalfuncs.h | 1 + src/include/replication/reorderbuffer.h | 3 + 6 files changed, 315 insertions(+), 43 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 923fe58..5431b61 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -941,6 +941,15 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_binary_changes'; +CREATE OR REPLACE FUNCTION pg_logical_slot_stream_relation( + IN slot_name name, IN relnamespace name, IN relname name, IN nochildren bool DEFAULT FALSE, + VARIADIC options text[] DEFAULT '{}', + OUT data text) +RETURNS SETOF TEXT +LANGUAGE INTERNAL +VOLATILE ROWS 1000 COST 1000 +AS 'pg_logical_slot_stream_relation'; + CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, OUT slot_name name, OUT xlog_position pg_lsn) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 56e47e4..bc62784 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -21,12 +21,18 @@ #include "funcapi.h" #include "miscadmin.h" +#include "access/htup_details.h" #include "access/xlog_internal.h" +#include "executor/spi.h" + +#include "catalog/namespace.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" +#include "lib/stringinfo.h" + #include "mb/pg_wchar.h" #include "utils/array.h" @@ -40,6 +46,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" +#include "replication/reorderbuffer.h" #include "storage/fd.h" @@ -50,6 +57,11 @@ typedef struct DecodingOutputState TupleDesc tupdesc; bool binary_output; int64 returned_rows; + + /* for pg_logical_stream_relation */ + Relation rel; + Portal cursor; + TupleTableSlot *tupslot; } DecodingOutputState; /* @@ -270,6 +282,53 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, return count; } +static List * +deconstruct_options_array(ArrayType *arr) +{ + Size ndim; + List *options = NIL; + + ndim = ARR_NDIM(arr); + if (ndim > 1) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("array must be one-dimensional"))); + } + else if (array_contains_nulls(arr)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("array must not contain nulls"))); + } + else if (ndim == 1) + { + int nelems; + Datum *datum_opts; + int i; + + Assert(ARR_ELEMTYPE(arr) == TEXTOID); + + deconstruct_array(arr, TEXTOID, -1, false, 'i', + &datum_opts, NULL, &nelems); + + if (nelems % 2 != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("array must have even number of elements"))); + + for (i = 0; i < nelems; i += 2) + { + char *name = TextDatumGetCString(datum_opts[i]); + char *opt = TextDatumGetCString(datum_opts[i + 1]); + + options = lappend(options, makeDefElem(name, (Node *) makeString(opt))); + } + } + + return options; +} + /* * Helper function for the various SQL callable logical decoding functions. */ @@ -287,7 +346,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; - Size ndim; List *options = NIL; DecodingOutputState *p; @@ -339,44 +397,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); - /* Deconstruct options array */ - ndim = ARR_NDIM(arr); - if (ndim > 1) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("array must be one-dimensional"))); - } - else if (array_contains_nulls(arr)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("array must not contain nulls"))); - } - else if (ndim == 1) - { - int nelems; - Datum *datum_opts; - int i; - - Assert(ARR_ELEMTYPE(arr) == TEXTOID); - - deconstruct_array(arr, TEXTOID, -1, false, 'i', - &datum_opts, NULL, &nelems); - - if (nelems % 2 != 0) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("array must have even number of elements"))); - - for (i = 0; i < nelems; i += 2) - { - char *name = TextDatumGetCString(datum_opts[i]); - char *opt = TextDatumGetCString(datum_opts[i + 1]); - - options = lappend(options, makeDefElem(name, (Node *) makeString(opt))); - } - } + options = deconstruct_options_array(arr); p->tupstore = tuplestore_begin_heap(true, false, work_mem); rsinfo->returnMode = SFRM_Materialize; @@ -515,3 +536,241 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { return pg_logical_slot_get_changes_guts(fcinfo, false, true); } + +Datum +pg_logical_slot_stream_relation(PG_FUNCTION_ARGS) +{ + Name name; + Name relnamespace; + Name relname; + bool nochildren; + ArrayType *arr; + List *options = NIL; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + + LogicalDecodingContext *ctx; + DecodingOutputState *p; + ReorderBufferTXN *txn; + ReorderBufferChange *change; + + FuncCallContext *funcctx; + MemoryContext oldcontext; + const char *relident; + int ret; + SPIPlanPtr plan; + StringInfoData query; + Oid nspoid; + + HeapTuple tuple; + bool isnull; + Datum result; + + oldcontext = CurrentMemoryContext; + + if (SRF_IS_FIRSTCALL()) + { + check_permissions(); + + CheckLogicalDecodingRequirements(); + + if (PG_ARGISNULL(0)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("slot name must not be null"))); + name = PG_GETARG_NAME(0); + + if (PG_ARGISNULL(1)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("relnamespace cannot be null"))); + relnamespace = PG_GETARG_NAME(1); + + if (PG_ARGISNULL(2)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("relname cannot be null"))); + relname = PG_GETARG_NAME(2); + + if (PG_ARGISNULL(3)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("nochildren must not be null"))); + nochildren = PG_GETARG_BOOL(3); + + if (PG_ARGISNULL(4)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("options array must not be null"))); + arr = PG_GETARG_ARRAYTYPE_P(4); + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + /* + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + */ + + MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + + options = deconstruct_options_array(arr); + + funcctx = SRF_FIRSTCALL_INIT(); + + /* Things allocated in this memory context will live until SRF_RETURN_DONE(). */ + MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + ReplicationSlotAcquire(NameStr(*name)); + + ctx = CreateDecodingContext(InvalidXLogRecPtr, + options, + NULL /*logical_read_local_xlog_page*/, + LogicalOutputPrepareWrite, + LogicalOutputWrite); + funcctx->user_fctx = ctx; + + /* + * Check whether the output plugin writes textual output if that's + * what we need. + */ + if (/*!binary &&*/ ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data", + NameStr(MyReplicationSlot->data.plugin), + format_procedure(fcinfo->flinfo->fn_oid)))); + + p = palloc0(sizeof(DecodingOutputState)); + p->binary_output = false /*binary*/; + + /* Build a tuple descriptor for our result type */ + if (get_func_result_type(3784 /* pg_logical_slot_peek_changes */, + NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + p->tupstore = tuplestore_begin_heap(true, false, work_mem); + + /* and we need a slot to fetch the tuples back */ + p->tupslot = MakeSingleTupleTableSlot(p->tupdesc); + + ctx->output_writer_private = p; + + /* Build a simple "SELECT ... FROM [ONLY] schema.table" query. */ + initStringInfo(&query); + appendStringInfoString(&query, "SELECT * FROM "); + + /* Exclude data from children tables? */ + if (nochildren) + appendStringInfoString(&query, " ONLY"); + + relident = quote_qualified_identifier(NameStr(*relnamespace), NameStr(*relname)); + appendStringInfoString(&query, relident); + + nspoid = get_namespace_oid(NameStr(*relnamespace), false); + p->rel = RelationIdGetRelation(get_relname_relid(NameStr(*relname), nspoid)); + + /* Initialize SPI (this switches to its own memory context). */ + if ((ret = SPI_connect()) < 0) + elog(ERROR, "SPI_connect returned %d", ret); + + plan = SPI_prepare_cursor(query.data, 0, NULL, CURSOR_OPT_NO_SCROLL); + if (!plan) + elog(ERROR, "SPI_prepare_cursor failed with error %d", SPI_result); + + p->cursor = SPI_cursor_open(NULL, plan, NULL, NULL, true); + + /* XXX: emit BEGIN? */ + } + + funcctx = SRF_PERCALL_SETUP(); + + MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + + ctx = (LogicalDecodingContext *) funcctx->user_fctx; + p = (DecodingOutputState *) ctx->output_writer_private; + + /* + * A single call to logical decoding plugin callback might emit a number + * of writes, so if we have some stuff in tupstore from previous call we + * give it away now. + */ + if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false, + p->tupslot)) + { + tuple = ExecCopySlotTuple(p->tupslot); + result = fastgetattr(tuple, 3, p->tupdesc, &isnull); + + /* XXX: Assert(!isnull) ? */ + if (isnull) + SRF_RETURN_NEXT_NULL(funcctx); + else + SRF_RETURN_NEXT(funcctx, result); + } + tuplestore_clear(p->tupstore); + + SPI_cursor_fetch(p->cursor, true, 1); + if (SPI_processed == 0) + { + /* We're done, release the slot and other resources. */ + /* XXX: emit COMMIT? */ + RelationClose(p->rel); + ExecDropSingleTupleTableSlot(p->tupslot); + + SPI_cursor_close(p->cursor); + SPI_freetuptable(SPI_tuptable); + SPI_finish(); + + FreeDecodingContext(ctx); + ReplicationSlotRelease(); + + MemoryContextSwitchTo(oldcontext); + + SRF_RETURN_DONE(funcctx); + } + if (SPI_processed != 1) + elog(ERROR, "expected exactly 1 row from cursor, but got %d rows", SPI_processed); + + /* SPI_cursor_fetch() leaves us in the SPI memory context, switch back: */ + /* XXX: do we need our own context here? */ + MemoryContextSwitchTo(ctx->context); + + /* emit INSERT */ + txn = ReorderBufferGetTXN(ctx->reorder); + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_INSERT; + + change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + memset(change->data.tp.newtuple, 0, sizeof(ReorderBufferTupleBuf)); + memcpy(&change->data.tp.newtuple->tuple, SPI_tuptable->vals[0], sizeof(HeapTupleData)); + + ctx->reorder->apply_change(ctx->reorder, txn, p->rel, change); + + ReorderBufferReturnChange(ctx->reorder, change); + ReorderBufferReturnTXN(ctx->reorder, txn); + + MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + + /* fetch a tuple from the store */ + if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false, + p->tupslot)) + { + tuple = ExecCopySlotTuple(p->tupslot); + result = fastgetattr(tuple, 3, p->tupdesc, &isnull); + } + + /* don't forget to clear the SPI temp context */ + SPI_freetuptable(SPI_tuptable); + + MemoryContextSwitchTo(oldcontext); + + /* XXX: Assert(!isnull) ? */ + if (isnull) + SRF_RETURN_NEXT_NULL(funcctx); + else + SRF_RETURN_NEXT(funcctx, result); +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 78acced..df1fa12 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -165,8 +165,6 @@ static const Size max_cached_transactions = 512; * primary reorderbuffer support routines * --------------------------------------- */ -static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb); -static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top); @@ -288,7 +286,7 @@ ReorderBufferFree(ReorderBuffer *rb) /* * Get an unused, possibly preallocated, ReorderBufferTXN. */ -static ReorderBufferTXN * +ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb) { ReorderBufferTXN *txn; @@ -322,7 +320,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb) * Deallocation might be delayed for efficiency purposes, for details check * the comments above max_cached_changes's definition. */ -static void +void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { /* clean the lookup cache if we were cached (quite likely) */ diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index f58672e..a861153 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5221,6 +5221,8 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 DESCR("peek at changes from replication slot"); DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); DESCR("peek at binary changes from replication slot"); +DATA(insert OID = 3997 ( pg_logical_slot_stream_relation PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 5 0 25 "19 19 19 16 1009" "{19,19,19,16,1009,25}" "{i,i,i,i,v,o}" "{slot_name,relnamespace,relname,nochildren,options,data}" _null_ _null_ pg_logical_slot_stream_relation _null_ _null_ _null_ )); +DESCR("stream relation as a series of changes using the replication slot plugin"); /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ )); diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index c87a1df..df60bfe 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -20,5 +20,6 @@ extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_slot_stream_relation(PG_FUNCTION_ARGS); #endif diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 2abee0a..e321e43 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -341,6 +341,9 @@ struct ReorderBuffer ReorderBuffer *ReorderBufferAllocate(void); void ReorderBufferFree(ReorderBuffer *); +ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb); +void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); + ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); -- 2.5.0
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers