diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 2d82fbfad2..9231c7e0c9 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -538,3 +539,123 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 
 	return res;
 }
+/*
+ *  Does any TableSync workers are waiting for applier process to apply pending schema updates ?
+ *  We will scan pg_subscription_rel to see if any table is in SUBREL_SYNC_SCHEMA_DATA_SYNC
+ *  state.
+ */
+bool HasTableSchemaSyncPending(Oid subid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	ScanKeyData skey[1];
+	SysScanDesc scan;
+	bool result = false;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[0],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, 1, skey);
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel subrel;
+
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+		if (subrel->srsubstate == SUBREL_SYNC_SCHEMA_DATA_SYNC)
+		{
+			result = true;
+			break;
+		}
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	table_close(rel, AccessShareLock);
+	return result;
+}
+
+/*
+ * Update pg_subscription_rel state if we have already applied the lsn > srsublsn
+ * If there is corresponding TableSync worker we will also update its relstate, So
+ * that it can wake up.
+ * return true if no entry in pg_subscription_rel are in SUBREL_SYNC_SCHEMA_DATA_SYNC
+ * else false.
+ */
+bool UpdateTableSchemaSyncPending(Oid subid, XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	ScanKeyData skey[1];
+	SysScanDesc scan;
+	bool result = true;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[0],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, 1, skey);
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel subrel;
+		bool		isnull;
+		Datum		d;
+		XLogRecPtr tablesync_sublsn;
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+		/* Get the LSN */
+		d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+							Anum_pg_subscription_rel_srsublsn, &isnull);
+		if (isnull)
+			tablesync_sublsn = InvalidXLogRecPtr;
+		else
+			tablesync_sublsn = DatumGetLSN(d);
+		if (subrel->srsubstate == SUBREL_SYNC_SCHEMA_DATA_SYNC)
+		{
+			// We dont have to exit here , we can unblock another tablesync workers
+			if (sublsn < tablesync_sublsn)
+			{
+				result = false;
+				continue;
+			}
+			bool		nulls[Natts_pg_subscription_rel];
+			Datum		values[Natts_pg_subscription_rel];
+			bool		replaces[Natts_pg_subscription_rel];	
+			/* Update the tuple. */
+			memset(values, 0, sizeof(values));
+			memset(nulls, false, sizeof(nulls));
+			memset(replaces, false, sizeof(replaces));
+			replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+			values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(SUBREL_STATE_DATASYNC);
+			replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+			tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+									replaces);
+			/* Update the catalog. */
+			CatalogTupleUpdate(rel, &tup->t_self, tup);
+			//Update tablesync worker state
+			LogicalRepWorker *worker;
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			worker = logicalrep_worker_find(subid, subrel->srrelid, false);
+			LWLockRelease(LogicalRepWorkerLock);
+			if (worker)
+			{
+				SpinLockAcquire(&worker->relmutex);
+				worker->relstate = SUBREL_STATE_DATASYNC;
+				SpinLockRelease(&worker->relmutex);
+			}
+		}
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	table_close(rel, AccessShareLock);
+	return result;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9a534fbb00..d07580f799 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -809,6 +809,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 */
 			tables = fetch_table_list(wrconn, publications, false);
 
+			/*
+			 * Set sync state based on if we were asked to do data copy or
+			 * not.
+			 */
+			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 			/*
 			 * If requested, create permanent slot for the subscription. We
 			 * won't use the initial snapshot for anything, so no need to
@@ -860,6 +865,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				{
 					Assert(snapshot_name != NULL);
 					synchronize_table_schema(conninfo, tables_schema_sync, snapshot_name);
+					/* We only support schema sync when copy_data is enabled */
+					if (table_state == SUBREL_STATE_INIT)
+						table_state = SUBREL_SYNC_SCHEMA_DATA_INIT;
 				}
 
 				if (twophase_enabled)
@@ -870,11 +878,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 								opts.slot_name)));
 			}
 
-			/*
-			 * Set sync state based on if we were asked to do data copy or
-			 * not.
-			 */
-			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 			foreach(lc, tables)
 			{
 				RangeVar   *rv = (RangeVar *) lfirst(lc);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d870a9f69c..f708af25dc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1310,7 +1310,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
 		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY ||
+		   MyLogicalRepWorker->relstate == SUBREL_SYNC_SCHEMA_DATA_INIT ||
+		   MyLogicalRepWorker->relstate == SUBREL_SYNC_SCHEMA_DATA_SYNC);
 
 	/* Assign the origin tracking record name. */
 	ReplicationOriginNameForLogicalRep(MySubscription->oid,
@@ -1355,29 +1357,35 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		goto copy_table_done;
 	}
 
-	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
-	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
-	SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-	/* Update the state and make it visible to others. */
-	StartTransactionCommand();
-	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-							   MyLogicalRepWorker->relid,
-							   MyLogicalRepWorker->relstate,
-							   MyLogicalRepWorker->relstate_lsn);
-	CommitTransactionCommand();
-	pgstat_report_stat(true);
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_INIT)
+	{	
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
+		MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+		
+		/* Update the state and make it visible to others. */
+		StartTransactionCommand();
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
+		CommitTransactionCommand();
+	}
 
 	StartTransactionCommand();
 
-	/*
-	 * Use a standard write lock here. It might be better to disallow access
-	 * to the table while it's being synchronized. But we don't want to block
-	 * the main apply process from working and it has to open the relation in
-	 * RowExclusiveLock when remapping remote relation id to local one.
-	 */
-	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+	if (MyLogicalRepWorker->relstate != SUBREL_SYNC_SCHEMA_DATA_INIT &&
+		   MyLogicalRepWorker->relstate != SUBREL_SYNC_SCHEMA_DATA_SYNC)
+	{
+		/*
+		 * Use a standard write lock here. It might be better to disallow access
+		 * to the table while it's being synchronized. But we don't want to block
+		 * the main apply process from working and it has to open the relation in
+		 * RowExclusiveLock when remapping remote relation id to local one.
+		 */
+		rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+	}
 
 	/*
 	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
@@ -1402,6 +1410,27 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_create_slot(LogRepWorkerWalRcvConn,
 					   slotname, false /* permanent */ , false /* two_phase */ ,
 					   CRS_USE_SNAPSHOT, origin_startpos);
+	if (MyLogicalRepWorker->relstate == SUBREL_SYNC_SCHEMA_DATA_INIT ||
+			MyLogicalRepWorker->relstate == SUBREL_SYNC_SCHEMA_DATA_SYNC)
+	{
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		/*
+		 * relstate_lsn should be updated first See 
+		 * should_apply_ddl_changes_for_rel for loop
+		 */
+		MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+		MyLogicalRepWorker->relstate = SUBREL_SYNC_SCHEMA_DATA_SYNC;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+		/* Update the state and make it visible to others. */
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+									MyLogicalRepWorker->relid,
+									MyLogicalRepWorker->relstate,
+									MyLogicalRepWorker->relstate_lsn);
+		CommitTransactionCommand();
+		wait_for_worker_state_change(SUBREL_STATE_DATASYNC);
+		StartTransactionCommand();
+		rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+	}
 
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c233a365ba..4cff394adc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -323,6 +323,8 @@ static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+bool        tablesync_schema_sync_pending = true;
+static XLogRecPtr last_received_cache = InvalidXLogRecPtr;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -525,6 +527,45 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 				 rel->statelsn <= remote_final_lsn));
 }
 
+/*
+ * Should apply worker apply DDLs
+ */
+static bool
+should_apply_ddl_changes_for_rel(Oid relid, XLogRecPtr lsn)
+{
+	if (am_tablesync_worker())
+		return MyLogicalRepWorker->relid == relid;
+	else
+	{
+		//Find if there is a table sync worker active for this table
+		LogicalRepWorker *worker;
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+		// Table sync worker does not exist
+		if (!worker)
+			return true;
+		for (;;)
+		{
+			if (worker->relstate != SUBREL_SYNC_SCHEMA_DATA_INIT)
+			{
+				// slot was created after ddl was issued on publisher
+				if (worker->relstate_lsn > lsn)
+					return true;
+				return false;
+			}
+			int rc;
+			rc = WaitLatch(MyLatch,
+						   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+		
+			if (rc & WL_LATCH_SET)
+				ResetLatch(MyLatch);
+		}
+	}
+	return true;
+}
+
 /*
  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
  *
@@ -3419,6 +3460,38 @@ apply_handle_ddl(StringInfo s)
 
 		plantree_list = pg_plan_queries(querytree_list, ddl_command, 0, NULL);
 
+		if (commandTag == CMDTAG_ALTER_TABLE)
+		{
+			AlterTableStmt *alstmt = (AlterTableStmt *) command->stmt;
+			char	   *schemaname = NULL;
+			char	   *relname = NULL;
+			Oid			relid;
+			Oid			relnamespace_oid = InvalidOid;
+			RangeVar *rv = alstmt->relation;
+			if (!rv)
+			{
+				MemoryContextSwitchTo(oldcontext);
+				return;
+			}
+			schemaname = rv->schemaname;
+			relname = rv->relname;
+			if (schemaname != NULL)
+				relnamespace_oid = get_namespace_oid(schemaname, false);
+			
+			if (OidIsValid(relnamespace_oid))
+				relid = get_relname_relid(relname, relnamespace_oid);
+			else
+				relid = RelnameGetRelid(relname);
+			if (OidIsValid(relid))
+			{
+				if(!should_apply_ddl_changes_for_rel(relid, lsn))
+				{
+					MemoryContextSwitchTo(oldcontext);
+					return;
+				}
+			}
+		}
+
 		/* Done with the snapshot used for parsing/planning */
 		if (snapshot_set)
 			PopActiveSnapshot();
@@ -3686,6 +3759,31 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 		MyLogicalRepWorker->reply_time = send_time;
 	}
 }
+/* 
+ * Signal TableSync worker waiting for applier worker to process pending DDL changes
+ * And also update pg_subscription_rel for SUBREL_SYNC_SCHEMA_DATA_SYNC entries
+ */
+static void signal_tablesync_worker(XLogRecPtr last_received)
+{
+	// Since HasTableSchemaSyncPending will scan pg_subscription_rel
+	// which is expensive operation, we are using caching to avoid
+	// scanning every time.
+	if (tablesync_schema_sync_pending && !IsTransactionState())
+	{
+		if (last_received_cache != last_received)
+		{
+			StartTransactionCommand();
+			if (HasTableSchemaSyncPending(MyLogicalRepWorker->subid))
+			{
+				//TODO update logicalrep_rel_entry
+				if (UpdateTableSchemaSyncPending(MyLogicalRepWorker->subid, last_received))
+					tablesync_schema_sync_pending = false;
+				last_received_cache = last_received;
+			}
+			CommitTransactionCommand();
+		}
+	}
+}
 
 /*
  * Apply main loop.
@@ -3825,6 +3923,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 					MemoryContextReset(ApplyMessageContext);
 				}
+				signal_tablesync_worker(last_received);
 
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
@@ -3832,6 +3931,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		/* confirm all writes so far */
 		send_feedback(last_received, false, false);
+		signal_tablesync_worker(last_received);
 
 		if (!in_remote_transaction && !in_streamed_transaction)
 		{
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 60a2bcca23..80d119530a 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -57,6 +57,8 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, Subsc
  *		substate constants
  * ----------------
  */
+#define SUBREL_SYNC_SCHEMA_DATA_INIT      'x'
+#define SUBREL_SYNC_SCHEMA_DATA_SYNC      'y'
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
@@ -89,5 +91,7 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
+extern bool HasTableSchemaSyncPending(Oid subid);
+extern bool UpdateTableSchemaSyncPending(Oid subid, XLogRecPtr sublsn);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 921b9974db..c2058fcd70 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -50,5 +50,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
 extern bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo);
 extern Oid	GetRelationIdentityOrPK(Relation rel);
+extern LogicalRepRelMapEntry* logicalrep_rel_find(LogicalRepRelId remoteid);
 
 #endif							/* LOGICALRELATION_H */
