On Fri, Jan 15, 2016 at 12:09 PM, Shulgin, Oleksandr <
oleksandr.shul...@zalando.de> wrote:

> On Fri, Jan 15, 2016 at 11:08 AM, Simon Riggs <si...@2ndquadrant.com>
> wrote:
>
>> On 15 January 2016 at 08:30, Shulgin, Oleksandr <
>> oleksandr.shul...@zalando.de> wrote:
>>
>>
>>> I'd like to propose generic functions (probably in an extension, or in
>>> core if not possible otherwise) to facilitate streaming existing data from
>>> the database *in the same format* that one would get if these were
>>> changes decoded by a logical decoding plugin.
>>>
>>> The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT
>>> command of the replication protocol to get a consistent snapshot of the
>>> database, then start listening to new changes on the slot.
>>>
>>
>> It sounds like this is already possible.
>>
>
> Totally, that's how it was supposed to be used anyway.  What is missing
> IMO is retrieving the initial snapshot in the same format in which the
> later changes will arrive.
>

POC patch attached.  Findings:

1) Needs an actual slot for all the decoding machinery to work (code depends
on MyReplicationSlot being set).
2) Requires a core patch.
3) Currently only supports textual output; adding binary is trivial.


Acquiring a slot means this cannot be run in parallel from multiple
backends.  Any ideas on how to overcome this (except for opening multiple
slots with the same LSN)?
To obtain a consistent snapshot, the client still needs to take care of
preserving and setting the transaction snapshot properly.

--
Alex
From f800b8c387eb17f4eb005b38b78585f1f165b0d3 Mon Sep 17 00:00:00 2001
From: Oleksandr Shulgin <oleksandr.shul...@zalando.de>
Date: Fri, 15 Jan 2016 17:30:04 +0100
Subject: [PATCH] POC: pg_logical_slot_stream_relation

---
 src/backend/catalog/system_views.sql            |   9 +
 src/backend/replication/logical/logicalfuncs.c  | 337 +++++++++++++++++++++---
 src/backend/replication/logical/reorderbuffer.c |   6 +-
 src/include/catalog/pg_proc.h                   |   2 +
 src/include/replication/logicalfuncs.h          |   1 +
 src/include/replication/reorderbuffer.h         |   3 +
 6 files changed, 315 insertions(+), 43 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 923fe58..5431b61 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -941,6 +941,15 @@ LANGUAGE INTERNAL
 VOLATILE ROWS 1000 COST 1000
 AS 'pg_logical_slot_peek_binary_changes';
 
+CREATE OR REPLACE FUNCTION pg_logical_slot_stream_relation( -- stream a relation's rows through a slot's output plugin
+    IN slot_name name, IN relnamespace name, IN relname name, IN nochildren bool DEFAULT FALSE,
+    VARIADIC options text[] DEFAULT '{}', -- alternating output plugin option names and values
+    OUT data text) -- textual output written by the output plugin
+RETURNS SETOF TEXT
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000
+AS 'pg_logical_slot_stream_relation';
+
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
     OUT slot_name name, OUT xlog_position pg_lsn)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 56e47e4..bc62784 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -21,12 +21,18 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 
+#include "access/htup_details.h"
 #include "access/xlog_internal.h"
 
+#include "executor/spi.h"
+
+#include "catalog/namespace.h"
 #include "catalog/pg_type.h"
 
 #include "nodes/makefuncs.h"
 
+#include "lib/stringinfo.h"
+
 #include "mb/pg_wchar.h"
 
 #include "utils/array.h"
@@ -40,6 +46,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
+#include "replication/reorderbuffer.h"
 
 #include "storage/fd.h"
 
@@ -50,6 +57,11 @@ typedef struct DecodingOutputState
 	TupleDesc	tupdesc;
 	bool		binary_output;
 	int64		returned_rows;
+
+	/* for pg_logical_slot_stream_relation */
+	Relation		rel;
+	Portal			cursor;
+	TupleTableSlot *tupslot;
 } DecodingOutputState;
 
 /*
@@ -270,6 +282,53 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	return count;
 }
 
+/*
+ * Turn a flat text[] of alternating option names and values into a List of
+ * DefElem nodes; multi-dimensional, NULL-containing or odd-length arrays
+ * raise an ERROR, and a zero-dimensional array yields NIL.
+ */
+static List *
+deconstruct_options_array(ArrayType *arr)
+{
+	List	   *result = NIL;
+	Datum	   *elems;
+	int			count;
+	int			pos;
+
+	if (ARR_NDIM(arr) > 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("array must be one-dimensional")));
+
+	if (array_contains_nulls(arr))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("array must not contain nulls")));
+
+	if (ARR_NDIM(arr) != 1)
+		return result;
+
+	Assert(ARR_ELEMTYPE(arr) == TEXTOID);
+
+	deconstruct_array(arr, TEXTOID, -1, false, 'i',
+					  &elems, NULL, &count);
+
+	if (count % 2 != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("array must have even number of elements")));
+
+	for (pos = 0; pos < count; pos += 2)
+	{
+		char	   *optname = TextDatumGetCString(elems[pos]);
+		char	   *optval = TextDatumGetCString(elems[pos + 1]);
+
+		result = lappend(result, makeDefElem(optname, (Node *) makeString(optval)));
+	}
+
+	return result;
+}
+
 /*
  * Helper function for the various SQL callable logical decoding functions.
  */
@@ -287,7 +346,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	ArrayType  *arr;
-	Size		ndim;
 	List	   *options = NIL;
 	DecodingOutputState *p;
 
@@ -339,44 +397,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
 
-	/* Deconstruct options array */
-	ndim = ARR_NDIM(arr);
-	if (ndim > 1)
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("array must be one-dimensional")));
-	}
-	else if (array_contains_nulls(arr))
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("array must not contain nulls")));
-	}
-	else if (ndim == 1)
-	{
-		int			nelems;
-		Datum	   *datum_opts;
-		int			i;
-
-		Assert(ARR_ELEMTYPE(arr) == TEXTOID);
-
-		deconstruct_array(arr, TEXTOID, -1, false, 'i',
-						  &datum_opts, NULL, &nelems);
-
-		if (nelems % 2 != 0)
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("array must have even number of elements")));
-
-		for (i = 0; i < nelems; i += 2)
-		{
-			char	   *name = TextDatumGetCString(datum_opts[i]);
-			char	   *opt = TextDatumGetCString(datum_opts[i + 1]);
-
-			options = lappend(options, makeDefElem(name, (Node *) makeString(opt)));
-		}
-	}
+	options = deconstruct_options_array(arr);
 
 	p->tupstore = tuplestore_begin_heap(true, false, work_mem);
 	rsinfo->returnMode = SFRM_Materialize;
@@ -515,3 +536,241 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
 {
 	return pg_logical_slot_get_changes_guts(fcinfo, false, true);
 }
+
+/*
+ * Value-per-call SRF: scans the given relation through an SPI cursor and
+ * passes each row to the slot's output plugin as an INSERT change, returning
+ * the plugin's textual output.  The replication slot is acquired and SPI is
+ * connected on the first call; both are released once the cursor is
+ * exhausted.
+ */
+Datum
+pg_logical_slot_stream_relation(PG_FUNCTION_ARGS)
+{
+	Name		name;
+	Name		relnamespace;
+	Name		relname;
+	bool		nochildren;
+	ArrayType  *arr;
+	List	   *options = NIL;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+	LogicalDecodingContext	   *ctx;
+	DecodingOutputState		   *p;
+	ReorderBufferTXN		   *txn;
+	ReorderBufferChange		   *change;
+
+	FuncCallContext	   *funcctx;
+	MemoryContext		oldcontext;
+	const char		   *relident;
+	int					ret;
+	SPIPlanPtr			plan;
+	StringInfoData		query;
+	Oid					nspoid;
+
+	HeapTuple			tuple;
+	bool				isnull = true;
+	Datum				result = (Datum) 0;
+
+	oldcontext = CurrentMemoryContext;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		check_permissions();
+
+		CheckLogicalDecodingRequirements();
+
+		if (PG_ARGISNULL(0))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("slot name must not be null")));
+		name = PG_GETARG_NAME(0);
+
+		if (PG_ARGISNULL(1))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("relnamespace cannot be null")));
+		relnamespace = PG_GETARG_NAME(1);
+
+		if (PG_ARGISNULL(2))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("relname cannot be null")));
+		relname = PG_GETARG_NAME(2);
+
+		if (PG_ARGISNULL(3))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("nochildren must not be null")));
+		nochildren = PG_GETARG_BOOL(3);
+
+		if (PG_ARGISNULL(4))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("options array must not be null")));
+		arr = PG_GETARG_ARRAYTYPE_P(4);
+
+		/* check to see if caller can accept a set result at all */
+		if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("set-valued function called in context that cannot accept a set")));
+
+		MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+		options = deconstruct_options_array(arr);
+
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* Things allocated in this memory context will live until SRF_RETURN_DONE(). */
+		MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		ReplicationSlotAcquire(NameStr(*name));
+
+		ctx = CreateDecodingContext(InvalidXLogRecPtr,
+									options,
+									NULL /*logical_read_local_xlog_page*/,
+									LogicalOutputPrepareWrite,
+									LogicalOutputWrite);
+		funcctx->user_fctx = ctx;
+
+		/* Only textual plugin output is supported so far. */
+		if (/*!binary &&*/ ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
+							NameStr(MyReplicationSlot->data.plugin),
+							format_procedure(fcinfo->flinfo->fn_oid))));
+
+		p = palloc0(sizeof(DecodingOutputState));
+		p->binary_output = false /*binary*/;
+
+		/* Build a tuple descriptor for our result type */
+		if (get_func_result_type(3784 /* pg_logical_slot_peek_changes */,
+								 NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
+			elog(ERROR, "return type must be a row type");
+
+		p->tupstore = tuplestore_begin_heap(true, false, work_mem);
+
+		/* and we need a slot to fetch the tuples back */
+		p->tupslot = MakeSingleTupleTableSlot(p->tupdesc);
+
+		ctx->output_writer_private = p;
+
+		/* Build a simple "SELECT ... FROM [ONLY] schema.table" query. */
+		initStringInfo(&query);
+		appendStringInfoString(&query, "SELECT * FROM ");
+
+		/* Exclude data from children tables? */
+		if (nochildren)
+			appendStringInfoString(&query, "ONLY ");
+
+		relident = quote_qualified_identifier(NameStr(*relnamespace), NameStr(*relname));
+		appendStringInfoString(&query, relident);
+
+		nspoid = get_namespace_oid(NameStr(*relnamespace), false);
+		p->rel = RelationIdGetRelation(get_relname_relid(NameStr(*relname), nspoid));
+
+		/* Initialize SPI (this switches to its own memory context). */
+		if ((ret = SPI_connect()) < 0)
+			elog(ERROR, "SPI_connect returned %d", ret);
+
+		plan = SPI_prepare_cursor(query.data, 0, NULL, CURSOR_OPT_NO_SCROLL);
+		if (!plan)
+			elog(ERROR, "SPI_prepare_cursor failed with error %d", SPI_result);
+
+		p->cursor = SPI_cursor_open(NULL, plan, NULL, NULL, true);
+
+		/* XXX: emit BEGIN? */
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+
+	MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+	ctx = (LogicalDecodingContext *) funcctx->user_fctx;
+	p = (DecodingOutputState *) ctx->output_writer_private;
+
+	/*
+	 * One plugin callback invocation may emit several writes; hand out
+	 * whatever is still left in tupstore from the previous call first.
+	 */
+	if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false,
+								p->tupslot))
+	{
+		tuple = ExecCopySlotTuple(p->tupslot);
+		result = fastgetattr(tuple, 3, p->tupdesc, &isnull);
+
+		/* Restore the caller's memory context, as the other exits do. */
+		MemoryContextSwitchTo(oldcontext);
+
+		/* XXX: Assert(!isnull) ? */
+		if (isnull)
+			SRF_RETURN_NEXT_NULL(funcctx);
+		else
+			SRF_RETURN_NEXT(funcctx, result);
+	}
+	tuplestore_clear(p->tupstore);
+
+	SPI_cursor_fetch(p->cursor, true, 1);
+	if (SPI_processed == 0)
+	{
+		/* We're done, release the slot and other resources. */
+		/* XXX: emit COMMIT? */
+		RelationClose(p->rel);
+		ExecDropSingleTupleTableSlot(p->tupslot);
+
+		SPI_cursor_close(p->cursor);
+		SPI_freetuptable(SPI_tuptable);
+		SPI_finish();
+
+		FreeDecodingContext(ctx);
+		ReplicationSlotRelease();
+
+		MemoryContextSwitchTo(oldcontext);
+
+		SRF_RETURN_DONE(funcctx);
+	}
+	if (SPI_processed != 1)
+		elog(ERROR, "expected exactly 1 row from cursor, but got %lu rows", (unsigned long) SPI_processed);
+
+	/* SPI_cursor_fetch() leaves us in the SPI memory context, switch back: */
+	/* XXX: do we need our own context here? */
+	MemoryContextSwitchTo(ctx->context);
+
+	/* emit INSERT; only the tuple header struct is copied, not the row data */
+	txn = ReorderBufferGetTXN(ctx->reorder);
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_INSERT;
+
+	change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+	memset(change->data.tp.newtuple, 0, sizeof(ReorderBufferTupleBuf));
+	memcpy(&change->data.tp.newtuple->tuple, SPI_tuptable->vals[0], sizeof(HeapTupleData));
+
+	ctx->reorder->apply_change(ctx->reorder, txn, p->rel, change);
+
+	ReorderBufferReturnChange(ctx->reorder, change);
+	ReorderBufferReturnTXN(ctx->reorder, txn);
+
+	MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+	/* fetch a tuple from the store; isnull stays true if the plugin wrote none */
+	if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false,
+								p->tupslot))
+	{
+		tuple = ExecCopySlotTuple(p->tupslot);
+		result = fastgetattr(tuple, 3, p->tupdesc, &isnull);
+	}
+
+	/* don't forget to clear the SPI temp context */
+	SPI_freetuptable(SPI_tuptable);
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/* XXX: Assert(!isnull) ? */
+	if (isnull)
+		SRF_RETURN_NEXT_NULL(funcctx);
+	else
+		SRF_RETURN_NEXT(funcctx, result);
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 78acced..df1fa12 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -165,8 +165,6 @@ static const Size max_cached_transactions = 512;
  * primary reorderbuffer support routines
  * ---------------------------------------
  */
-static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
-static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 					  TransactionId xid, bool create, bool *is_new,
 					  XLogRecPtr lsn, bool create_as_top);
@@ -288,7 +286,7 @@ ReorderBufferFree(ReorderBuffer *rb)
 /*
  * Get an unused, possibly preallocated, ReorderBufferTXN.
  */
-static ReorderBufferTXN *
+ReorderBufferTXN *
 ReorderBufferGetTXN(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
@@ -322,7 +320,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
  * Deallocation might be delayed for efficiency purposes, for details check
  * the comments above max_cached_changes's definition.
  */
-static void
+void
 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	/* clean the lookup cache if we were cached (quite likely) */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index f58672e..a861153 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5221,6 +5221,8 @@ DATA(insert OID = 3784 (  pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
 DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 (  pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
 DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3997 (  pg_logical_slot_stream_relation PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 5 0 25 "19 19 19 16 1009" "{19,19,19,16,1009,25}" "{i,i,i,i,v,o}" "{slot_name,relnamespace,relname,nochildren,options,data}" _null_ _null_ pg_logical_slot_stream_relation _null_ _null_ _null_ ));
+DESCR("stream relation as a series of changes using the replication slot plugin");
 
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects		PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index c87a1df..df60bfe 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -20,5 +20,6 @@ extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_slot_stream_relation(PG_FUNCTION_ARGS);
 
 #endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 2abee0a..e321e43 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -341,6 +341,9 @@ struct ReorderBuffer
 ReorderBuffer *ReorderBufferAllocate(void);
 void		ReorderBufferFree(ReorderBuffer *);
 
+ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
+void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+
 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
 void		ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
-- 
2.5.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to