From ed0512f321bc98dc16a2bc2cc84a919325060685 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 21 Apr 2023 17:14:04 +0900
Subject: [PATCH] Poc: initial table structure synchronization in logical
 replication.

The key idea is to postpone creation of pg_subscription_rel entry to
the point where the tablesync worker synchronizes and creates the
table on the subscriber.

It is a PoC patch so mixed the following changes:

- Add oid column to the pg_subscription_rel.
  - use it as the primary key.
  - use it in the names of origin and slot the tablesync workers use.
- Add copy_schema = on/off option to CREATE SUBSCRIPTION.
  - not yet support for ALTER SUBSCRIPTION.
- Add CRS_EXPORT_USE_SNAPSHOT new action.
  - to use the same snapshot by walsender AND other processes (e.g. pg_dump).
  - the snapshot is exported for pg_dump and is used for COPY.
---
 src/backend/catalog/heap.c                    |   2 +-
 src/backend/catalog/pg_subscription.c         | 270 +++++++++++++++---
 src/backend/catalog/system_views.sql          |   2 +-
 src/backend/commands/subscriptioncmds.c       | 107 +++++--
 .../libpqwalreceiver/libpqwalreceiver.c       |   6 +
 .../replication/logical/applyparallelworker.c |   2 +-
 src/backend/replication/logical/launcher.c    |  34 +--
 src/backend/replication/logical/relation.c    |   6 +-
 src/backend/replication/logical/snapbuild.c   |  25 +-
 src/backend/replication/logical/tablesync.c   | 246 ++++++++++++----
 src/backend/replication/logical/worker.c      |  18 +-
 src/backend/replication/walsender.c           |  11 +-
 src/backend/utils/cache/syscache.c            |   7 +-
 src/include/catalog/pg_proc.dat               |   2 +-
 src/include/catalog/pg_subscription_rel.h     |  39 ++-
 src/include/replication/snapbuild.h           |   2 +-
 src/include/replication/walsender.h           |   3 +-
 src/include/replication/worker_internal.h     |  10 +-
 src/include/utils/syscache.h                  |   2 +-
 19 files changed, 629 insertions(+), 165 deletions(-)

diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 2a0d82aedd..32414c30cf 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1865,7 +1865,7 @@ heap_drop_with_catalog(Oid relid)
 	/*
 	 * Remove any associated relation synchronization states.
 	 */
-	RemoveSubscriptionRel(InvalidOid, relid);
+	RemoveSubscriptionRel(InvalidOid, relid, InvalidOid);
 
 	/*
 	 * Forget any ON COMMIT action for the rel
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..b52168ce9b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -35,6 +35,7 @@
 #include "utils/syscache.h"
 
 static List *textarray_to_stringlist(ArrayType *textarray);
+static SubscriptionRelState *deconstruct_subrelstate(HeapTuple tup);
 
 /*
  * Fetch the subscription from the syscache.
@@ -227,8 +228,8 @@ textarray_to_stringlist(ArrayType *textarray)
  * Add new state record for a subscription table.
  */
 void
-AddSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn)
+AddSubscriptionRelState(Oid subid, Oid relid, char state, bool syncschema, bool syncdata,
+						XLogRecPtr sublsn, char *nspname, char *relname)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -239,25 +240,40 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
 
-	/* Try finding existing mapping. */
-	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
-							  ObjectIdGetDatum(relid),
-							  ObjectIdGetDatum(subid));
-	if (HeapTupleIsValid(tup))
-		elog(ERROR, "subscription table %u in subscription %u already exists",
-			 relid, subid);
+	/* XXX: existence check */
 
 	/* Form the tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_oid - 1] =
+		GetNewOidWithIndex(rel, SubscriptionRelObjectIdIndexId,
+						   Anum_pg_subscription_rel_oid);
 	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+
+	if (OidIsValid(relid))
+		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	else
+		nulls[Anum_pg_subscription_rel_srrelid - 1] = true;
+
 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
 	if (sublsn != InvalidXLogRecPtr)
 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
 	else
 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
+	if (nspname)
+		values[Anum_pg_subscription_rel_srnspname - 1] = CStringGetDatum(nspname);
+	else
+		nulls[Anum_pg_subscription_rel_srnspname - 1] = true;
+
+	if (relname)
+		values[Anum_pg_subscription_rel_srrelname - 1] = CStringGetDatum(relname);
+	else
+		nulls[Anum_pg_subscription_rel_srrelname - 1] = true;
+
+	values[Anum_pg_subscription_rel_srsyncschema - 1] = BoolGetDatum(syncschema);
+	values[Anum_pg_subscription_rel_srsyncdata - 1] = BoolGetDatum(syncdata);
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -269,11 +285,49 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/* Update srrelid to the given relid */
+void
+UpdateSubscriptionRelRelid(Oid subid, Oid subrelid, Oid relid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid));
+
+	/* XXX: need to distinguish from message in UpdateSubscriptionRelState() */
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 subrelid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srrelid - 1] = true;
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	table_close(rel, NoLock);
+}
+
 /*
  * Update the state of a subscription table.
  */
 void
-UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+UpdateSubscriptionRelState(Oid subid, Oid subrelid, char state,
 						   XLogRecPtr sublsn)
 {
 	Relation	rel;
@@ -287,12 +341,10 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	/* Try finding existing mapping. */
-	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
-							  ObjectIdGetDatum(relid),
-							  ObjectIdGetDatum(subid));
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid));
 	if (!HeapTupleIsValid(tup))
 		elog(ERROR, "subscription table %u in subscription %u does not exist",
-			 relid, subid);
+			 subrelid, subid);
 
 	/* Update the tuple. */
 	memset(values, 0, sizeof(values));
@@ -318,13 +370,76 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/*
+ * Similar to GetSubscriptionRelState, but search the entry by the relid.
+ *
+ * XXX: Note that we cannot use syscache for searching the pg_subscription_rel entry
+ * by (srsubid, srrelid) because cache key columns should always be not  NULL (see
+ * CatalogCacheInitializeCache() for details).
+ *
+ * XXX: duplicated with GetSubscriptionRelState().
+ */
+char
+GetSubscriptoinRelStateByRelid(Oid subid, Oid relid, XLogRecPtr *sublsn)
+{
+	Relation rel;
+	ScanKeyData skey[2];
+	SysScanDesc scan;
+	HeapTuple tup;
+	char substate;
+	Datum d;
+	bool isnull;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[0],
+				Anum_pg_subscription_rel_srrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+	ScanKeyInit(&skey[1],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	scan = systable_beginscan(rel, SubscriptionRelSrrelidSrsubidIndexId, true,
+							  NULL, 2, skey);
+
+	tup = systable_getnext(scan);
+
+
+	if (!HeapTupleIsValid(tup))
+	{
+		systable_endscan(scan);
+		table_close(rel, AccessShareLock);
+		*sublsn = InvalidXLogRecPtr;
+		return SUBREL_STATE_UNKNOWN;
+	}
+
+	/* Get the state. */
+	substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
+
+	/* Get the LSN */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srsublsn, &isnull);
+	if (isnull)
+		*sublsn = InvalidXLogRecPtr;
+	else
+		*sublsn = DatumGetLSN(d);
+
+	/* Cleanup */
+	systable_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	return substate;
+}
+
 /*
  * Get state of subscription table.
  *
  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
  */
 char
-GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
+GetSubscriptionRelState(Oid subid, Oid subrelid, XLogRecPtr *sublsn)
 {
 	HeapTuple	tup;
 	char		substate;
@@ -339,9 +454,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
 	/* Try finding the mapping. */
-	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
-						  ObjectIdGetDatum(relid),
-						  ObjectIdGetDatum(subid));
+	tup = SearchSysCache1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid));
 
 	if (!HeapTupleIsValid(tup))
 	{
@@ -354,7 +467,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 	substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
 
 	/* Get the LSN */
-	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
 						Anum_pg_subscription_rel_srsublsn, &isnull);
 	if (isnull)
 		*sublsn = InvalidXLogRecPtr;
@@ -369,16 +482,105 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 	return substate;
 }
 
+/* Get palloc'ed SubscriptionRelState of the given subrelid */
+SubscriptionRelState *
+GetSubscriptionRelByOid(Oid subrelid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	SubscriptionRelState *relstate;
+
+	/*
+	 * This is to avoid the race condition with AlterSubscription which tries
+	 * to remove this relstate.
+	 */
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	tup = SearchSysCache1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid));
+
+	if (!HeapTupleIsValid(tup))
+	{
+		table_close(rel, AccessShareLock);
+		return NULL;
+	}
+
+	relstate = deconstruct_subrelstate(tup);
+
+	/* Cleanup */
+	ReleaseSysCache(tup);
+	table_close(rel, AccessShareLock);
+
+	return relstate;
+}
+
+/*
+ * Extract subscription relation state information from the heap tuple and return palloc'ed
+ * SubscriptionRelState.
+ */
+static SubscriptionRelState *
+deconstruct_subrelstate(HeapTuple tup)
+{
+	Form_pg_subscription_rel subrel_form;
+	SubscriptionRelState *relstate;
+	Datum d;
+	bool isnull;
+
+	subrel_form = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+	relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
+	relstate->oid = subrel_form->oid;
+
+	/* Get the LSN */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srrelid, &isnull);
+	if (isnull)
+		relstate->relid = InvalidOid;
+	else
+		relstate->relid = DatumGetObjectId(d);
+
+	/* Get the LSN */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srsublsn, &isnull);
+	if (isnull)
+		relstate->lsn = InvalidXLogRecPtr;
+	else
+		relstate->lsn = DatumGetLSN(d);
+
+	relstate->state = subrel_form->srsubstate;
+
+	/* srnspname */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srnspname, &isnull);
+	if (isnull)
+		relstate->nspname = NULL;
+	else
+		relstate->nspname = pstrdup(NameStr(*DatumGetName(d)));
+
+	/* srrelname */
+	d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup,
+						Anum_pg_subscription_rel_srrelname, &isnull);
+	if (isnull)
+		relstate->relname = NULL;
+	else
+		relstate->relname = pstrdup(NameStr(*DatumGetName(d)));
+
+	/* syncflags */
+	relstate->syncflags =
+		(((subrel_form->srsyncschema) ? SUBREL_SYNC_KIND_SCHEMA : 0) |
+		 ((subrel_form->srsyncdata) ? SUBREL_SYNC_KIND_DATA : 0));
+
+	return relstate;
+}
 /*
  * Drop subscription relation mapping. These can be for a particular
  * subscription, or for a particular relation, or both.
  */
 void
-RemoveSubscriptionRel(Oid subid, Oid relid)
+RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid)
 {
 	Relation	rel;
 	TableScanDesc scan;
-	ScanKeyData skey[2];
+	ScanKeyData skey[3];
 	HeapTuple	tup;
 	int			nkeys = 0;
 
@@ -402,6 +604,15 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 					ObjectIdGetDatum(relid));
 	}
 
+	if (OidIsValid(subrelid))
+	{
+		ScanKeyInit(&skey[nkeys++],
+					Anum_pg_subscription_rel_oid,
+					BTEqualStrategyNumber,
+					F_OIDEQ,
+					ObjectIdGetDatum(subrelid));
+	}
+
 	/* Do the search and delete what we found. */
 	scan = table_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
@@ -511,22 +722,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 
 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
 	{
-		Form_pg_subscription_rel subrel;
 		SubscriptionRelState *relstate;
-		Datum		d;
-		bool		isnull;
-
-		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
 
-		relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
-							Anum_pg_subscription_rel_srsublsn, &isnull);
-		if (isnull)
-			relstate->lsn = InvalidXLogRecPtr;
-		else
-			relstate->lsn = DatumGetLSN(d);
+		relstate = deconstruct_subrelstate(tup);
 
 		res = lappend(res, relstate);
 	}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 48aacf66ee..de9988e72e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -951,7 +951,7 @@ CREATE VIEW pg_stat_subscription AS
             su.subname,
             st.pid,
             st.leader_pid,
-            st.relid,
+            st.subrelid,
             st.received_lsn,
             st.last_msg_send_time,
             st.last_msg_receipt_time,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 56eafbff10..a08412fb11 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -71,6 +71,7 @@
 #define SUBOPT_RUN_AS_OWNER			0x00001000
 #define SUBOPT_LSN					0x00002000
 #define SUBOPT_ORIGIN				0x00004000
+#define SUBOPT_COPY_SCHEMA			0x00008000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -88,6 +89,8 @@ typedef struct SubOpts
 	bool		enabled;
 	bool		create_slot;
 	bool		copy_data;
+	/* XXX: want to choose synchronizing only tables or all objects? */
+	bool		copy_schema;
 	bool		refresh;
 	bool		binary;
 	char		streaming;
@@ -141,6 +144,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->create_slot = true;
 	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
 		opts->copy_data = true;
+	if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA))
+		opts->copy_data = true;
 	if (IsSet(supported_opts, SUBOPT_REFRESH))
 		opts->refresh = true;
 	if (IsSet(supported_opts, SUBOPT_BINARY))
@@ -214,6 +219,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_COPY_DATA;
 			opts->copy_data = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA) &&
+				 strcmp(defel->defname, "copy_schema") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_COPY_SCHEMA;
+			opts->copy_schema = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
 				 strcmp(defel->defname, "synchronous_commit") == 0)
 		{
@@ -388,10 +402,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "copy_data = true")));
 
+		if (opts->copy_schema &&
+			IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "copy_schema = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
 		opts->copy_data = false;
+		opts->copy_schema = false;
 	}
 
 	/*
@@ -587,7 +609,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	 * Connection and publication should not be specified here.
 	 */
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
-					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
+					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
@@ -752,7 +774,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
 			 */
-			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			if (opts.copy_data || opts.copy_schema)
+				table_state = SUBREL_STATE_INIT;
+			else
+				table_state = SUBREL_STATE_READY;
 
 			/*
 			 * Get the table list from publisher and build local table status
@@ -762,16 +787,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			foreach(lc, tables)
 			{
 				RangeVar   *rv = (RangeVar *) lfirst(lc);
-				Oid			relid;
+				Oid			relid = InvalidOid;
+				char 		*nspname, *relname = NULL;
 
-				relid = RangeVarGetRelid(rv, AccessShareLock, false);
+				if (opts.copy_schema)
+				{
+					nspname = rv->schemaname;
+					relname = rv->relname;
+				}
+				else
+				{
+					/* The relation should already be present on the subscriber */
+					relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
-				/* Check for supported relkind. */
-				CheckSubscriptionRelkind(get_rel_relkind(relid),
-										 rv->schemaname, rv->relname);
+					/* Check for supported relkind. */
+					CheckSubscriptionRelkind(get_rel_relkind(relid),
+											 rv->schemaname, rv->relname);
+				}
 
-				AddSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr);
+				AddSubscriptionRelState(subid, relid, table_state, opts.copy_schema,
+										opts.copy_data, InvalidXLogRecPtr, nspname,
+										relname);
 			}
 
 			/*
@@ -898,7 +934,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		{
 			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
 
-			subrel_local_oids[off++] = relstate->relid;
+			subrel_local_oids[off++] = relstate->oid;
 		}
 		qsort(subrel_local_oids, subrel_count,
 			  sizeof(Oid), oid_cmp);
@@ -939,9 +975,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			if (!bsearch(&relid, subrel_local_oids,
 						 subrel_count, sizeof(Oid), oid_cmp))
 			{
+				/* XXX: support sync schema */
 				AddSubscriptionRelState(sub->oid, relid,
 										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-										InvalidXLogRecPtr);
+										false, copy_data,
+										InvalidXLogRecPtr, NULL, NULL);
 				ereport(DEBUG1,
 						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
 										 rv->schemaname, rv->relname, sub->name)));
@@ -958,13 +996,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		remove_rel_len = 0;
 		for (off = 0; off < subrel_count; off++)
 		{
-			Oid			relid = subrel_local_oids[off];
+			Oid			subrelid = subrel_local_oids[off];
 
-			if (!bsearch(&relid, pubrel_local_oids,
+			if (!bsearch(&subrelid, pubrel_local_oids,
 						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
 			{
+				SubscriptionRelState *relstate;
 				char		state;
-				XLogRecPtr	statelsn;
 
 				/*
 				 * Lock pg_subscription_rel with AccessExclusiveLock to
@@ -984,14 +1022,16 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 					rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
 
 				/* Last known rel state. */
-				state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+				relstate = GetSubscriptionRelByOid(subrelid);
 
-				sub_remove_rels[remove_rel_len].relid = relid;
+				state = relstate->state;
+
+				sub_remove_rels[remove_rel_len].relid = subrelid;
 				sub_remove_rels[remove_rel_len++].state = state;
 
-				RemoveSubscriptionRel(sub->oid, relid);
+				RemoveSubscriptionRel(InvalidOid, InvalidOid, subrelid);
 
-				logicalrep_worker_stop(sub->oid, relid);
+				logicalrep_worker_stop(sub->oid, subrelid);
 
 				/*
 				 * For READY state, we would have already dropped the
@@ -1011,16 +1051,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 					 * origin and by this time the origin might be already
 					 * removed. For these reasons, passing missing_ok = true.
 					 */
-					ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
+					ReplicationOriginNameForLogicalRep(sub->oid, subrelid, originname,
 													   sizeof(originname));
 					replorigin_drop_by_name(originname, true, false);
 				}
 
-				ereport(DEBUG1,
-						(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
-										 get_namespace_name(get_rel_namespace(relid)),
-										 get_rel_name(relid),
-										 sub->name)));
+				if (OidIsValid(relstate->relid))
+					ereport(DEBUG1,
+							(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+											 get_namespace_name(get_rel_namespace(relstate->relid)),
+											 get_rel_name(relstate->relid),
+											 sub->name)));
+				else
+					ereport(DEBUG1,
+							(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+											 relstate->nspname, relstate->relname,
+											 sub->name)));
 			}
 		}
 
@@ -1250,6 +1296,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
+				/*
+				 * XXX support SET PUBLICATION WITH (copy_schema = xx)
+				 */
 				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1297,6 +1346,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				List	   *publist;
 				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
+				/*
+				 * XXX support ADD/DROP PUBLICATION WITH (copy_schema = xx)
+				 */
 				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1357,6 +1409,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
+				/*
+				 * XXX support REFRESH PUBLICATION WITH (copy_schema = xx)
+				 */
 				parse_subscription_options(pstate, stmt->options,
 										   SUBOPT_COPY_DATA, &opts);
 
@@ -1589,7 +1644,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		logicalrep_worker_stop(w->subid, w->relid);
+		logicalrep_worker_stop(w->subid, w->subrelid);
 	}
 	list_free(subworkers);
 
@@ -1638,7 +1693,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
-	RemoveSubscriptionRel(subid, InvalidOid);
+	RemoveSubscriptionRel(subid, InvalidOid, InvalidOid);
 
 	/* Remove the origin tracking if exists. */
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 052505e46f..8bc5bbe935 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -927,6 +927,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 				case CRS_USE_SNAPSHOT:
 					appendStringInfoString(&cmd, "SNAPSHOT 'use'");
 					break;
+				case CRS_EXPORT_USE_SNAPSHOT:
+					appendStringInfoString(&cmd, "SNAPSHOT 'export-use'");
+					break;
 			}
 		}
 		else
@@ -942,6 +945,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 				case CRS_USE_SNAPSHOT:
 					appendStringInfoString(&cmd, "USE_SNAPSHOT");
 					break;
+				case CRS_EXPORT_USE_SNAPSHOT:
+					elog(ERROR, "XXX CREATE_REPLICATION_SLOT ... EXPORT_USE_SNAPSHOT is not supported yet");
+					break;
 			}
 		}
 
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 4518683779..489205436d 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -958,7 +958,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
 	 */
-	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELOID,
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 970d170e73..9e0291d9d9 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -241,12 +241,12 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 
 /*
  * Walks the workers array and searches for one that matches given
- * subscription id and relid.
+ * subscription id and subrelid.
  *
  * We are only interested in the leader apply worker or table sync worker.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid subid, Oid subrelid, bool only_running)
 {
 	int			i;
 	LogicalRepWorker *res = NULL;
@@ -262,7 +262,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		if (isParallelApplyWorker(w))
 			continue;
 
-		if (w->in_use && w->subid == subid && w->relid == relid &&
+		if (w->in_use && w->subid == subid && w->subrelid == subrelid &&
 			(!only_running || w->proc))
 		{
 			res = w;
@@ -304,7 +304,7 @@ logicalrep_workers_find(Oid subid, bool only_running)
  */
 bool
 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid, dsm_handle subworker_dsm)
+						 Oid subrelid, dsm_handle subworker_dsm)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
@@ -318,7 +318,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
 
 	/* Sanity check - tablesync worker cannot be a subworker */
-	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+	Assert(!(is_parallel_apply_worker && OidIsValid(subrelid)));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +393,7 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (OidIsValid(subrelid) && nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -434,7 +434,8 @@ retry:
 	worker->dbid = dbid;
 	worker->userid = userid;
 	worker->subid = subid;
-	worker->relid = relid;
+	worker->subrelid = subrelid;
+	worker->relid = InvalidOid; /* will be filled by the tablesync worker */
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
 	worker->stream_fileset = NULL;
@@ -463,9 +464,9 @@ retry:
 	else
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
 
-	if (OidIsValid(relid))
+	if (OidIsValid(subrelid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker for subscription %u sync %u", subid, relid);
+				 "logical replication worker for subscription %u sync %u", subid, subrelid);
 	else if (is_parallel_apply_worker)
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
 				 "logical replication parallel apply worker for subscription %u", subid);
@@ -591,13 +592,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
  * Stop the logical replication worker for subid/relid, if any.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid subrelid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, false);
+	worker = logicalrep_worker_find(subid, subrelid, false);
 
 	if (worker)
 	{
@@ -640,13 +641,13 @@ logicalrep_pa_worker_stop(int slot_no, uint16 generation)
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(Oid subid, Oid subrelid)
 {
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-	worker = logicalrep_worker_find(subid, relid, true);
+	worker = logicalrep_worker_find(subid, subrelid, true);
 
 	if (worker)
 		logicalrep_worker_wakeup_ptr(worker);
@@ -760,6 +761,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 	worker->userid = InvalidOid;
 	worker->subid = InvalidOid;
 	worker->relid = InvalidOid;
+	worker->subrelid = InvalidOid;
 	worker->leader_pid = InvalidPid;
 	worker->parallel_apply = false;
 }
@@ -820,7 +822,7 @@ logicalrep_sync_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && OidIsValid(w->relid))
+		if (w->subid == subid && OidIsValid(w->subrelid))
 			res++;
 	}
 
@@ -1263,8 +1265,8 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		worker_pid = worker.proc->pid;
 
 		values[0] = ObjectIdGetDatum(worker.subid);
-		if (OidIsValid(worker.relid))
-			values[1] = ObjectIdGetDatum(worker.relid);
+		if (OidIsValid(worker.subrelid))
+			values[1] = ObjectIdGetDatum(worker.subrelid);
 		else
 			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 57ad22b48a..78c91f6f36 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -457,9 +457,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 	}
 
 	if (entry->state != SUBREL_STATE_READY)
-		entry->state = GetSubscriptionRelState(MySubscription->oid,
-											   entry->localreloid,
-											   &entry->statelsn);
+		entry->state = GetSubscriptoinRelStateByRelid(MySubscription->oid,
+													  entry->localreloid,
+													  &entry->statelsn);
 
 	return entry;
 }
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 62542827e4..d4e1738df6 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -277,6 +277,7 @@ struct SnapBuild
  */
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
+static bool UsingExportedSnapshot = false;
 
 /* ->committed and ->catchange manipulation */
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
@@ -661,12 +662,12 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
  * sure the xmin horizon hasn't advanced since then.
  */
 const char *
-SnapBuildExportSnapshot(SnapBuild *builder)
+SnapBuildExportSnapshot(SnapBuild *builder, bool use_it)
 {
 	Snapshot	snap;
 	char	   *snapname;
 
-	if (IsTransactionOrTransactionBlock())
+	if (!use_it && IsTransactionOrTransactionBlock())
 		elog(ERROR, "cannot export a snapshot from within a transaction");
 
 	if (SavedResourceOwnerDuringExport)
@@ -674,15 +675,24 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 
 	SavedResourceOwnerDuringExport = CurrentResourceOwner;
 	ExportInProgress = true;
+	UsingExportedSnapshot = use_it;
 
-	StartTransactionCommand();
+	if (!use_it)
+	{
+		StartTransactionCommand();
 
-	/* There doesn't seem to a nice API to set these */
-	XactIsoLevel = XACT_REPEATABLE_READ;
-	XactReadOnly = true;
+		/* There doesn't seem to a nice API to set these */
+		XactIsoLevel = XACT_REPEATABLE_READ;
+		XactReadOnly = true;
+	}
+	else
+		Assert(IsTransactionBlock());
 
 	snap = SnapBuildInitialSnapshot(builder);
 
+	if (use_it)
+		RestoreTransactionSnapshot(snap, MyProc);
+
 	/*
 	 * now that we've built a plain snapshot, make it active and use the
 	 * normal mechanisms for exporting it
@@ -727,7 +737,7 @@ SnapBuildClearExportedSnapshot(void)
 	ResourceOwner tmpResOwner;
 
 	/* nothing exported, that is the usual case */
-	if (!ExportInProgress)
+	if (!ExportInProgress || UsingExportedSnapshot)
 		return;
 
 	if (!IsTransactionState())
@@ -753,6 +763,7 @@ SnapBuildResetExportedSnapshotState(void)
 {
 	SavedResourceOwnerDuringExport = NULL;
 	ExportInProgress = false;
+	UsingExportedSnapshot = false;
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 0c71ae9ba7..6710d4bbcc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -97,9 +97,12 @@
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_namespace.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "executor/spi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
@@ -127,6 +130,8 @@ static bool FetchTableStates(bool *started_tx);
 
 static StringInfo copybuf = NULL;
 
+static Oid synchronize_table_schema(char *nspname, char *relname, char *snapshot_name);
+
 /*
  * Exit routine for synchronization worker.
  */
@@ -306,7 +311,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 			StartTransactionCommand();
 
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->subrelid,
 								   MyLogicalRepWorker->relstate,
 								   MyLogicalRepWorker->relstate_lsn);
 
@@ -324,7 +329,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 * able to rollback dropped slot.
 		 */
 		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
+										MyLogicalRepWorker->subrelid,
 										syncslotname,
 										sizeof(syncslotname));
 
@@ -495,7 +500,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * Update the state to READY only after the origin cleanup.
 				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-										   rstate->relid, rstate->state,
+										   rstate->oid, rstate->state,
 										   rstate->lsn);
 			}
 		}
@@ -509,7 +514,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-												rstate->relid, false);
+												rstate->oid, false);
 
 			if (syncworker)
 			{
@@ -578,7 +583,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					struct tablesync_start_time_mapping *hentry;
 					bool		found;
 
-					hentry = hash_search(last_start_times, &rstate->relid,
+					hentry = hash_search(last_start_times, &rstate->oid,
 										 HASH_ENTER, &found);
 
 					if (!found ||
@@ -589,7 +594,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
-												 rstate->relid,
+												 rstate->oid,
 												 DSM_HANDLE_INVALID);
 						hentry->last_start_time = now;
 					}
@@ -1225,11 +1230,11 @@ copy_table(Relation rel)
  * had changed.
  */
 void
-ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
+ReplicationSlotNameForTablesync(Oid suboid, Oid subrelid,
 								char *syncslotname, Size szslot)
 {
 	snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
-			 relid, GetSystemIdentifier());
+			 subrelid, GetSystemIdentifier());
 }
 
 /*
@@ -1245,32 +1250,50 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 {
 	char	   *slotname;
 	char	   *err;
-	char		relstate;
-	XLogRecPtr	relstate_lsn;
+	char 	   *snapshot;
 	Relation	rel;
 	AclResult	aclresult;
+	SubscriptionRelState *relstate;
+	CRSSnapshotAction action;
 	WalRcvExecResult *res;
 	char		originname[NAMEDATALEN];
+	char		nspname[NAMEDATALEN];
+	char		relname[NAMEDATALEN];
+	uint32		syncflags;
 	RepOriginId originid;
 	bool		must_use_password;
+	bool		set_snapshot = false;
 
-	/* Check the state of the table synchronization. */
+	/*
+	 * Fetch the state of the table synchronization.
+	 *
+	 * XXX: Currently we copy some relation state data from relstate
+	 * before freeing the memory at the commit. Probably we need to find
+	 * a better way.
+	 */
 	StartTransactionCommand();
-	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
-									   MyLogicalRepWorker->relid,
-									   &relstate_lsn);
-	CommitTransactionCommand();
+	relstate = GetSubscriptionRelByOid(MyLogicalRepWorker->subrelid);
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-	MyLogicalRepWorker->relstate = relstate;
-	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
+	MyLogicalRepWorker->relstate = relstate->state;
+	MyLogicalRepWorker->relstate_lsn = relstate->lsn;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	if (relstate->nspname)
+		strlcpy(nspname, relstate->nspname, NAMEDATALEN);
+
+	if (relstate->relname)
+		strlcpy(relname, relstate->relname, NAMEDATALEN);
+
+	syncflags = relstate->syncflags;
+
+	CommitTransactionCommand();
+
 	/*
 	 * If synchronization is already done or no longer necessary, exit now
 	 * that we've updated shared memory state.
 	 */
-	switch (relstate)
+	switch (MyLogicalRepWorker->relstate)
 	{
 		case SUBREL_STATE_SYNCDONE:
 		case SUBREL_STATE_READY:
@@ -1281,7 +1304,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Calculate the name of the tablesync slot. */
 	slotname = (char *) palloc(NAMEDATALEN);
 	ReplicationSlotNameForTablesync(MySubscription->oid,
-									MyLogicalRepWorker->relid,
+									MyLogicalRepWorker->subrelid,
 									slotname,
 									NAMEDATALEN);
 
@@ -1309,7 +1332,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	/* Assign the origin tracking record name. */
 	ReplicationOriginNameForLogicalRep(MySubscription->oid,
-									   MyLogicalRepWorker->relid,
+									   MyLogicalRepWorker->subrelid,
 									   originname,
 									   sizeof(originname));
 
@@ -1358,7 +1381,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Update the state and make it visible to others. */
 	StartTransactionCommand();
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-							   MyLogicalRepWorker->relid,
+							   MyLogicalRepWorker->subrelid,
 							   MyLogicalRepWorker->relstate,
 							   MyLogicalRepWorker->relstate_lsn);
 	CommitTransactionCommand();
@@ -1366,6 +1389,62 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	StartTransactionCommand();
 
+	/*
+	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
+	 * ensures that both the replication slot we create (see below) and the
+	 * COPY are consistent with each other.
+	 */
+	res = walrcv_exec(LogRepWorkerWalRcvConn,
+					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
+					  0, NULL);
+	if (res->status != WALRCV_OK_COMMAND)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("table copy could not start transaction on publisher: %s",
+						res->err)));
+	walrcv_clear_result(res);
+
+	if ((syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0)
+		action = CRS_EXPORT_USE_SNAPSHOT;
+	else
+		action = CRS_USE_SNAPSHOT;
+
+	/*
+	 * Create a new permanent logical decoding slot. This slot will be used
+	 * for the catchup phase after COPY is done, so tell it to use the
+	 * snapshot to make the final data consistent.
+	 */
+	snapshot = walrcv_create_slot(LogRepWorkerWalRcvConn,
+								  slotname, false /* permanent */ , false /* two_phase */ ,
+								  action, origin_startpos);
+
+	if ((syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0)
+	{
+		Oid newrelid;
+
+		Assert(snapshot);
+
+		set_snapshot = true;
+		PushActiveSnapshot(GetTransactionSnapshot());
+
+		/* Create the empty table */
+		newrelid = synchronize_table_schema(nspname, relname, snapshot);
+
+		/* Update the srrelid of the catalog */
+		UpdateSubscriptionRelRelid(MyLogicalRepWorker->subid, MyLogicalRepWorker->subrelid,
+								   newrelid);
+		MyLogicalRepWorker->relid = newrelid;
+
+		/*
+		 * XXX: Currently, schema sync has to be performed in the same transaction
+		 * as we do COPY (i.e. initial data sync). It might be preferable that we
+		 * do schema sync in a separate transaction so that we can continue from
+		 * the initial data sync even in case where we failed between them. To do
+		 * that, probably we need a new relstate (say) SUBREL_STATE_SCHEMASYNC.
+		 */
+		CommandCounterIncrement();
+	}
+
 	/*
 	 * Use a standard write lock here. It might be better to disallow access
 	 * to the table while it's being synchronized. But we don't want to block
@@ -1399,30 +1478,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						GetUserNameFromId(GetUserId(), true),
 						RelationGetRelationName(rel))));
 
-	/*
-	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
-	 * ensures that both the replication slot we create (see below) and the
-	 * COPY are consistent with each other.
-	 */
-	res = walrcv_exec(LogRepWorkerWalRcvConn,
-					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
-					  0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
-		ereport(ERROR,
-				(errcode(ERRCODE_CONNECTION_FAILURE),
-				 errmsg("table copy could not start transaction on publisher: %s",
-						res->err)));
-	walrcv_clear_result(res);
-
-	/*
-	 * Create a new permanent logical decoding slot. This slot will be used
-	 * for the catchup phase after COPY is done, so tell it to use the
-	 * snapshot to make the final data consistent.
-	 */
-	walrcv_create_slot(LogRepWorkerWalRcvConn,
-					   slotname, false /* permanent */ , false /* two_phase */ ,
-					   CRS_USE_SNAPSHOT, origin_startpos);
-
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
 	 * copy is to avoid doing the copy again due to any error in setting up
@@ -1457,8 +1512,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	}
 
 	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
+	if ((syncflags & SUBREL_SYNC_KIND_DATA) != 0)
+	{
+		if (!set_snapshot)
+			PushActiveSnapshot(GetTransactionSnapshot());
+
+		copy_table(rel);
+	}
+
 	PopActiveSnapshot();
 
 	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
@@ -1479,7 +1540,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * visible to others.
 	 */
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-							   MyLogicalRepWorker->relid,
+							   MyLogicalRepWorker->subrelid,
 							   SUBREL_STATE_FINISHEDCOPY,
 							   MyLogicalRepWorker->relstate_lsn);
 
@@ -1637,3 +1698,92 @@ UpdateTwoPhaseState(Oid suboid, char new_state)
 	heap_freetuple(tup);
 	table_close(rel, RowExclusiveLock);
 }
+
+/*
+ * Fetch the given table definition using pg_dump and restore it. snapshot_name is
+ * the name of the snapshot exported on the publisher. Return the oid of the newly
+ * created table.
+ */
+static Oid
+synchronize_table_schema(char *nspname, char *relname, char *snapshot_name)
+{
+	FILE *handle;
+	Oid relid;
+	Oid nspoid;
+	StringInfoData command;
+	StringInfoData querybuf;
+	char full_path[MAXPGPATH];
+	char buf[1024];
+	int ret;
+
+   if (find_my_exec("pg_dump", full_path) < 0)
+	   elog(ERROR, "\"%s\" was not found", "pg_dump");
+
+   /* Open SPI context. */
+   if (SPI_connect() != SPI_OK_CONNECT)
+	   elog(ERROR, "SPI_connect failed");
+
+   /* Create namespace if not exist */
+   nspoid = get_namespace_oid(nspname, true);
+   if (!OidIsValid(nspoid))
+   {
+	   /* XXX who should be the owner of the new schema? */
+	   nspoid = NamespaceCreate(nspname, GetUserId(), false);
+	   CommandCounterIncrement();
+   }
+
+   /* Construct pg_dump command */
+   initStringInfo(&command);
+   appendStringInfo(&command, "%s -Fp --schema-only -U %s -d \"%s\" --snapshot=%s -f - -t %s.%s",
+					full_path, GetUserNameFromId(GetUserId(), true),
+					MySubscription->conninfo, snapshot_name, nspname, relname);
+   elog(LOG, "XXX pg_dump command \"%s\"", command.data);
+
+   /*
+	* Execute the pg_dump command.
+	*
+	* XXX what if the table already doesn't exist?
+	*/
+   PG_TRY();
+   {
+	   /*
+		* XXX: Currently we execute the pg_dump command with a pipe, but with this way
+		* we cannot handle the command failure. So probably we should dump schema to
+		* the file and perform DDL while reading the dump file.
+		*/
+	   handle = OpenPipeStream(command.data, "r");
+	   if (handle == NULL)
+		   elog(ERROR, "command \"%s\" failed", command.data);
+
+	   initStringInfo(&querybuf);
+	   while (fgets(buf, sizeof(buf), handle))
+	   {
+		   appendStringInfoString(&querybuf, buf);
+
+		   if (buf[strlen(buf) - 2] != ';')
+			   continue;
+
+		   ret = SPI_exec(querybuf.data, 0);
+		   if (ret != SPI_OK_UTILITY && ret != SPI_OK_SELECT)
+			   elog(ERROR, "SPI_exec failed %d: %s", ret, querybuf.data);
+
+		   resetStringInfo(&querybuf);
+	   }
+   }
+   PG_FINALLY();
+   {
+	   ClosePipeStream(handle);
+   }
+   PG_END_TRY();
+
+   CommandCounterIncrement();
+
+   /* Close SPI context */
+   if (SPI_finish() != SPI_OK_FINISH)
+	   elog(ERROR, "SPI_finish failed");
+
+   relid = get_relname_relid(relname, nspoid);
+   Assert(OidIsValid(relid));
+
+   return relid;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 37bb884127..ff81c65aa2 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4502,10 +4502,18 @@ InitializeApplyWorker(void)
 								  (Datum) 0);
 
 	if (am_tablesync_worker())
-		ereport(LOG,
-				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
-						MySubscription->name,
-						get_rel_name(MyLogicalRepWorker->relid))));
+	{
+		if (OidIsValid(MyLogicalRepWorker->relid))
+			ereport(LOG,
+					(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+							MySubscription->name,
+							get_rel_name(MyLogicalRepWorker->relid))));
+		else
+			ereport(LOG,
+					(errmsg("logical replication table synchronization worker for subscription \"%s\", relid %u has started",
+							MySubscription->name,
+							MyLogicalRepWorker->subrelid)));
+	}
 	else
 		ereport(LOG,
 		/* translator: first %s is the name of logical replication worker */
@@ -4621,7 +4629,7 @@ ApplyWorkerMain(Datum main_arg)
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
 	 */
-	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELOID,
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45b8b3684f..7f959d2765 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1004,6 +1004,8 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 				*snapshot_action = CRS_NOEXPORT_SNAPSHOT;
 			else if (strcmp(action, "use") == 0)
 				*snapshot_action = CRS_USE_SNAPSHOT;
+			else if (strcmp(action, "export-use") == 0)
+				*snapshot_action = CRS_EXPORT_USE_SNAPSHOT;
 			else
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -1097,7 +1099,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 			need_full_snapshot = true;
 		}
-		else if (snapshot_action == CRS_USE_SNAPSHOT)
+		else if (snapshot_action == CRS_USE_SNAPSHOT ||
+				 snapshot_action == CRS_EXPORT_USE_SNAPSHOT)
 		{
 			if (!IsTransactionBlock())
 				ereport(ERROR,
@@ -1158,9 +1161,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * snapshot when doing this.
 		 */
 		if (snapshot_action == CRS_EXPORT_SNAPSHOT)
-		{
-			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
-		}
+			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder, false);
+		else if (snapshot_action == CRS_EXPORT_USE_SNAPSHOT)
+			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder, true);
 		else if (snapshot_action == CRS_USE_SNAPSHOT)
 		{
 			Snapshot	snap;
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index 4e4a34bde8..a975d169fe 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -565,11 +565,10 @@ static const struct cachedesc cacheinfo[] = {
 		KEY(Anum_pg_subscription_oid),
 		4
 	},
-	[SUBSCRIPTIONRELMAP] = {
+	[SUBSCRIPTIONRELOID] = {
 		SubscriptionRelRelationId,
-		SubscriptionRelSrrelidSrsubidIndexId,
-		KEY(Anum_pg_subscription_rel_srrelid,
-			Anum_pg_subscription_rel_srsubid),
+		SubscriptionRelObjectIdIndexId,
+		KEY(Anum_pg_subscription_rel_oid),
 		64
 	},
 	[TABLESPACEOID] = {
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b2bc81b15f..3600b4d7d5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5493,7 +5493,7 @@
   prorettype => 'record', proargtypes => 'oid',
   proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
   proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proargnames => '{subid,subid,subrelid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 60a2bcca23..530c12f148 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -30,16 +30,28 @@
  */
 CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId)
 {
+	Oid			oid;	/* Oid */
 	Oid			srsubid BKI_LOOKUP(pg_subscription);	/* Oid of subscription */
-	Oid			srrelid BKI_LOOKUP(pg_class);	/* Oid of relation */
 	char		srsubstate;		/* state of the relation in subscription */
 
+	/*
+	 * schema name and table name used only when the table is not created
+	 * on the subscriber yet.
+	 */
+	NameData	srnspname BKI_FORCE_NULL;
+	NameData	srrelname BKI_FORCE_NULL;
+
+	/* What part do we need to synchronize? */
+	bool		srsyncschema;
+	bool		srsyncdata;
+
 	/*
 	 * Although srsublsn is a fixed-width type, it is allowed to be NULL, so
 	 * we prevent direct C code access to it just as for a varlena field.
 	 */
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 
+	Oid			srrelid BKI_FORCE_NULL;
 	XLogRecPtr	srsublsn BKI_FORCE_NULL;	/* remote LSN of the state change
 											 * used for synchronization
 											 * coordination, or NULL if not
@@ -49,7 +61,8 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId)
 
 typedef FormData_pg_subscription_rel *Form_pg_subscription_rel;
 
-DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, SubscriptionRelSrrelidSrsubidIndexId, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops));
+DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_oid_index, 9161, SubscriptionRelObjectIdIndexId, on pg_subscription_rel using btree(oid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, SubscriptionRelSrrelidSrsubidIndexId, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops));
 
 #ifdef EXPOSE_TO_CLIENT_CODE
 
@@ -73,19 +86,31 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, Subsc
 
 #endif							/* EXPOSE_TO_CLIENT_CODE */
 
+#define SUBREL_SYNC_KIND_SCHEMA	0x01
+#define SUBREL_SYNC_KIND_DATA	0x02
+
 typedef struct SubscriptionRelState
 {
+	Oid			oid;
 	Oid			relid;
 	XLogRecPtr	lsn;
 	char		state;
+
+	char		*nspname;
+	char		*relname;
+	uint32		syncflags;	 /* OR of SUBREL_SYNC_KIND_XXX */
 } SubscriptionRelState;
 
-extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
-									XLogRecPtr sublsn);
-extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, bool syncschema,
+									bool syncdata, XLogRecPtr sublsn, char *nspname,
+									char *relname);
+extern void UpdateSubscriptionRelRelid(Oid subid, Oid subrelid, Oid relid);
+extern void UpdateSubscriptionRelState(Oid subid, Oid subrelid, char state,
 									   XLogRecPtr sublsn);
-extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
-extern void RemoveSubscriptionRel(Oid subid, Oid relid);
+extern char GetSubscriptionRelState(Oid subid, Oid subrelid, XLogRecPtr *sublsn);
+extern char GetSubscriptoinRelStateByRelid(Oid subid, Oid relid, XLogRecPtr *sublsn);
+extern SubscriptionRelState *GetSubscriptionRelByOid(Oid subrelid);
+extern void RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid);
 
 extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index f49b941b53..de43703443 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -68,7 +68,7 @@ extern void FreeSnapshotBuilder(SnapBuild *builder);
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
 
 extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
-extern const char *SnapBuildExportSnapshot(SnapBuild *builder);
+extern const char *SnapBuildExportSnapshot(SnapBuild *builder, bool use_it);
 extern void SnapBuildClearExportedSnapshot(void);
 extern void SnapBuildResetExportedSnapshotState(void);
 
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 9df7e50f94..b48f1c0d2a 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -21,7 +21,8 @@ typedef enum
 {
 	CRS_EXPORT_SNAPSHOT,
 	CRS_NOEXPORT_SNAPSHOT,
-	CRS_USE_SNAPSHOT
+	CRS_USE_SNAPSHOT,
+	CRS_EXPORT_USE_SNAPSHOT
 } CRSSnapshotAction;
 
 /* global state */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dce71d2c50..8f982d4a0d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -50,7 +50,13 @@ typedef struct LogicalRepWorker
 	/* Subscription id for the worker. */
 	Oid			subid;
 
-	/* Used for initial table synchronization. */
+	/*
+	 * Used for initial table synchronization.
+	 *
+	 * relid is an invalid oid if the table is not created on the subscriber
+	 * yet.
+	 */
+	Oid			subrelid;
 	Oid			relid;
 	char		relstate;
 	XLogRecPtr	relstate_lsn;
@@ -308,7 +314,7 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 static inline bool
 am_tablesync_worker(void)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return OidIsValid(MyLogicalRepWorker->subrelid);
 }
 
 static inline bool
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index 67ea6e4945..1c08ff3c92 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -97,7 +97,7 @@ enum SysCacheIdentifier
 	STATRELATTINH,
 	SUBSCRIPTIONNAME,
 	SUBSCRIPTIONOID,
-	SUBSCRIPTIONRELMAP,
+	SUBSCRIPTIONRELOID,
 	TABLESPACEOID,
 	TRFOID,
 	TRFTYPELANG,
-- 
2.31.1

