From 9434805e5d357b48c43d0d91d33a0a58e42d13ce Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 26 Jun 2018 21:51:50 +0900
Subject: [PATCH v2] Copy logical replication slot.

---
 doc/src/sgml/func.sgml                      |  21 ++++
 src/backend/replication/logical/logical.c   |  13 ++-
 src/backend/replication/slotfuncs.c         | 152 +++++++++++++++++++++++-----
 src/backend/replication/walsender.c         |   1 +
 src/include/catalog/pg_proc.dat             |  21 ++++
 src/include/replication/logical.h           |   1 +
 src/test/recovery/t/006_logical_decoding.pl |  13 ++-
 7 files changed, 191 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 5dce8ef..253f0e3 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19252,6 +19252,27 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
       <row>
        <entry>
         <indexterm>
+         <primary>pg_copy_logical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_logical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <optional>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type></optional></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copy a existing <parameter>src_slot_name</parameter> logical (decoding) slot
+        to <parameter>dst_slot_name</parameter> while changing the output plugin
+        and persistence. Copied logical slot starts from the same LSN as the
+        source logical slot. Both <parameter>plugin</parameter> and
+        <parameter>temporary</parameter> are optional. If <parameter>plugin</parameter>
+        or <parameter>temporary</parameter> are omitted, the same values of
+        the source logical slot are set.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
          <primary>pg_logical_slot_get_changes</primary>
         </indexterm>
         <literal><function>pg_logical_slot_get_changes(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61588d6..7e7d54a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -223,6 +223,7 @@ LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
+						  XLogRecPtr start_lsn,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
@@ -266,7 +267,15 @@ CreateInitDecodingContext(char *plugin,
 	StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
 	SpinLockRelease(&slot->mutex);
 
-	ReplicationSlotReserveWal();
+	/* Find start location to read WAL if not specified */
+	if (XLogRecPtrIsInvalid(start_lsn))
+		ReplicationSlotReserveWal();
+	else
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = start_lsn;
+		SpinLockRelease(&slot->mutex);
+	}
 
 	/* ----
 	 * This is a bit tricky: We need to determine a safe xmin horizon to start
@@ -311,7 +320,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
-	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
+	ctx = StartupDecodingContext(NIL, start_lsn, xmin_horizon,
 								 need_full_snapshot, true,
 								 read_page, prepare_write, do_write,
 								 update_progress);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2806e10..6872588 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -92,30 +92,19 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	PG_RETURN_DATUM(result);
 }
 
-
 /*
- * SQL function for creating a new logical replication slot.
+ * Helper function for creating a new logical replication slot with
+ * given arguments. Return a confirmed_lsn of new replication slot.
  */
-Datum
-pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+static XLogRecPtr
+create_logical_replication_slot(char *name, char *plugin,
+								bool temporary, XLogRecPtr start_lsn)
 {
-	Name		name = PG_GETARG_NAME(0);
-	Name		plugin = PG_GETARG_NAME(1);
-	bool		temporary = PG_GETARG_BOOL(2);
-
 	LogicalDecodingContext *ctx = NULL;
-
-	TupleDesc	tupdesc;
-	HeapTuple	tuple;
-	Datum		result;
-	Datum		values[2];
-	bool		nulls[2];
+	XLogRecPtr	result;
 
 	Assert(!MyReplicationSlot);
 
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
 	check_permissions();
 
 	CheckLogicalDecodingRequirements();
@@ -128,39 +117,146 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * slots can be created as temporary from beginning as they get dropped on
 	 * error as well.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true,
+	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
 	 */
-	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+	ctx = CreateInitDecodingContext(plugin, NIL,
 									false,	/* do not build snapshot */
+									start_lsn,
 									logical_read_local_xlog_page, NULL, NULL,
 									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
 
-	values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
-	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
-
 	/* don't need the decoding context anymore */
 	FreeDecodingContext(ctx);
 
-	memset(nulls, 0, sizeof(nulls));
-
-	tuple = heap_form_tuple(tupdesc, values, nulls);
-	result = HeapTupleGetDatum(tuple);
-
 	/* ok, slot is now fully created, mark it as persistent if needed */
 	if (!temporary)
 		ReplicationSlotPersist();
+
+	result = MyReplicationSlot->data.confirmed_flush;
+
 	ReplicationSlotRelease();
 
-	PG_RETURN_DATUM(result);
+	return result;
 }
 
+/*
+ * SQL function for creating a new logical replication slot.
+ */
+Datum
+pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		name = PG_GETARG_NAME(0);
+	Name		plugin = PG_GETARG_NAME(1);
+	bool		temporary = PG_GETARG_BOOL(2);
+	XLogRecPtr	confirmed_flush;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[2];
+	bool		nulls[2];
+
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	confirmed_flush = create_logical_replication_slot(NameStr(*name),
+													  NameStr(*plugin),
+													  temporary,
+													  InvalidXLogRecPtr);
+
+	memset(nulls, 0, sizeof(nulls));
+
+	values[0] = CStringGetTextDatum(NameStr(*name));
+	values[1] = LSNGetDatum(confirmed_flush);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_POINTER(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * Copy logical replication slot (2 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_logical_replication_slot_no_plugin_temp(PG_FUNCTION_ARGS)
+{
+	return pg_copy_logical_replication_slot(fcinfo);
+}
+
+/*
+ * Copy logical replication slot (3 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_logical_replication_slot_no_plugin(PG_FUNCTION_ARGS)
+{
+	return pg_copy_logical_replication_slot(fcinfo);
+}
+
+/*
+ * SQL function for copying a logical replication slot.
+ */
+Datum
+pg_copy_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		src_name = PG_GETARG_NAME(0);
+	Name		dst_name = PG_GETARG_NAME(1);
+	char		*plugin;	/* optional argment */
+	bool		temporary;	/* optional argment */
+	XLogRecPtr	confirmed_flush;
+	XLogRecPtr	start_lsn;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[2];
+	bool		nulls[2];
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* Acquire the source slot so we own it */
+	ReplicationSlotAcquire(NameStr(*src_name), true);
+
+	/* Save some fields before releasing */
+	start_lsn = MyReplicationSlot->data.restart_lsn;
+	plugin = pstrdup(NameStr(MyReplicationSlot->data.plugin));
+	temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY);
+
+	/* Release it */
+	ReplicationSlotRelease();
+
+	/* Check the optional arguments */
+	if (PG_NARGS() >= 3)
+		plugin = NameStr(*(PG_GETARG_NAME(2)));
+	if (PG_NARGS() >= 4)
+		temporary = PG_GETARG_BOOL(3);
+
+	confirmed_flush = create_logical_replication_slot(NameStr(*dst_name),
+													  plugin,
+													  temporary,
+													  start_lsn);
+
+	memset(nulls, 0, sizeof(nulls));
+
+	values[0] = CStringGetTextDatum(NameStr(*dst_name));
+	values[1] = LSNGetDatum(confirmed_flush);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_POINTER(HeapTupleGetDatum(tuple));
+}
 
 /*
  * SQL function for dropping a replication slot.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca..c49f2d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -918,6 +918,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		}
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+										InvalidXLogRecPtr,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 40d54ed..862b89d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9807,6 +9807,27 @@
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
+{ oid => '4005', descr => 'copy up a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name name bool',
+  proallargtypes => '{name,name,name,bool,text,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,plugin,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot' },
+{ oid => '4006', descr => 'copy up a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name name',
+  proallargtypes => '{name,name,name,text,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,plugin,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_no_plugin' },
+{ oid => '4007', descr => 'copy up a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,text,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_no_plugin_temp' },
 { oid => '3782', descr => 'get changes from replication slot',
   proname => 'pg_logical_slot_get_changes', procost => '1000',
   prorows => '1000', provariadic => 'text', proisstrict => 'f',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c25ac1f..d1643e5 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void);
 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
+						  XLogRecPtr start_lsn,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index e3a5fe9..76e91f5 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 17;
 use Config;
 
 # Initialize master node
@@ -27,6 +27,11 @@ $node_master->safe_psql('postgres',
 	qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]
 );
 
+# Copy logical slot
+$node_master->safe_psql('postgres',
+	qq[SELECT pg_copy_logical_replication_slot('test_slot', 'copy_slot', 'test_decoding', false);]
+);
+
 $node_master->safe_psql('postgres',
 	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
 );
@@ -37,6 +42,12 @@ my ($result) = $node_master->safe_psql('postgres',
 is(scalar(my @foobar = split /^/m, $result),
 	12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
 
+# Basic decoding works with copied slot
+$result = $node_master->safe_psql('postgres',
+	qq[SELECT pg_logical_slot_get_changes('copy_slot', NULL, NULL);]);
+is(scalar(@foobar = split /^/m, $result),
+	12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
+
 # If we immediately crash the server we might lose the progress we just made
 # and replay the same changes again. But a clean shutdown should never repeat
 # the same changes when we use the SQL decoding interface.
-- 
2.10.5

