I wrote:

> It looks good to me now.

After lying for some time in my head it reminded me that
CreateInitDecodingContext not only pegs the LSN, but also xmin, so
attached makes a minor comment correction.

While taking a look at the nearby code it seemed weird to me that
GetOldestSafeDecodingTransactionId checks PGXACT->xid, not xmin. Don't
want to investigate this at the moment though, and not for this thread.

Also not for this thread, but I've noticed
pg_copy_logical_replication_slot doesn't allow to change plugin name
which is an omission in my view. It would be useful and trivial to do.


-- cheers, arseny

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..da634bef0e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-								bool temporary, XLogRecPtr restart_lsn)
+								bool temporary, XLogRecPtr restart_lsn,
+								bool find_startpoint)
 {
 	LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
 	/*
-	 * Create logical decoding context, to build the initial snapshot.
+	 * Create logical decoding context to find start point or, if we don't
+	 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
 	 */
 	ctx = CreateInitDecodingContext(plugin, NIL,
-									false,	/* do not build snapshot */
+									false,	/* do not build data snapshot */
 									restart_lsn,
 									logical_read_local_xlog_page, NULL, NULL,
 									NULL);
 
 	/* build initial snapshot, might take a while */
-	DecodingContextFindStartpoint(ctx);
+	if (find_startpoint)
+		DecodingContextFindStartpoint(ctx);
 
 	/* don't need the decoding context anymore */
 	FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	create_logical_replication_slot(NameStr(*name),
 									NameStr(*plugin),
 									temporary,
-									InvalidXLogRecPtr);
+									InvalidXLogRecPtr,
+									true);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
 	/* Create new slot and acquire it */
 	if (logical_slot)
+	{
+		/*
+		 * WAL required for building snapshot could be removed as we haven't
+		 * reserved WAL yet. So we create a new logical replication slot
+		 * without building an initial snapshot.  A reasonable start point for
+		 * decoding will be provided by the source slot.
+		 */
 		create_logical_replication_slot(NameStr(*dst_name),
 										plugin,
 										temporary,
-										src_restart_lsn);
+										src_restart_lsn,
+										false);
+	}
 	else
 		create_physical_replication_slot(NameStr(*dst_name),
 										 true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		TransactionId copy_xmin;
 		TransactionId copy_catalog_xmin;
 		XLogRecPtr	copy_restart_lsn;
+		XLogRecPtr	copy_confirmed_flush;
 		bool		copy_islogical;
 		char	   *copy_name;
 
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		copy_xmin = src->data.xmin;
 		copy_catalog_xmin = src->data.catalog_xmin;
 		copy_restart_lsn = src->data.restart_lsn;
+		copy_confirmed_flush = src->data.confirmed_flush;
 
 		/* for existence check */
 		copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 							NameStr(*src_name)),
 					 errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+		/* The source slot must have a consistent snapshot */
+		if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+					 errhint("Retry when the source replication slot creation is finished.")));
+
 		/* Install copied values again */
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		MyReplicationSlot->data.xmin = copy_xmin;
 		MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
 		MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+		MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
 		ReplicationSlotMarkDirty();

Reply via email to