*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 93,98 ****
--- 93,99 ----
  #include "miscadmin.h"
  #include "pg_trace.h"
  #include "pgstat.h"
+ #include "replication/logicallauncher.h"
  #include "replication/origin.h"
  #include "replication/syncrep.h"
  #include "replication/walsender.h"
***************
*** 914,919 **** typedef struct TwoPhaseFileHeader
--- 915,921 ----
  	int32		nabortrels;		/* number of delete-on-abort rels */
  	int32		ninvalmsgs;		/* number of cache invalidation messages */
  	bool		initfileinval;	/* does relcache init file need invalidation? */
+ 	bool		wakeuplauncher;	/* need to wake up logical rep launcher? */
  	uint16		gidlen;			/* length of the GID - GID follows the header */
  } TwoPhaseFileHeader;
  
***************
*** 1025,1030 **** StartPrepare(GlobalTransaction gxact)
--- 1027,1034 ----
  	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
  	hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
  														  &hdr.initfileinval);
+ 	hdr.wakeuplauncher = on_commit_launcher_wakeup;
+ 	on_commit_launcher_wakeup = false;
  	hdr.gidlen = strlen(gxact->gid) + 1;		/* Include '\0' */
  
  	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
***************
*** 1501,1506 **** FinishPreparedTransaction(const char *gid, bool isCommit)
--- 1505,1517 ----
  	/* Count the prepared xact as committed or aborted */
  	AtEOXact_PgStat(isCommit);
  
+ 	/* Wake up the logical replication launcher if necessary */
+ 	if (hdr->wakeuplauncher)
+ 	{
+ 		on_commit_launcher_wakeup = true;
+ 		AtEOXact_ApplyLauncher(isCommit);
+ 	}
+ 
  	/*
  	 * And now we can clean up any files we may have left.
  	 */
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 2138,2144 **** CommitTransaction(void)
  	AtEOXact_HashTables(true);
  	AtEOXact_PgStat(true);
  	AtEOXact_Snapshot(true, false);
! 	AtCommit_ApplyLauncher();
  	pgstat_report_xact_timestamp(0);
  
  	CurrentResourceOwner = NULL;
--- 2138,2144 ----
  	AtEOXact_HashTables(true);
  	AtEOXact_PgStat(true);
  	AtEOXact_Snapshot(true, false);
! 	AtEOXact_ApplyLauncher(true);
  	pgstat_report_xact_timestamp(0);
  
  	CurrentResourceOwner = NULL;
***************
*** 2612,2617 **** AbortTransaction(void)
--- 2612,2618 ----
  		AtEOXact_ComboCid();
  		AtEOXact_HashTables(false);
  		AtEOXact_PgStat(false);
+ 		AtEOXact_ApplyLauncher(false);
  		pgstat_report_xact_timestamp(0);
  	}
  
*** a/src/backend/commands/subscriptioncmds.c
--- b/src/backend/commands/subscriptioncmds.c
***************
*** 452,458 **** CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
  
  	heap_close(rel, RowExclusiveLock);
  
! 	ApplyLauncherWakeupAtCommit();
  
  	ObjectAddressSet(myself, SubscriptionRelationId, subid);
  
--- 452,464 ----
  
  	heap_close(rel, RowExclusiveLock);
  
! 	/*
! 	* Request wakeup of the launcher on commit of the transaction.
! 	* This is used to send launcher signal to stop sleeping and process
! 	* the subscriptions when current transaction commits.
! 	*/
! 	if (enabled)
! 		on_commit_launcher_wakeup = true;
  
  	ObjectAddressSet(myself, SubscriptionRelationId, subid);
  
***************
*** 645,651 **** AlterSubscription(AlterSubscriptionStmt *stmt)
  				replaces[Anum_pg_subscription_subenabled - 1] = true;
  
  				if (enabled)
! 					ApplyLauncherWakeupAtCommit();
  
  				update_tuple = true;
  				break;
--- 651,657 ----
  				replaces[Anum_pg_subscription_subenabled - 1] = true;
  
  				if (enabled)
! 					on_commit_launcher_wakeup = true;
  
  				update_tuple = true;
  				break;
*** a/src/backend/replication/logical/launcher.c
--- b/src/backend/replication/logical/launcher.c
***************
*** 83,89 **** static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
  volatile sig_atomic_t got_SIGHUP = false;
  volatile sig_atomic_t got_SIGTERM = false;
  
! static bool	on_commit_launcher_wakeup = false;
  
  Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
  
--- 83,89 ----
  volatile sig_atomic_t got_SIGHUP = false;
  volatile sig_atomic_t got_SIGTERM = false;
  
! bool	on_commit_launcher_wakeup = false;
  
  Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
  
***************
*** 745,771 **** ApplyLauncherShmemInit(void)
  }
  
  /*
!  * Wakeup the launcher on commit if requested.
   */
  void
! AtCommit_ApplyLauncher(void)
  {
! 	if (on_commit_launcher_wakeup)
  		ApplyLauncherWakeup();
- }
  
! /*
!  * Request wakeup of the launcher on commit of the transaction.
!  *
!  * This is used to send launcher signal to stop sleeping and process the
!  * subscriptions when current transaction commits. Should be used when new
!  * tuple was added to the pg_subscription catalog.
! */
! void
! ApplyLauncherWakeupAtCommit(void)
! {
! 	if (!on_commit_launcher_wakeup)
! 		on_commit_launcher_wakeup = true;
  }
  
  static void
--- 745,760 ----
  }
  
  /*
!  * AtEOXact_ApplyLauncher
!  *      Wakeup the launcher on commit if requested.
   */
  void
! AtEOXact_ApplyLauncher(bool isCommit)
  {
! 	if (isCommit && on_commit_launcher_wakeup)
  		ApplyLauncherWakeup();
  
! 	on_commit_launcher_wakeup = false;
  }
  
  static void
*** a/src/include/replication/logicallauncher.h
--- b/src/include/replication/logicallauncher.h
***************
*** 21,27 **** extern void ApplyLauncherMain(Datum main_arg);
  extern Size ApplyLauncherShmemSize(void);
  extern void ApplyLauncherShmemInit(void);
  
! extern void ApplyLauncherWakeupAtCommit(void);
! extern void AtCommit_ApplyLauncher(void);
  
  #endif   /* LOGICALLAUNCHER_H */
--- 21,28 ----
  extern Size ApplyLauncherShmemSize(void);
  extern void ApplyLauncherShmemInit(void);
  
! extern void AtEOXact_ApplyLauncher(bool isCommit);
! 
! extern bool	on_commit_launcher_wakeup;
  
  #endif   /* LOGICALLAUNCHER_H */
