Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-25 Thread Nathan Bossart
On Wed, Jan 25, 2023 at 11:49:27AM -0500, Tom Lane wrote:
> Right.  I fixed some other infelicities and pushed it.

Thanks!

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-25 Thread Nathan Bossart
On Wed, Jan 25, 2023 at 04:12:00PM +0900, Kyotaro Horiguchi wrote:
> At Tue, 24 Jan 2023 10:42:17 -0800, Nathan Bossart wrote in
>> Here is a first attempt at a patch.  I scanned through all the existing
>> uses of InvalidDsaPointer and DSM_HANDLE_INVALID and didn't notice anything
>> else that needed adjusting.
> 
> There seems to be two cases for DSA_HANDLE_INVALID in dsa_get_handle
> and dsa_attach_in_place, one of which is Assert(), though.

Ah, sorry, I'm not sure how I missed this.  Thanks for looking.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-25 Thread Tom Lane
Kyotaro Horiguchi  writes:
> At Tue, 24 Jan 2023 10:42:17 -0800, Nathan Bossart wrote in
>> Here is a first attempt at a patch.  I scanned through all the existing
>> uses of InvalidDsaPointer and DSM_HANDLE_INVALID and didn't notice anything
>> else that needed adjusting.

> There seems to be two cases for DSA_HANDLE_INVALID in dsa_get_handle
> and dsa_attach_in_place, one of which is Assert(), though.

Right.  I fixed some other infelicities and pushed it.

regards, tom lane




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-24 Thread Kyotaro Horiguchi
At Tue, 24 Jan 2023 10:42:17 -0800, Nathan Bossart wrote in
> On Tue, Jan 24, 2023 at 01:13:55PM -0500, Tom Lane wrote:
> > Either that comment needs to be rewritten or we need to invent some
> > more macros.
> 
> Here is a first attempt at a patch.  I scanned through all the existing
> uses of InvalidDsaPointer and DSM_HANDLE_INVALID and didn't notice anything
> else that needed adjusting.

There seems to be two cases for DSA_HANDLE_INVALID in dsa_get_handle
and dsa_attach_in_place, one of which is Assert(), though.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-24 Thread Nathan Bossart
On Tue, Jan 24, 2023 at 01:13:55PM -0500, Tom Lane wrote:
> Either that comment needs to be rewritten or we need to invent some
> more macros.

Here is a first attempt at a patch.  I scanned through all the existing
uses of InvalidDsaPointer and DSM_HANDLE_INVALID and didn't notice anything
else that needed adjusting.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 564bffe5ca..970d170e73 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -922,8 +922,8 @@ ApplyLauncherShmemInit(void)
 
 		memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
 
-		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
-		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
 
 		/* Initialize memory and spin locks for each worker slot. */
 		for (slot = 0; slot < max_logical_replication_workers; slot++)
@@ -947,7 +947,7 @@ logicalrep_launcher_attach_dshmem(void)
 	MemoryContext oldcontext;
 
 	/* Quick exit if we already did this. */
-	if (LogicalRepCtx->last_start_dsh != DSM_HANDLE_INVALID &&
+	if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
 		last_start_times != NULL)
 		return;
 
@@ -957,7 +957,7 @@ logicalrep_launcher_attach_dshmem(void)
 	/* Be sure any local memory allocated by DSA routines is persistent. */
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 
-	if (LogicalRepCtx->last_start_dsh == DSM_HANDLE_INVALID)
+	if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
 	{
 		/* Initialize dynamic shared hash table for last-start times. */
 		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
diff --git a/src/include/lib/dshash.h b/src/include/lib/dshash.h
index 152927742e..c284c8489c 100644
--- a/src/include/lib/dshash.h
+++ b/src/include/lib/dshash.h
@@ -23,6 +23,9 @@ typedef struct dshash_table dshash_table;
 /* A handle for a dshash_table which can be shared with other processes. */
 typedef dsa_pointer dshash_table_handle;
 
+/* Special value for an uninitialized dshash_table_handle */
+#define DSHASH_HANDLE_INVALID ((dshash_table_handle) InvalidDsaPointer)
+
 /* The type for hash values. */
 typedef uint32 dshash_hash;
 
diff --git a/src/include/utils/dsa.h b/src/include/utils/dsa.h
index 104386e674..ee59a76447 100644
--- a/src/include/utils/dsa.h
+++ b/src/include/utils/dsa.h
@@ -99,6 +99,9 @@ typedef pg_atomic_uint64 dsa_pointer_atomic;
  */
 typedef dsm_handle dsa_handle;
 
+/* Special value for an uninitialized dsa_handle */
+#define DSA_HANDLE_INVALID ((dsa_handle) DSM_HANDLE_INVALID)
+
 extern dsa_area *dsa_create(int tranche_id);
 extern dsa_area *dsa_create_in_place(void *place, size_t size,
 	 int tranche_id, dsm_segment *segment);


Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-24 Thread Tom Lane
Nathan Bossart  writes:
> IMO ideally there should be a DSA_HANDLE_INVALID and DSHASH_HANDLE_INVALID
> for use with dsa_handle and dshash_table_handle, respectively.  But your
> patch does seem like an improvement.

Yeah, particularly given that dsa.h says

/*
 * The handle for a dsa_area is currently implemented as the dsm_handle
 * for the first DSM segment backing this dynamic storage area, but client
 * code shouldn't assume that is true.
 */
typedef dsm_handle dsa_handle;

but then provides no way for client code to not be aware that a
dsa_handle is a dsm_handle, if it needs to deal with "invalid" values.
Either that comment needs to be rewritten or we need to invent some
more macros.

I agree that the patch as given is an improvement on what was
committed, but I wonder whether we shouldn't work a little harder
on cleaning this up more widely.
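
To make the "more macros" option concrete, here is a minimal standalone sketch of one "invalid" value per handle type. The typedefs are simplified stand-ins with assumed widths, not the real definitions from dsm.h, dsa.h, and dshash.h; the macro names follow the ones proposed in this thread; SharedHandles, handles_init, and handles_unset are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL typedefs (widths are assumptions). */
typedef uint32_t dsm_handle;
typedef uint64_t dsa_pointer;
typedef dsm_handle dsa_handle;				/* a dsm_handle underneath, for now */
typedef dsa_pointer dshash_table_handle;	/* a dsa_pointer underneath, for now */

#define DSM_HANDLE_INVALID ((dsm_handle) 0)
#define InvalidDsaPointer ((dsa_pointer) 0)

/* One "invalid" value per handle type, so callers never name the base type. */
#define DSA_HANDLE_INVALID ((dsa_handle) DSM_HANDLE_INVALID)
#define DSHASH_HANDLE_INVALID ((dshash_table_handle) InvalidDsaPointer)

/* Hypothetical shared state holding the two handles discussed above. */
typedef struct
{
	dsa_handle	last_start_dsa;
	dshash_table_handle last_start_dsh;
} SharedHandles;

/* Mark both handles as not yet created. */
static void
handles_init(SharedHandles *h)
{
	assert(h != NULL);
	h->last_start_dsa = DSA_HANDLE_INVALID;
	h->last_start_dsh = DSHASH_HANDLE_INVALID;
}

/* True while neither handle has been assigned a real value. */
static bool
handles_unset(const SharedHandles *h)
{
	return h->last_start_dsa == DSA_HANDLE_INVALID &&
		h->last_start_dsh == DSHASH_HANDLE_INVALID;
}
```

With macros like these, client code compares each field against the constant for its own type, so a later change to what a dsa_handle or dshash_table_handle really is would only touch the headers.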

regards, tom lane




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-24 Thread Nathan Bossart
On Tue, Jan 24, 2023 at 02:55:07AM +, houzj.f...@fujitsu.com wrote:
> I noticed one minor thing in this commit. 
> 
> -
> LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
> -
> 
> The code treats last_start_dsh as a dsm_handle, but it seems to be a
> dsa_pointer ("typedef dsa_pointer dshash_table_handle;"). This won't cause
> any problem, but I feel it would be easier to understand if we treat it as a
> dsa_pointer and use InvalidDsaPointer here, as the attached patch does.
> What do you think?

IMO ideally there should be a DSA_HANDLE_INVALID and DSHASH_HANDLE_INVALID
for use with dsa_handle and dshash_table_handle, respectively.  But your
patch does seem like an improvement.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




RE: wake up logical workers after ALTER SUBSCRIPTION

2023-01-23 Thread houzj.f...@fujitsu.com
On Monday, January 23, 2023 3:13 AM Tom Lane  wrote:

Hi,

> 
> Nathan Bossart  writes:
> > On Tue, Jan 10, 2023 at 10:59:14AM +0530, Amit Kapila wrote:
> >> I haven't looked in detail but isn't it better to explain somewhere
> >> in the comments that it achieves to rate limit the restart of workers
> >> in case of error and allows them to restart immediately in case of
> >> subscription parameter change?
> 
> > I expanded one of the existing comments to make this clear.
> 
> I pushed v17 with some mostly-cosmetic changes, including more comments.

I noticed one minor thing in this commit. 

-
LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
-

The code treats last_start_dsh as a dsm_handle, but it seems to be a
dsa_pointer ("typedef dsa_pointer dshash_table_handle;"). This won't cause
any problem, but I feel it would be easier to understand if we treat it as a
dsa_pointer and use InvalidDsaPointer here, as the attached patch does.
What do you think?

Best regards,
Hou zj





0001-Take-last_start_dsh-as-dsa_pointer.patch
Description: 0001-Take-last_start_dsh-as-dsa_pointer.patch


Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-23 Thread Nathan Bossart
On Sun, Jan 22, 2023 at 02:12:54PM -0500, Tom Lane wrote:
> I pushed v17 with some mostly-cosmetic changes, including more comments.

Thanks!

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-22 Thread Tom Lane
Nathan Bossart  writes:
> On Tue, Jan 10, 2023 at 10:59:14AM +0530, Amit Kapila wrote:
>> I haven't looked in detail but isn't it better to explain somewhere in
>> the comments that it achieves to rate limit the restart of workers in
>> case of error and allows them to restart immediately in case of
>> subscription parameter change?

> I expanded one of the existing comments to make this clear.

I pushed v17 with some mostly-cosmetic changes, including more comments.

> Of course, if the launcher restarts, then the notify_pid will no longer be
> accurate.  However, I see that workers also register a before_shmem_exit
> callback that will send SIGUSR1 to the launcher_pid currently stored in
> shared memory.  (I wonder if there is a memory ordering bug here.)

I think it's all close enough in reality.  There are other issues in
this code, and I'm about to start a new thread about one I identified
while testing this patch, but I think we're in good shape on this
particular point.  I've marked the CF entry as committed.

regards, tom lane




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-10 Thread Nathan Bossart
On Tue, Jan 10, 2023 at 10:59:14AM +0530, Amit Kapila wrote:
> I haven't looked in detail but isn't it better to explain somewhere in
> the comments that it achieves to rate limit the restart of workers in
> case of error and allows them to restart immediately in case of
> subscription parameter change?

I expanded one of the existing comments to make this clear.

> Another minor point: Don't we need to set the launcher's latch after
> removing the entry from the hash table to avoid the launcher waiting
> on the latch for a bit longer?

The launcher's latch should be set when the apply worker exits.  The apply
worker's notify_pid is set to the launcher, which means the launcher
will be sent SIGUSR1 on exit.  The launcher's SIGUSR1 handler sets its
latch.
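
The signal-to-latch chain described above can be sketched in isolation with plain POSIX calls. This is only an illustration under stated assumptions: launcher_latch is a simple flag standing in for the real latch, and every function name below is hypothetical rather than taken from the PostgreSQL source.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <sys/types.h>
#include <unistd.h>

/* Stand-in for the launcher's latch: a flag that is safe to set in a handler. */
static volatile sig_atomic_t launcher_latch = 0;

/* SIGUSR1 handler: only sets the latch; real work happens in the main loop. */
static void
launcher_sigusr1_handler(int signo)
{
	(void) signo;
	launcher_latch = 1;
}

/* Install the handler; returns true on success. */
static bool
launcher_install_handler(void)
{
	struct sigaction sa;

	sa.sa_handler = launcher_sigusr1_handler;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;
	return sigaction(SIGUSR1, &sa, NULL) == 0;
}

/* Stand-in for the SIGUSR1 that reaches notify_pid when a worker exits. */
static bool
notify_on_worker_exit(pid_t notify_pid)
{
	assert(notify_pid > 0);		/* pid 0 would signal the whole process group */
	return kill(notify_pid, SIGUSR1) == 0;
}

/* Consume the latch: returns true if it was set, and resets it. */
static bool
launcher_check_and_reset_latch(void)
{
	bool		was_set = (launcher_latch != 0);

	launcher_latch = 0;
	return was_set;
}
```

Calling launcher_install_handler(), then notify_on_worker_exit(getpid()), leaves the latch set for the next launcher_check_and_reset_latch() call. The sketch also shows why a stale notify_pid matters: the signal goes to whatever PID was recorded, not to whichever process is currently the launcher.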

Of course, if the launcher restarts, then the notify_pid will no longer be
accurate.  However, I see that workers also register a before_shmem_exit
callback that will send SIGUSR1 to the launcher_pid currently stored in
shared memory.  (I wonder if there is a memory ordering bug here.)

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 358d2ff90f..3cf8dfa11e 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2006,6 +2006,16 @@ postgres   27093  0.0  0.0  30096  2752 ?Ss   11:34   0:00 postgres: ser
   Waiting to read or update information
about heavyweight locks.
  
+ 
+  LogicalRepLauncherDSA
+  Waiting for logical replication launcher dynamic shared memory
+  allocator access
+ 
+ 
+  LogicalRepLauncherHash
+  Waiting for logical replication launcher shared memory hash table
+  access
+ 
  
   LogicalRepWorker
   Waiting to read or update the state of logical replication
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index baff00dd74..468e5e0bf1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Clear the last-start time for the apply worker to free up space.  If
+	 * this transaction rolls back, the launcher might restart the apply worker
+	 * before wal_retrieve_retry_interval milliseconds have elapsed, but that's
+	 * probably okay.
+	 */
+	logicalrep_launcher_delete_last_start_time(subid);
+
 	/*
 	 * Cleanup of tablesync replication origins.
 	 *
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index afb7acddaa..ecf239e4c2 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -57,6 +58,25 @@ int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
 
+/* an entry in the last-start times hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+	Oid		subid;
+	TimestampTz last_start_time;
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start times hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(Oid),
+	sizeof(LauncherLastStartTimesEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
 typedef struct LogicalRepCtxStruct
@@ -64,6 +84,10 @@ typedef struct LogicalRepCtxStruct
 	/* Supervisor process. */
 	pid_t		launcher_pid;
 
+	/* hash table for last-start times */
+	dsa_handle	last_start_dsa;
+	dshash_table_handle last_start_dsh;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -76,6 +100,9 @@ static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 static int	logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_launcher_attach_dshmem(void);
+static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time);
+static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid);
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -902,6 +929,9 @@ ApplyLauncherShmemInit(void)
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
 		}
+
+		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
 	}
 }
 
@@ -947,8 +977,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			

Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-09 Thread Amit Kapila
On Sat, Jan 7, 2023 at 6:15 AM Nathan Bossart  wrote:
>
> On Fri, Jan 06, 2023 at 05:31:26PM -0500, Tom Lane wrote:
>
> > Attached is a rebased 0003, just to keep the cfbot happy.
> > I'm kind of wondering whether 0003 is worth the complexity TBH,
> > but in any case I ran out of time to look at it closely today.
>
> Yeah.  It's not as bad as I was expecting, but it does add a bit more
> complexity than is probably warranted.
>

Personally, I think it is not as complex as we were initially thinking
and does the job accurately unless we are missing something. So, +1 to
proceed with this approach.

I haven't looked in detail but isn't it better to explain somewhere in
the comments that it achieves to rate limit the restart of workers in
case of error and allows them to restart immediately in case of
subscription parameter change?
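
The two behaviours asked about here (rate-limited restart after an error, immediate restart after a parameter change) can be illustrated with a small standalone decision function. This is a sketch, not the patch's code: TimestampUs is a stand-in for TimestampTz, NO_LAST_START is a hypothetical marker meaning the entry was removed, and both function names are invented for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t TimestampUs;	/* microseconds; stand-in for TimestampTz */

/* Hypothetical marker: no last-start time is recorded for the subscription. */
#define NO_LAST_START ((TimestampUs) 0)

/*
 * Decide whether a worker may be started now.
 *
 * After an error the recorded last-start time is kept, so the restart is
 * held back until retry_interval_ms has elapsed.  After a subscription
 * parameter change the entry is removed (NO_LAST_START), so the restart
 * is allowed immediately.
 */
static bool
may_start_worker(TimestampUs last_start, TimestampUs now, int retry_interval_ms)
{
	assert(retry_interval_ms >= 0);
	if (last_start == NO_LAST_START)
		return true;
	return now - last_start >= (TimestampUs) retry_interval_ms * 1000;
}

/* Milliseconds the launcher should still wait; 0 if it may start now. */
static int64_t
ms_until_start(TimestampUs last_start, TimestampUs now, int retry_interval_ms)
{
	if (may_start_worker(last_start, now, retry_interval_ms))
		return 0;
	return retry_interval_ms - (now - last_start) / 1000;
}
```

The remaining wait returned by ms_until_start is the kind of value a launcher loop could use as its sleep timeout, so it wakes up exactly when the next restart becomes permissible.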

Another minor point: Don't we need to set the launcher's latch after
removing the entry from the hash table to avoid the launcher waiting
on the latch for a bit longer?

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-09 Thread Nathan Bossart
rebased for cfbot

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index cf220c3bcb..7661c0c86e 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1996,6 +1996,16 @@ postgres   27093  0.0  0.0  30096  2752 ?Ss   11:34   0:00 postgres: ser
   Waiting to read or update information
about heavyweight locks.
  
+ 
+  LogicalRepLauncherDSA
+  Waiting for logical replication launcher dynamic shared memory
+  allocator access
+ 
+ 
+  LogicalRepLauncherHash
+  Waiting for logical replication launcher shared memory hash table
+  access
+ 
  
   LogicalRepWorker
   Waiting to read or update the state of logical replication
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index baff00dd74..468e5e0bf1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Clear the last-start time for the apply worker to free up space.  If
+	 * this transaction rolls back, the launcher might restart the apply worker
+	 * before wal_retrieve_retry_interval milliseconds have elapsed, but that's
+	 * probably okay.
+	 */
+	logicalrep_launcher_delete_last_start_time(subid);
+
 	/*
 	 * Cleanup of tablesync replication origins.
 	 *
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index afb7acddaa..466917569a 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -57,6 +58,25 @@ int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
 
+/* an entry in the last-start times hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+	Oid		subid;
+	TimestampTz last_start_time;
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start times hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(Oid),
+	sizeof(LauncherLastStartTimesEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
 typedef struct LogicalRepCtxStruct
@@ -64,6 +84,10 @@ typedef struct LogicalRepCtxStruct
 	/* Supervisor process. */
 	pid_t		launcher_pid;
 
+	/* hash table for last-start times */
+	dsa_handle	last_start_dsa;
+	dshash_table_handle last_start_dsh;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -76,6 +100,9 @@ static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 static int	logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_launcher_attach_dshmem(void);
+static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time);
+static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid);
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -902,6 +929,9 @@ ApplyLauncherShmemInit(void)
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
 		}
+
+		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
 	}
 }
 
@@ -947,8 +977,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
 
@@ -983,58 +1011,56 @@ ApplyLauncherMain(Datum main_arg)
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-	   wal_retrieve_retry_interval))
-		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-		   "Logical Replication Launcher sublist",
-		   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
-			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+	   "Logical Replication Launcher sublist",
+	 

Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-06 Thread Nathan Bossart
On Fri, Jan 06, 2023 at 05:31:26PM -0500, Tom Lane wrote:
> I've pushed 0001 and 0002, which seem pretty uncontroversial.

Thanks!

> Attached is a rebased 0003, just to keep the cfbot happy.
> I'm kind of wondering whether 0003 is worth the complexity TBH,
> but in any case I ran out of time to look at it closely today.

Yeah.  It's not as bad as I was expecting, but it does add a bit more
complexity than is probably warranted.  I'm not wedded to this approach.

BTW I intend to start a new thread for the bugs I mentioned upthread that
were revealed by setting wal_retrieve_retry_interval to 1ms in the tests.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-06 Thread Tom Lane
Nathan Bossart  writes:
> I found some additional places that should remove the last-start time from
> the hash table.  I've added those in v14.

I've pushed 0001 and 0002, which seem pretty uncontroversial.
Attached is a rebased 0003, just to keep the cfbot happy.
I'm kind of wondering whether 0003 is worth the complexity TBH,
but in any case I ran out of time to look at it closely today.

regards, tom lane

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5bcba0fdec..8f06e234c8 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1991,6 +1991,16 @@ postgres   27093  0.0  0.0  30096  2752 ?Ss   11:34   0:00 postgres: ser
   Waiting to read or update information
about heavyweight locks.
  
+ 
+  LogicalRepLauncherDSA
+  Waiting for logical replication launcher dynamic shared memory
+  allocator access
+ 
+ 
+  LogicalRepLauncherHash
+  Waiting for logical replication launcher shared memory hash table
+  access
+ 
  
   LogicalRepWorker
   Waiting to read or update the state of logical replication
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f15a332bae..88f180c2a7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Clear the last-start time for the apply worker to free up space.  If
+	 * this transaction rolls back, the launcher might restart the apply worker
+	 * before wal_retrieve_retry_interval milliseconds have elapsed, but that's
+	 * probably okay.
+	 */
+	logicalrep_launcher_delete_last_start_time(subid);
+
 	/*
 	 * Cleanup of tablesync replication origins.
 	 *
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..dfe49db64f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -56,6 +57,25 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 
+/* an entry in the last-start times hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+	Oid		subid;
+	TimestampTz last_start_time;
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start times hash table */
+static const dshash_parameters dsh_params = {
+	sizeof(Oid),
+	sizeof(LauncherLastStartTimesEntry),
+	dshash_memcmp,
+	dshash_memhash,
+	LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
 typedef struct LogicalRepCtxStruct
@@ -63,6 +83,10 @@ typedef struct LogicalRepCtxStruct
 	/* Supervisor process. */
 	pid_t		launcher_pid;
 
+	/* hash table for last-start times */
+	dsa_handle	last_start_dsa;
+	dshash_table_handle last_start_dsh;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -74,6 +98,9 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void logicalrep_launcher_attach_dshmem(void);
+static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time);
+static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid);
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -756,6 +783,9 @@ ApplyLauncherShmemInit(void)
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
 		}
+
+		LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+		LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
 	}
 }
 
@@ -801,8 +831,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
-
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
 
@@ -837,58 +865,55 @@ ApplyLauncherMain(Datum main_arg)
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-	   wal_retrieve_retry_interval))
-		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-		   "Logical Replication Launcher sublist",
-		   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled 

Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-05 Thread Nathan Bossart
I found some additional places that should remove the last-start time from
the hash table.  I've added those in v14.

On Fri, Jan 06, 2023 at 10:30:18AM +0530, Amit Kapila wrote:
> On Thu, Jan 5, 2023 at 10:49 PM Nathan Bossart wrote:
>> On Thu, Jan 05, 2023 at 11:34:37AM +0530, Amit Kapila wrote:
>> > On Thu, Jan 5, 2023 at 10:16 AM Nathan Bossart wrote:
>> >> In v12, I moved the restart for two_phase mode to the end of
>> >> process_syncing_tables_for_apply() so that we don't need to rely on
>> >> another iteration of the loop.
>> >
>> > This should work but it is better to add a comment before calling
>> > CommandCounterIncrement() to indicate that this is for making changes
>> > to the relation state visible.
>>
>> Will do.
> 
> Isn't it better to move this part into a separate patch as this is
> useful even without the main patch to improve wakeups?

I moved it to a separate patch in v14.

>> > Thinking along similar lines, won't apply worker need to be notified
>> > of SUBREL_STATE_SYNCWAIT state change by the tablesync worker?
>>
>> wait_for_worker_state_change() should notify the apply worker in this case.
> 
> I think this is yet to be included in the patch, right?

This is already present on HEAD.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 955111b35d9d416b8d895649525b25c03a15e653 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Thu, 5 Jan 2023 21:17:42 -0800
Subject: [PATCH v14 1/3] move restart for two_phase mode to end of
 process_syncing_tables_for_apply()

---
 src/backend/replication/logical/tablesync.c | 48 +++--
 1 file changed, 26 insertions(+), 22 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..280fed8cdb 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	bool		should_exit = false;
 
 	Assert(!IsTransactionState());
 
@@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		last_start_times = NULL;
 	}
 
-	/*
-	 * Even when the two_phase mode is requested by the user, it remains as
-	 * 'pending' until all tablesyncs have reached READY state.
-	 *
-	 * When this happens, we restart the apply worker and (if the conditions
-	 * are still ok) then the two_phase tri-state will become 'enabled' at
-	 * that time.
-	 *
-	 * Note: If the subscription has no tables then leave the state as
-	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-	 * work.
-	 */
-	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-		AllTablesyncsReady())
-	{
-		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
-						MySubscription->name)));
-
-		proc_exit(0);
-	}
-
 	/*
 	 * Process all tables that are being synchronized.
 	 */
@@ -619,9 +598,34 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * Even when the two_phase mode is requested by the user, it remains as
+		 * 'pending' until all tablesyncs have reached READY state.
+		 *
+		 * When this happens, we restart the apply worker and (if the conditions
+		 * are still ok) then the two_phase tri-state will become 'enabled' at
+		 * that time.
+		 *
+		 * Note: If the subscription has no tables then leave the state as
+		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+		 * work.
+		 */
+		CommandCounterIncrement();	/* make updates visible */
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+		{
+			ereport(LOG,
+	(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+			MySubscription->name)));
+			should_exit = true;
+		}
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
+
+	if (should_exit)
+		proc_exit(0);
 }
 
 /*
-- 
2.25.1

From 3018a4029b573c69f6674b86735d0d94c1350eb4 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v14 2/3] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c|  3 ++
 src/backend/commands/alter.c |  7 
 src/backend/commands/subscriptioncmds.c  |  7 
 src/backend/replication/logical/worker.c | 46 
 src/include/replication/logicalworker.h  |  3 ++
 5 files changed, 66 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include 

Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-05 Thread Amit Kapila
On Thu, Jan 5, 2023 at 10:49 PM Nathan Bossart  wrote:
>
> On Thu, Jan 05, 2023 at 11:34:37AM +0530, Amit Kapila wrote:
> > On Thu, Jan 5, 2023 at 10:16 AM Nathan Bossart wrote:
> >> In v12, I moved the restart for two_phase mode to the end of
> >> process_syncing_tables_for_apply() so that we don't need to rely on another
> >> iteration of the loop.
> >
> > This should work but it is better to add a comment before calling
> > CommandCounterIncrement() to indicate that this is for making changes
> > to the relation state visible.
>
> Will do.
>

Isn't it better to move this part into a separate patch as this is
useful even without the main patch to improve wakeups?

> > Thinking along similar lines, won't apply worker need to be notified
> > of SUBREL_STATE_SYNCWAIT state change by the tablesync worker?
>
> wait_for_worker_state_change() should notify the apply worker in this case.
>

I think this is yet to be included in the patch, right?

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-05 Thread Nathan Bossart
On Thu, Jan 05, 2023 at 09:29:24AM -0800, Nathan Bossart wrote:
> On Thu, Jan 05, 2023 at 10:57:58AM +0530, Amit Kapila wrote:
>> True, if we want we can use dshash for this.
> 
> I'll look into this.

Here is an attempt at using dshash.  This is quite a bit cleaner since we
don't need garbage collection or the flag in the worker slots.  There is
some extra work required to set up the table, but it doesn't seem too bad.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 8853500e9b238b0d07871940d4e7db9e43802b8b Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v13 1/2] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c   |  3 ++
 src/backend/commands/alter.c|  7 +++
 src/backend/commands/subscriptioncmds.c |  7 +++
 src/backend/replication/logical/tablesync.c | 49 -
 src/backend/replication/logical/worker.c| 46 +++
 src/include/replication/logicalworker.h |  3 ++
 6 files changed, 93 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a..4e8102b59f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796f..c0fd82cdf2 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
@@ -1733,6 +1737,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 			  form->oid, 0);
 
 	ApplyLauncherWakeupAtCommit();
+
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(form->oid);
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..8c8f27ebcf 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	bool		should_exit = false;
 
 	Assert(!IsTransactionState());
 
@@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		last_start_times = NULL;
 	}
 
-	/*
-	 * Even when the two_phase mode is requested by the user, it remains as
-	 * 'pending' until all tablesyncs have reached READY state.
-	 *
-	 * When this happens, we restart the apply worker and (if the conditions
-	 * are still ok) then the two_phase tri-state will become 'enabled' at
-	 * that time.
-	 *
-	 * Note: If the subscription has no tables then leave the state as
-	 * PENDING, 

Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-05 Thread Nathan Bossart
On Thu, Jan 05, 2023 at 10:57:58AM +0530, Amit Kapila wrote:
> True, if we want we can use dshash for this.

I'll look into this.

> The garbage collection
> mechanism used in the patch seems odd to me as that will remove/add
> entries to the hash table even when the corresponding subscription is
> never dropped.

Yeah, I think this deserves a comment.  We can remove anything beyond
wal_retrieve_retry_interval because the lack of a hash table entry is taken
to mean that we can start the worker immediately.  There might be a corner
case when wal_retrieve_retry_interval is concurrently updated, in which
case we'll effectively use the previous value for the worker.  That doesn't
seem too terrible to me.
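
To illustrate the rule described above, here is a toy sketch in plain C.
It is not the patch's code: the names StartEntry, prune_stale and
can_start_now are hypothetical, and a fixed array stands in for the
launcher's hash table.  The point is that an entry at least
wal_retrieve_retry_interval old carries no information, because a missing
entry already means "start immediately".

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for one last-start-time hash table entry. */
typedef struct StartEntry
{
	uint32_t	subid;			/* subscription OID */
	int64_t		last_start_ms;	/* when its worker was last started */
} StartEntry;

/*
 * Drop every entry whose last start is at least retry_interval_ms old.
 * Compacts the array in place and returns the new entry count.
 */
size_t
prune_stale(StartEntry *entries, size_t n, int64_t now_ms,
			int64_t retry_interval_ms)
{
	size_t		kept = 0;

	assert(retry_interval_ms >= 0);
	for (size_t i = 0; i < n; i++)
	{
		if (now_ms - entries[i].last_start_ms < retry_interval_ms)
			entries[kept++] = entries[i];
	}
	return kept;
}

/*
 * A worker may start right away if there is no entry for its subscription
 * or if the recorded start is at least retry_interval_ms in the past.
 */
bool
can_start_now(const StartEntry *entries, size_t n, uint32_t subid,
			  int64_t now_ms, int64_t retry_interval_ms)
{
	assert(retry_interval_ms >= 0);
	for (size_t i = 0; i < n; i++)
	{
		if (entries[i].subid == subid)
			return now_ms - entries[i].last_start_ms >= retry_interval_ms;
	}
	return true;				/* no entry: nothing to wait for */
}
```

In this model, pruning never changes what can_start_now returns for the
same interval, which is why the entries can be discarded.  The corner case
mentioned above only arises if the interval changes between the prune and
the lookup.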

It might be possible to remove this garbage collection completely if we use
dshash, but I haven't thought through that approach completely yet.

> Also, adding this garbage collection each time seems
> like an overhead, especially for small values of
> wal_retrieve_retry_interval and a large number of subscriptions.

Right.

> Another point is immediately after cleaning the worker info, trying to
> find it again seems of no use. In logicalrep_worker_launch(), using
> both in_use and restart_immediately to find an unused slot doesn't
> look neat to me, we could probably keep the in_use flag intact if we
> want to reuse the worker. But again after freeing the worker, keeping
> its associated slot allocated sounds odd to me.

Yeah, this flag certainly feels hacky.  With a shared hash table, we could
just have backends remove the last-start-time entry directly, and we
wouldn't need the flag.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-05 Thread Nathan Bossart
On Thu, Jan 05, 2023 at 11:34:37AM +0530, Amit Kapila wrote:
> On Thu, Jan 5, 2023 at 10:16 AM Nathan Bossart  
> wrote:
>> In v12, I moved the restart for two_phase mode to the end of
>> process_syncing_tables_for_apply() so that we don't need to rely on another
>> iteration of the loop.
> 
> This should work but it is better to add a comment before calling
> CommandCounterIncrement() to indicate that this is for making changes
> to the relation state visible.

Will do.

> Thinking along similar lines, won't apply worker need to be notified
> of SUBREL_STATE_SYNCWAIT state change by the tablesync worker?

wait_for_worker_state_change() should notify the apply worker in this case.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-04 Thread Amit Kapila
On Thu, Jan 5, 2023 at 10:16 AM Nathan Bossart  wrote:
>
> On Wed, Jan 04, 2023 at 08:12:37PM -0800, Nathan Bossart wrote:
> > On Thu, Jan 05, 2023 at 09:09:12AM +0530, Amit Kapila wrote:
> >> But there doesn't appear to be any guarantee that the result for
> >> AllTablesyncsReady() will change between the time it is invoked
> >> earlier in the function and at the place you have it in the patch.
> >> This is because the value of 'table_states_valid' may not have
> >> changed. So, how is this supposed to work?
> >
> > The call to CommandCounterIncrement() should set table_states_valid to
> > false if needed.
>
> In v12, I moved the restart for two_phase mode to the end of
> process_syncing_tables_for_apply() so that we don't need to rely on another
> iteration of the loop.
>

This should work but it is better to add a comment before calling
CommandCounterIncrement() to indicate that this is for making changes
to the relation state visible.

Thinking along similar lines, won't apply worker need to be notified
of SUBREL_STATE_SYNCWAIT state change by the tablesync worker?

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-04 Thread Amit Kapila
On Thu, Jan 5, 2023 at 6:19 AM Nathan Bossart  wrote:
>
> On Wed, Jan 04, 2023 at 10:12:19AM -0800, Nathan Bossart wrote:
> > From the discussion thus far, it sounds like the alternatives are to 1) add
> > a global flag that causes wal_retrieve_retry_interval to be bypassed for
> > all workers or to 2) add a hash map in the launcher and a
> > restart_immediately flag in each worker slot.  I'll go ahead and create a
> > patch for 2 since it seems like the most complete solution, and we can
> > evaluate whether the complexity seems appropriate.
>
> Here is a first attempt at adding a hash table to the launcher and a
> restart_immediately flag in each worker slot.  This provides a similar
> speedup to lowering wal_retrieve_retry_interval to 1ms.  I've noted a
> couple of possible race conditions in comments, but none of them seemed
> particularly egregious.  Ideally, we'd put the hash table in shared memory
> so that other backends could adjust it directly, but IIUC that requires it
> to be a fixed size, and the number of subscriptions is virtually unbounded.
>

True, if we want we can use dshash for this. The garbage collection
mechanism used in the patch seems odd to me as that will remove/add
entries to the hash table even when the corresponding subscription is
never dropped. Also, adding this garbage collection each time seems
like an overhead, especially for small values of
wal_retrieve_retry_interval and a large number of subscriptions.

Another point is immediately after cleaning the worker info, trying to
find it again seems of no use. In logicalrep_worker_launch(), using
both in_use and restart_immediately to find an unused slot doesn't
look neat to me, we could probably keep the in_use flag intact if we
want to reuse the worker. But again after freeing the worker, keeping
its associated slot allocated sounds odd to me.

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-04 Thread Nathan Bossart
On Wed, Jan 04, 2023 at 08:12:37PM -0800, Nathan Bossart wrote:
> On Thu, Jan 05, 2023 at 09:09:12AM +0530, Amit Kapila wrote:
>> But there doesn't appear to be any guarantee that the result for
>> AllTablesyncsReady() will change between the time it is invoked
>> earlier in the function and at the place you have it in the patch.
>> This is because the value of 'table_states_valid' may not have
>> changed. So, how is this supposed to work?
> 
> The call to CommandCounterIncrement() should set table_states_valid to
> false if needed.

In v12, I moved the restart for two_phase mode to the end of
process_syncing_tables_for_apply() so that we don't need to rely on another
iteration of the loop.
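
A minimal sketch of the ordering issue, as a toy model rather than
PostgreSQL code.  Every toy_* name and the ToySync struct are hypothetical;
cache_valid plays the role of table_states_valid, and
toy_command_counter_increment() only models the assumption, stated in the
quoted reply, that CommandCounterIncrement() invalidates the cached states.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-ins; none of these are the real PostgreSQL definitions. */
typedef struct ToySync
{
	int			catalog_not_ready;	/* tables not yet READY "on disk" */
	int			cached_not_ready;	/* this process's cached copy */
	bool		cache_valid;		/* plays the role of table_states_valid */
} ToySync;

/* Mark one table READY in the catalog; the cache is not touched. */
void
toy_mark_ready(ToySync *s)
{
	assert(s->catalog_not_ready > 0);
	s->catalog_not_ready--;
}

/* Models CommandCounterIncrement(): our own changes invalidate the cache. */
void
toy_command_counter_increment(ToySync *s)
{
	s->cache_valid = false;
}

/* Models AllTablesyncsReady(): reload only if the cache is invalid. */
bool
toy_all_ready(ToySync *s)
{
	if (!s->cache_valid)
	{
		s->cached_not_ready = s->catalog_not_ready;
		s->cache_valid = true;
	}
	return s->cached_not_ready == 0;
}

/*
 * One pass over the syncing tables.  Returns true if the worker should exit
 * (to restart with two_phase enabled) once the pass is finished.
 */
bool
toy_process_pass(ToySync *s, bool increment_before_check)
{
	bool		should_exit = false;

	(void) toy_all_ready(s);	/* early call loads the cache */
	toy_mark_ready(s);			/* the last table becomes READY */
	if (increment_before_check)
		toy_command_counter_increment(s);
	if (toy_all_ready(s))
		should_exit = true;		/* caller exits only after committing */
	return should_exit;
}
```

Without the increment, the second check in this model still sees the stale
cached count and the restart has to wait for a later pass.  With it, the
same pass notices that everything is READY.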

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 57bfee9615e25d62dc975325695fdf445c002a65 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v12 1/2] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c   |  3 ++
 src/backend/commands/alter.c|  7 +++
 src/backend/commands/subscriptioncmds.c |  4 ++
 src/backend/replication/logical/tablesync.c | 49 -
 src/backend/replication/logical/worker.c| 46 +++
 src/include/replication/logicalworker.h |  3 ++
 6 files changed, 90 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a..4e8102b59f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796f..b9bbb2cf4e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..85e2a2861f 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	bool		should_exit = false;
 
 	Assert(!IsTransactionState());
 
@@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		last_start_times = NULL;
 	}
 
-	/*
-	 * Even when the two_phase mode is requested by the user, it remains as
-	 * 'pending' until all tablesyncs have reached READY state.
-	 *
-	 * When this happens, we restart the apply worker and (if the conditions
-	 * are still ok) then the two_phase tri-state will become 'enabled' at
-	 * that time.
-	 *
-	 * Note: If the subscription has no tables then leave the state as
-	 * PENDING, which allows ALTER 

Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-04 Thread Nathan Bossart
On Thu, Jan 05, 2023 at 09:09:12AM +0530, Amit Kapila wrote:
> On Wed, Jan 4, 2023 at 11:03 PM Nathan Bossart  
> wrote:
>> On Wed, Jan 04, 2023 at 09:41:47AM +0530, Amit Kapila wrote:
>> > If so, we probably also need to
>> > ensure that table_states_valid is marked false probably via
>> > invalidations so that we can get the latest state and then perform
>> > this check. I guess if we can do that then we can directly move the
>> > restart logic to the end.
>>
>> IMO this shows the advantage of just waking up the worker.  It doesn't
>> change the apply worker's behavior besides making it more responsive.
> 
> But there doesn't appear to be any guarantee that the result for
> AllTablesyncsReady() will change between the time it is invoked
> earlier in the function and at the place you have it in the patch.
> This is because the value of 'table_states_valid' may not have
> changed. So, how is this supposed to work?

The call to CommandCounterIncrement() should set table_states_valid to
false if needed.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-04 Thread Amit Kapila
On Wed, Jan 4, 2023 at 11:03 PM Nathan Bossart  wrote:
>
> On Wed, Jan 04, 2023 at 09:41:47AM +0530, Amit Kapila wrote:
> > I am not sure if I understand the problem you are trying to solve with
> > this part of the patch. Are you worried that after we mark some of the
> > relation's state as READY, all the table syncs are in the READY state
> > but we will not immediately try to check the two_phase stuff and
> > probably the apply worker may sleep before the next time it invokes
> > process_syncing_tables_for_apply()?
>
> Yes.
>
> > If so, we probably also need to
> > ensure that table_states_valid is marked false probably via
> > invalidations so that we can get the latest state and then perform
> > this check. I guess if we can do that then we can directly move the
> > restart logic to the end.
>
> IMO this shows the advantage of just waking up the worker.  It doesn't
> change the apply worker's behavior besides making it more responsive.
>

But there doesn't appear to be any guarantee that the result for
AllTablesyncsReady() will change between the time it is invoked
earlier in the function and at the place you have it in the patch.
This is because the value of 'table_states_valid' may not have
changed. So, how is this supposed to work?

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-04 Thread Nathan Bossart
On Wed, Jan 04, 2023 at 10:12:19AM -0800, Nathan Bossart wrote:
> From the discussion thus far, it sounds like the alternatives are to 1) add
> a global flag that causes wal_retrieve_retry_interval to be bypassed for
> all workers or to 2) add a hash map in the launcher and a
> restart_immediately flag in each worker slot.  I'll go ahead and create a
> patch for 2 since it seems like the most complete solution, and we can
> evaluate whether the complexity seems appropriate.

Here is a first attempt at adding a hash table to the launcher and a
restart_immediately flag in each worker slot.  This provides a similar
speedup to lowering wal_retrieve_retry_interval to 1ms.  I've noted a
couple of possible race conditions in comments, but none of them seemed
particularly egregious.  Ideally, we'd put the hash table in shared memory
so that other backends could adjust it directly, but IIUC that requires it
to be a fixed size, and the number of subscriptions is virtually unbounded.
There might still be problems with the patch, but I'm hoping it at least
helps further the discussion about which approach to take.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 2c4c3c552ca83eeb8a4a7cca724ce455ba98ac0f Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v11 1/2] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c   |  3 ++
 src/backend/commands/alter.c|  7 
 src/backend/commands/subscriptioncmds.c |  4 ++
 src/backend/replication/logical/tablesync.c | 10 +
 src/backend/replication/logical/worker.c| 46 +
 src/include/replication/logicalworker.h |  3 ++
 6 files changed, 73 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7..54145bf805 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a..4e8102b59f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796f..b9bbb2cf4e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2fdfeb5b4c..fcadc1e98e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -619,6 +620,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if 

Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-04 Thread Nathan Bossart
On Wed, Jan 04, 2023 at 10:57:43AM +0530, Amit Kapila wrote:
> On Tue, Jan 3, 2023 at 11:40 PM Nathan Bossart  
> wrote:
>> My approach was to add a variable to LogicalRepWorker that indicated
>> whether a worker needed to be restarted immediately.  While this is a
>> little weird because the workers array is treated as slots, it worked
>> nicely for ALTER SUBSCRIPTION.
> 
> So, are you planning to keep its in_use and subid flag as it is in
> logicalrep_worker_cleanup()? Otherwise, without that it could be
> reused for some other subscription.

I believe I did something like this in my proof-of-concept.  I might have
used the new flag as another indicator that the slot was still "in use".
In any case, you are right that we need to prevent the slot from being
reused.

> What if we maintain a hash table similar to 'last_start_times'
> maintained in tablesync.c? It won't have entries for new
> subscriptions, so for those we may not need to wait till
> wal_retrieve_retry_interval.

I proposed this upthread [0].  I still think it is a worthwhile change.
Right now, if a worker needs to be restarted but another unrelated worker
was restarted less than wal_retrieve_retry_interval milliseconds ago, the
launcher waits to restart it.  I think it makes more sense for each worker
to have its own restart interval tracked.
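
As a rough illustration of the difference, here is a toy comparison in
plain C.  The function names and the NO_START marker are hypothetical, and
a plain array stands in for whatever table would hold the per-subscription
times.  It is a sketch of the idea, not the launcher's actual logic.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define NO_START (-1)			/* hypothetical marker: never started */

/* One shared timestamp: any recent start delays every other worker. */
bool
can_start_global(int64_t last_any_start_ms, int64_t now_ms,
				 int64_t retry_interval_ms)
{
	assert(retry_interval_ms >= 0);
	if (last_any_start_ms == NO_START)
		return true;
	return now_ms - last_any_start_ms >= retry_interval_ms;
}

/* One timestamp per subscription: only that worker's own history counts. */
bool
can_start_per_sub(const int64_t *last_start_ms, size_t nsubs, size_t sub,
				  int64_t now_ms, int64_t retry_interval_ms)
{
	assert(sub < nsubs);
	assert(retry_interval_ms >= 0);
	if (last_start_ms[sub] == NO_START)
		return true;
	return now_ms - last_start_ms[sub] >= retry_interval_ms;
}
```

With a 5000 ms interval and one worker started 100 ms ago, the global check
refuses to start an unrelated worker, while the per-subscription check lets
it start and still holds back the one that was just started.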

>> IIUC you are suggesting just one variable that would bypass
>> wal_retrieve_retry_interval for all subscriptions, not just those newly
>> altered or created.  This definitely seems like it would prevent delays,
>> but it would also cause wal_retrieve_retry_interval to be incorrectly
>> bypassed for the other workers in some cases.
>
> Right, but I guess it would be rare in practical cases that someone
> Altered/Created a subscription, and also some workers are restarted
> due to errors/crashes as only in those cases launcher can restart the
> worker when it shouldn't. However, in that case, also, it won't
> restart the apply worker again and again unless there are concurrent
> Create/Alter Subscription operations going on. IIUC, currently also it
> can always first time restart the worker immediately after ERROR/CRASH
> because we don't maintain last_start_time for each worker. I think
> this is probably okay as we want to avoid repeated restarts after the
> ERROR.

This line of thinking is why I felt that lowering
wal_retrieve_retry_interval for the tests might be sufficient.  Besides the
fact that it revealed multiple bugs, I don't see the point in adding much
more complexity here.  In practice, workers will usually start right away,
unless of course there are other worker starts happening around the same
time.  This consistently causes testing delays because the tests stress
these code paths, but I don't think what the tests are doing is a typical
use-case.

From the discussion thus far, it sounds like the alternatives are to 1) add
a global flag that causes wal_retrieve_retry_interval to be bypassed for
all workers or to 2) add a hash map in the launcher and a
restart_immediately flag in each worker slot.  I'll go ahead and create a
patch for 2 since it seems like the most complete solution, and we can
evaluate whether the complexity seems appropriate.

[0] https://postgr.es/m/20221214171023.GA689106%40nathanxps13

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-04 Thread Nathan Bossart
On Wed, Jan 04, 2023 at 09:41:47AM +0530, Amit Kapila wrote:
> I am not sure if I understand the problem you are trying to solve with
> this part of the patch. Are you worried that after we mark some of the
> relation's state as READY, all the table syncs are in the READY state
> but we will not immediately try to check the two_phase stuff and
> probably the apply worker may sleep before the next time it invokes
> process_syncing_tables_for_apply()? 

Yes.

> If so, we probably also need to
> ensure that table_states_valid is marked false probably via
> invalidations so that we can get the latest state and then perform
> this check. I guess if we can do that then we can directly move the
> restart logic to the end.

IMO this shows the advantage of just waking up the worker.  It doesn't
change the apply worker's behavior besides making it more responsive.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-03 Thread Amit Kapila
On Tue, Jan 3, 2023 at 11:40 PM Nathan Bossart  wrote:
>
> On Tue, Jan 03, 2023 at 11:03:32AM +0530, Amit Kapila wrote:
> > On Thu, Dec 15, 2022 at 4:47 AM Nathan Bossart  
> > wrote:
> >> On Wed, Dec 14, 2022 at 02:02:58PM -0500, Tom Lane wrote:
> >> > Maybe we could have workers that are exiting for that reason set a
> >> > flag saying "please restart me without delay"?
> >>
> >> That helps a bit, but there are still delays when starting workers for new
> >> subscriptions.  I think we'd need to create a new array in shared memory
> >> for subscription OIDs that need their workers started immediately.
> >
> > That would be tricky because the list of subscription OIDs can be
> > longer than the workers. Can't we set a boolean variable
> > (check_immediate or something like that) in LogicalRepCtxStruct and
> > use that to traverse the subscriptions? So, when any worker will
> > restart because of a parameter change, we can set the variable and
> > send a signal to the launcher. The launcher can then check this
> > variable to decide whether to start the missing workers for enabled
> > subscriptions.
>
> My approach was to add a variable to LogicalRepWorker that indicated
> whether a worker needed to be restarted immediately.  While this is a
> little weird because the workers array is treated as slots, it worked
> nicely for ALTER SUBSCRIPTION.
>

So, are you planning to keep its in_use and subid flag as it is in
logicalrep_worker_cleanup()? Otherwise, without that it could be
reused for some other subscription.

>  However, this doesn't help at all for
> CREATE SUBSCRIPTION.
>

What if we maintain a hash table similar to 'last_start_times'
maintained in tablesync.c? It won't have entries for new
subscriptions, so for those we may not need to wait till
wal_retrieve_retry_interval.

> IIUC you are suggesting just one variable that would bypass
> wal_retrieve_retry_interval for all subscriptions, not just those newly
> altered or created.  This definitely seems like it would prevent delays,
> but it would also cause wal_retrieve_retry_interval to be incorrectly
> bypassed for the other workers in some cases.
>

Right, but I guess it would be rare in practical cases that someone
Altered/Created a subscription, and also some workers are restarted
due to errors/crashes as only in those cases launcher can restart the
worker when it shouldn't. However, in that case, also, it won't
restart the apply worker again and again unless there are concurrent
Create/Alter Subscription operations going on. IIUC, currently also it
can always first time restart the worker immediately after ERROR/CRASH
because we don't maintain last_start_time for each worker. I think
this is probably okay as we want to avoid repeated restarts after the
ERROR.

BTW, now users also have a subscription option 'disable_on_error'
which could also be used to avoid repeated restarts due to ERRORS.

>  Is this acceptable?
>

To me, this sounds acceptable but if you and others don't think so
then we can try to develop some solution like per-worker-flag and a
hash table as discussed in the earlier part of the email.

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-03 Thread Amit Kapila
On Tue, Jan 3, 2023 at 11:51 PM Nathan Bossart  wrote:
>
> On Tue, Jan 03, 2023 at 11:43:59AM +0530, Amit Kapila wrote:
> > On Wed, Dec 7, 2022 at 11:42 PM Nathan Bossart  
> > wrote:
> >> After sleeping on this, I think we can do better.  IIUC we can simply check
> >> for AllTablesyncsReady() at the end of process_syncing_tables_for_apply()
> >> and wake up the logical replication workers (which should just consist of
> >> setting the current process's latch) if we are ready for two_phase mode.
> >
> > How will just waking up help with two_phase mode? For that, we need to
> > restart the apply worker as we are doing at the beginning of
> > process_syncing_tables_for_apply().
>
> Right.  IIRC waking up causes the apply worker to immediately call
> process_syncing_tables_for_apply() again, which will then proc_exit(0) as
> appropriate.
>

But we are already in apply worker and performing
process_syncing_tables_for_apply(). This means the apply worker is not
waiting/sleeping, so what exactly are we trying to wake up?

>  It might be possible to move the restart logic to the end of
> process_syncing_tables_for_apply() to avoid this extra wakeup.  WDYT?
>

I am not sure if I understand the problem you are trying to solve with
this part of the patch. Are you worried that after we mark some of the
relation's state as READY, all the table syncs are in the READY state
but we will not immediately try to check the two_phase stuff and
probably the apply worker may sleep before the next time it invokes
process_syncing_tables_for_apply()? If so, we probably also need to
ensure that table_states_valid is marked false probably via
invalidations so that we can get the latest state and then perform
this check. I guess if we can do that then we can directly move the
restart logic to the end.

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-03 Thread Nathan Bossart
On Tue, Jan 03, 2023 at 11:43:59AM +0530, Amit Kapila wrote:
> On Wed, Dec 7, 2022 at 11:42 PM Nathan Bossart  
> wrote:
>> After sleeping on this, I think we can do better.  IIUC we can simply check
>> for AllTablesyncsReady() at the end of process_syncing_tables_for_apply()
>> and wake up the logical replication workers (which should just consist of
>> setting the current process's latch) if we are ready for two_phase mode.
> 
> How will just waking up help with two_phase mode? For that, we need to
> restart the apply worker as we are doing at the beginning of
> process_syncing_tables_for_apply().

Right.  IIRC waking up causes the apply worker to immediately call
process_syncing_tables_for_apply() again, which will then proc_exit(0) as
appropriate.  It might be possible to move the restart logic to the end of
process_syncing_tables_for_apply() to avoid this extra wakeup.  WDYT?

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-03 Thread Nathan Bossart
On Tue, Jan 03, 2023 at 11:03:32AM +0530, Amit Kapila wrote:
> On Thu, Dec 15, 2022 at 4:47 AM Nathan Bossart  
> wrote:
>> On Wed, Dec 14, 2022 at 02:02:58PM -0500, Tom Lane wrote:
>> > Maybe we could have workers that are exiting for that reason set a
>> > flag saying "please restart me without delay"?
>>
>> That helps a bit, but there are still delays when starting workers for new
>> subscriptions.  I think we'd need to create a new array in shared memory
>> for subscription OIDs that need their workers started immediately.
> 
> That would be tricky because the list of subscription OIDs can be
> longer than the workers. Can't we set a boolean variable
> (check_immediate or something like that) in LogicalRepCtxStruct and
> use that to traverse the subscriptions? So, when any worker will
> restart because of a parameter change, we can set the variable and
> send a signal to the launcher. The launcher can then check this
> variable to decide whether to start the missing workers for enabled
> subscriptions.

My approach was to add a variable to LogicalRepWorker that indicated
whether a worker needed to be restarted immediately.  While this is a
little weird because the workers array is treated as slots, it worked
nicely for ALTER SUBSCRIPTION.  However, this doesn't help at all for
CREATE SUBSCRIPTION.
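
For illustration, a per-worker flag of this kind could look roughly like the sketch below. WorkerSlot and both functions are hypothetical stand-ins, not the actual LogicalRepWorker struct or anything from a posted patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model of the worker slot array; not PostgreSQL code. */
#define MAX_WORKER_SLOTS 4

typedef struct WorkerSlot
{
	bool		in_use;					/* slot holds a running worker */
	uint32_t	subid;					/* subscription served by the slot */
	bool		restart_immediately;	/* left behind on intentional exit */
} WorkerSlot;

/*
 * Called by a worker that exits because of a parameter change.  The slot is
 * released, but the flag stays behind for the launcher to find.
 */
void
worker_exit_for_reconfig(WorkerSlot *slot)
{
	assert(slot != NULL && slot->in_use);
	slot->in_use = false;
	slot->restart_immediately = true;
}

/*
 * Launcher side: returns true (and consumes the flag) if the worker for
 * subid asked to be restarted without waiting for the retry interval.
 */
bool
launcher_should_skip_delay(WorkerSlot *slots, size_t nslots, uint32_t subid)
{
	assert(slots != NULL);
	for (size_t i = 0; i < nslots; i++)
	{
		if (!slots[i].in_use && slots[i].subid == subid &&
			slots[i].restart_immediately)
		{
			slots[i].restart_immediately = false;
			return true;
		}
	}
	return false;
}
```

A brand-new subscription never occupied a slot, so there is nothing for the launcher to find, which is why this shape does nothing for CREATE SUBSCRIPTION.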

IIUC you are suggesting just one variable that would bypass
wal_retrieve_retry_interval for all subscriptions, not just those newly
altered or created.  This definitely seems like it would prevent delays,
but it would also cause wal_retrieve_retry_interval to be incorrectly
bypassed for the other workers in some cases.  Is this acceptable?

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-02 Thread Amit Kapila
On Wed, Dec 7, 2022 at 11:42 PM Nathan Bossart  wrote:
>
> On Wed, Dec 07, 2022 at 02:07:11PM +0300, Melih Mutlu wrote:
> > Do we also need to wake up all sync workers too? Even if not, I'm not
> > actually sure whether doing that would harm anything though.
> > Just asking since currently the patch wakes up all workers including sync
> > workers if any still exists.
>
> After sleeping on this, I think we can do better.  IIUC we can simply check
> for AllTablesyncsReady() at the end of process_syncing_tables_for_apply()
> and wake up the logical replication workers (which should just consist of
> setting the current process's latch) if we are ready for two_phase mode.
>

How will just waking up help with two_phase mode? For that, we need to
restart the apply worker as we are doing at the beginning of
process_syncing_tables_for_apply().

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2023-01-02 Thread Amit Kapila
On Thu, Dec 15, 2022 at 4:47 AM Nathan Bossart  wrote:
>
> On Wed, Dec 14, 2022 at 02:02:58PM -0500, Tom Lane wrote:
> > Maybe we could have workers that are exiting for that reason set a
> > flag saying "please restart me without delay"?
>
> That helps a bit, but there are still delays when starting workers for new
> subscriptions.  I think we'd need to create a new array in shared memory
> for subscription OIDs that need their workers started immediately.
>

That would be tricky because the list of subscription OIDs can be
longer than the workers. Can't we set a boolean variable
(check_immediate or something like that) in LogicalRepCtxStruct and
use that to traverse the subscriptions? So, when any worker will
restart because of a parameter change, we can set the variable and
send a signal to the launcher. The launcher can then check this
variable to decide whether to start the missing workers for enabled
subscriptions.

-- 
With Regards,
Amit Kapila.




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-31 Thread Nathan Bossart
On Sun, Dec 18, 2022 at 03:36:07PM -0800, Nathan Bossart wrote:
> This seems to have somehow broken the archiving tests on Windows, so
> obviously I owe some better analysis here.  I didn't see anything obvious
> in the logs, but I will continue to dig.

On Windows, WaitForWALToBecomeAvailable() seems to depend on the call to
WaitLatch() for wal_retrieve_retry_interval to ensure that signals are
dispatched (i.e., pgwin32_dispatch_queued_signals()).  My first instinct is
to just always call WaitLatch() in this code path, even if
wal_retrieve_retry_interval milliseconds have already elapsed.  The attached
0003 does this.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From d1025a5d7ad9a966f7ec8bee4bb8127e8a0e1d8b Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v10 1/4] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c   |  3 ++
 src/backend/commands/alter.c|  7 
 src/backend/commands/subscriptioncmds.c |  4 ++
 src/backend/replication/logical/tablesync.c | 10 +
 src/backend/replication/logical/worker.c| 46 +
 src/include/replication/logicalworker.h |  3 ++
 6 files changed, 73 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b7c7fd9f00..70ad51c591 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..d095cd3ced 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..d6993c26e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..509fe2eb19 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -619,6 +620,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * If we are ready to enable two_phase mode, wake up the logical
+		 * replication workers to handle this change quickly.
+		 */
+		CommandCounterIncrement();
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+			LogicalRepWorkersWakeupAtCommit(MyLogicalRepWorker->subid);
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
diff --git a/src/backend/replication/logical/worker.c 

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-18 Thread Nathan Bossart
On Thu, Dec 15, 2022 at 02:47:21PM -0800, Nathan Bossart wrote:
> I tried setting wal_retrieve_retry_interval to 1ms for all TAP tests
> (similar to what was done in 2710ccd), and I noticed that the recovery
> tests consistently took much longer.  Upon further inspection, it looks
> like the same (or a very similar) race condition described in e5d494d's
> commit message [0].  With some added debug logs, I see that all of the
> callers of MaybeStartWalReceiver() complete before SIGCHLD is processed, so
> ServerLoop() waits for a minute before starting the WAL receiver.
> 
> A simple fix is to have DetermineSleepTime() take the WalReceiverRequested
> flag into consideration.  The attached 0002 patch shortens the sleep time
> to 100ms if it looks like we are waiting on a SIGCHLD.  I'm not certain
> this is the best approach, but it seems to fix the tests.

This seems to have somehow broken the archiving tests on Windows, so
obviously I owe some better analysis here.  I didn't see anything obvious
in the logs, but I will continue to dig.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-15 Thread Nathan Bossart
I tried setting wal_retrieve_retry_interval to 1ms for all TAP tests
(similar to what was done in 2710ccd), and I noticed that the recovery
tests consistently took much longer.  Upon further inspection, it looks
like the same (or a very similar) race condition described in e5d494d's
commit message [0].  With some added debug logs, I see that all of the
callers of MaybeStartWalReceiver() complete before SIGCHLD is processed, so
ServerLoop() waits for a minute before starting the WAL receiver.

A simple fix is to have DetermineSleepTime() take the WalReceiverRequested
flag into consideration.  The attached 0002 patch shortens the sleep time
to 100ms if it looks like we are waiting on a SIGCHLD.  I'm not certain
this is the best approach, but it seems to fix the tests.
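
As a rough model of that change (not the code in 0002), the sleep-time calculation could cap the computed value whenever a WAL receiver start has been requested. The function name, its parameters and the constant below are assumptions made for this sketch; only the 100ms figure comes from the description above.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical constant: the shortened sleep described above. */
#define WALRECEIVER_PENDING_SLEEP_MS 100L

/*
 * computed_ms is whatever the loop would otherwise sleep for (for example,
 * one minute).  If a WAL receiver start is pending, cap the sleep so that a
 * SIGCHLD arriving late does not leave the request waiting for the full
 * timeout.
 */
long
determine_sleep_ms(long computed_ms, bool wal_receiver_requested)
{
	assert(computed_ms >= 0);
	if (wal_receiver_requested && computed_ms > WALRECEIVER_PENDING_SLEEP_MS)
		return WALRECEIVER_PENDING_SLEEP_MS;
	return computed_ms;
}
```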

On my machine, I see the following improvements in the tests (all units in
seconds):
                 HEAD  patched (v9)
check-world -j8  165   138
subscription     120   75
recovery         111   108

[0] https://postgr.es/m/21344.1498494720%40sss.pgh.pa.us

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From a80c2b3b2a80d024b72be9fca6b5d4136b8b8272 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v9 1/3] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c   |  3 ++
 src/backend/commands/alter.c|  7 
 src/backend/commands/subscriptioncmds.c |  4 ++
 src/backend/replication/logical/tablesync.c | 10 +
 src/backend/replication/logical/worker.c| 46 +
 src/include/replication/logicalworker.h |  3 ++
 6 files changed, 73 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b7c7fd9f00..70ad51c591 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..d095cd3ced 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..d6993c26e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..509fe2eb19 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -619,6 +620,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * If we are ready to enable two_phase mode, wake 

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-14 Thread Nathan Bossart
On Wed, Dec 14, 2022 at 02:02:58PM -0500, Tom Lane wrote:
> Maybe we could have workers that are exiting for that reason set a
> flag saying "please restart me without delay"?

That helps a bit, but there are still delays when starting workers for new
subscriptions.  I think we'd need to create a new array in shared memory
for subscription OIDs that need their workers started immediately.

I'm not totally sure this is worth the effort.  These delays surface in the
tests because the workers are started so frequently.  In normal operation,
this is probably unusual, so the launcher would typically start new workers
immediately.  But if you and/or others feel this is worthwhile, I don't
mind working on the patch.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-14 Thread Tom Lane
Nathan Bossart  writes:
> On Wed, Dec 14, 2022 at 01:23:18PM -0500, Tom Lane wrote:
>> Oh.  What in the world is the rationale for that?

> My assumption is that this is meant to avoid starting workers as fast as
> possible if they repeatedly crash.

I can see the point of rate-limiting if the workers are failing to connect
or crashing while trying to process data.  But it's not very sane to
apply the same policy to an intentional worker exit-for-reconfiguration.

Maybe we could have workers that are exiting for that reason set a
flag saying "please restart me without delay"?

A *real* fix would be to not exit at all, at least for reconfigurations
that don't change the connection parameters, but instead cope with
recomputing whatever needs recomputed in the workers' state.  I can
believe that that'd be a lot of work though.

regards, tom lane




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-14 Thread Nathan Bossart
On Wed, Dec 14, 2022 at 01:23:18PM -0500, Tom Lane wrote:
> Nathan Bossart  writes:
>> I'm reasonably certain the launcher is already signaled like you describe.
>> It'll just wait to start new workers if it's been less than
>> wal_retrieve_retry_interval milliseconds since the last time it started
>> workers.
> 
> Oh.  What in the world is the rationale for that?

My assumption is that this is meant to avoid starting workers as fast as
possible if they repeatedly crash.  I didn't see much discussion in the
original logical replication thread [0], but I do see follow-up discussion
about creating a separate GUC for this [1] [2].

[0] https://postgr.es/m/b8132323-b577-428c-b2aa-bf41a66b18e7%402ndquadrant.com
[1] 
https://postgr.es/m/CAD21AoAjTTGm%2BOx70b2OGWvb77vPcRdYeRv3gkAWx76nXDo%2BEA%40mail.gmail.com
[2] 
https://postgr.es/m/CAD21AoDCnyRJDUY%3DESVVe68AukvOP2dFomTeBFpAd1TiFbjsGg%40mail.gmail.com

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-14 Thread Tom Lane
Nathan Bossart  writes:
> I'm reasonably certain the launcher is already signaled like you describe.
> It'll just wait to start new workers if it's been less than
> wal_retrieve_retry_interval milliseconds since the last time it started
> workers.

Oh.  What in the world is the rationale for that?

regards, tom lane




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-14 Thread Nathan Bossart
On Wed, Dec 14, 2022 at 12:42:32PM -0500, Tom Lane wrote:
> Nathan Bossart  writes:
>> My first thought is that the latter two uses should be moved to a new
>> parameter, and the apply launcher should store the last start time for each
>> apply worker like the apply workers do for the table-sync workers.  In any
>> case, it probably makes sense to lower this parameter's value for testing
>> so that tests that restart these workers frequently aren't waiting for so
>> long.
> 
>> I can put a patch together if this seems like a reasonable direction to go.
> 
> No, I'm still of the opinion that waiting for the launcher to timeout
> before doing something is fundamentally wrong design.  We should signal
> it when we want it to do something.  That's not different from what
> you're fixing about the workers; why don't you see that it's appropriate
> for the launcher too?

I'm reasonably certain the launcher is already signaled like you describe.
It'll just wait to start new workers if it's been less than
wal_retrieve_retry_interval milliseconds since the last time it started
workers.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-14 Thread Tom Lane
Nathan Bossart  writes:
> My first thought is that the latter two uses should be moved to a new
> parameter, and the apply launcher should store the last start time for each
> apply worker like the apply workers do for the table-sync workers.  In any
> case, it probably makes sense to lower this parameter's value for testing
> so that tests that restart these workers frequently aren't waiting for so
> long.

> I can put a patch together if this seems like a reasonable direction to go.

No, I'm still of the opinion that waiting for the launcher to timeout
before doing something is fundamentally wrong design.  We should signal
it when we want it to do something.  That's not different from what
you're fixing about the workers; why don't you see that it's appropriate
for the launcher too?

regards, tom lane




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-14 Thread Nathan Bossart
On Tue, Dec 13, 2022 at 04:41:05PM -0800, Nathan Bossart wrote:
> On Tue, Dec 13, 2022 at 07:20:14PM -0500, Tom Lane wrote:
>> I certainly don't think that "wake the apply launcher every 1ms"
>> is a sane configuration.  Unless I'm missing something basic about
>> its responsibilities, it should seldom need to wake at all in
>> normal operation.
> 
> This parameter appears to control how often the apply launcher starts new
> workers.  If it starts new workers in a loop iteration, it updates its
> last_start_time variable, and it won't start any more workers until another
> wal_retrieve_retry_interval has elapsed.  If no new workers need to be
> started, it only wakes up every 3 minutes.

Looking closer, I see that wal_retrieve_retry_interval is used for three
purposes.  Its main purpose seems to be preventing busy-waiting in
WaitForWALToBecomeAvailable(), as that's what's documented.  But it's also
used for logical replication.  The apply launcher uses it as I've described
above, and the apply workers use it when launching sync workers.  Unlike
the apply launcher, the apply workers store the last start time for each
table's sync worker and use that to determine whether to start a new one.

My first thought is that the latter two uses should be moved to a new
parameter, and the apply launcher should store the last start time for each
apply worker like the apply workers do for the table-sync workers.  In any
case, it probably makes sense to lower this parameter's value for testing
so that tests that restart these workers frequently aren't waiting for so
long.

I can put a patch together if this seems like a reasonable direction to go.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-13 Thread Nathan Bossart
On Tue, Dec 13, 2022 at 07:20:14PM -0500, Tom Lane wrote:
> I certainly don't think that "wake the apply launcher every 1ms"
> is a sane configuration.  Unless I'm missing something basic about
> its responsibilities, it should seldom need to wake at all in
> normal operation.

This parameter appears to control how often the apply launcher starts new
workers.  If it starts new workers in a loop iteration, it updates its
last_start_time variable, and it won't start any more workers until another
wal_retrieve_retry_interval has elapsed.  If no new workers need to be
started, it only wakes up every 3 minutes.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-13 Thread Tom Lane
Nathan Bossart  writes:
> On Tue, Dec 13, 2022 at 06:32:08PM -0500, Tom Lane wrote:
>> I've not chased it further than that, but I venture that the apply
>> launcher also needs a kick in the pants, and/or there needs to be
>> an interlock to ensure that it doesn't wake until after the old
>> apply worker quits.

> This is probably because the tests set wal_retrieve_retry_interval to
> 500ms.  Lowering that to 1ms in Cluster.pm seems to wipe out this
> particular wait, and the total src/test/subscription test time drops from
> 119 seconds to 95 seconds on my machine.

That's not really the direction we should be going in, though.  Ideally
there should be *no* situation where we are waiting for a timeout to
elapse for a process to wake up and notice it ought to do something.
If we have timeouts at all, they should be backstops for the possibility
of a lost interrupt, and it should be possible to set them quite high
without any visible impact on normal operation.  (This gets back to
the business about minimizing idle power consumption, which Simon was
bugging us about recently but that's been on the radar screen for years.)

I certainly don't think that "wake the apply launcher every 1ms"
is a sane configuration.  Unless I'm missing something basic about
its responsibilities, it should seldom need to wake at all in
normal operation.

regards, tom lane




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-13 Thread Nathan Bossart
On Tue, Dec 13, 2022 at 06:32:08PM -0500, Tom Lane wrote:
> Before, there was up to 1 second (with multiple "SELECT count(1) = 0"
> probes from the test script) between the ALTER SUBSCRIPTION command
> and the "apply worker will restart" log entry.  That wait is pretty
> well zapped, but instead now we're waiting hundreds of ms for the
> "apply worker has started" message.
> 
> I've not chased it further than that, but I venture that the apply
> launcher also needs a kick in the pants, and/or there needs to be
> an interlock to ensure that it doesn't wake until after the old
> apply worker quits.

This is probably because the tests set wal_retrieve_retry_interval to
500ms.  Lowering that to 1ms in Cluster.pm seems to wipe out this
particular wait, and the total src/test/subscription test time drops from
119 seconds to 95 seconds on my machine.  This probably lowers the amount
of test coverage we get on the wal_retrieve_retry_interval code paths, but
if that's a concern, perhaps we should write a test specifically for
wal_retrieve_retry_interval.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-13 Thread Tom Lane
Nathan Bossart  writes:
> After sleeping on this, I think we can do better.  IIUC we can simply check
> for AllTablesyncsReady() at the end of process_syncing_tables_for_apply()
> and wake up the logical replication workers (which should just consist of
> setting the current process's latch) if we are ready for two_phase mode.

I independently rediscovered the need for something like this after
wondering why the subscription/t/031_column_list.pl test seemed to
take so much longer than its siblings.  I found that a considerable
amount of the elapsed time was wasted because we were waiting up to
a full second (NAPTIME_PER_CYCLE) for the logrep worker to notice
that something had changed in the local subscription state.  At least
on my machine, it seems that the worst-case timing is reliably hit
multiple times during this test.  Now admittedly, this is probably not
a significant problem in real-world usage; but it's sure annoying that
it eats time during check-world.

However, this patch seems to still be leaving quite a bit on the
table.  Here's the timings I see for the subscription suite in HEAD
(test is just "time make check PROVE_FLAGS=--timer" with an
assert-enabled build):

+++ tap check in src/test/subscription +++
[18:07:38] t/001_rep_changes.pl ... ok 6659 ms ( 0.00 usr  0.00 
sys +  0.89 cusr  0.52 csys =  1.41 CPU)
[18:07:45] t/002_types.pl . ok 1572 ms ( 0.00 usr  0.00 
sys +  0.70 cusr  0.27 csys =  0.97 CPU)
[18:07:47] t/003_constraints.pl ... ok 1436 ms ( 0.01 usr  0.00 
sys +  0.74 cusr  0.25 csys =  1.00 CPU)
[18:07:48] t/004_sync.pl .. ok 3007 ms ( 0.00 usr  0.00 
sys +  0.75 cusr  0.31 csys =  1.06 CPU)
[18:07:51] t/005_encoding.pl .. ok 1468 ms ( 0.00 usr  0.00 
sys +  0.74 cusr  0.21 csys =  0.95 CPU)
[18:07:53] t/006_rewrite.pl ... ok 1494 ms ( 0.00 usr  0.00 
sys +  0.72 cusr  0.24 csys =  0.96 CPU)
[18:07:54] t/007_ddl.pl ... ok 2005 ms ( 0.00 usr  0.00 
sys +  0.73 cusr  0.24 csys =  0.97 CPU)
[18:07:56] t/008_diff_schema.pl ... ok 1746 ms ( 0.01 usr  0.00 
sys +  0.70 cusr  0.28 csys =  0.99 CPU)
[18:07:58] t/009_matviews.pl .. ok 1878 ms ( 0.00 usr  0.00 
sys +  0.71 cusr  0.24 csys =  0.95 CPU)
[18:08:00] t/010_truncate.pl .. ok 2999 ms ( 0.00 usr  0.00 
sys +  0.77 cusr  0.38 csys =  1.15 CPU)
[18:08:03] t/011_generated.pl . ok 1467 ms ( 0.00 usr  0.00 
sys +  0.71 cusr  0.24 csys =  0.95 CPU)
[18:08:04] t/012_collation.pl . skipped: ICU not supported by 
this build
[18:08:04] t/013_partition.pl . ok 4787 ms ( 0.01 usr  0.00 
sys +  1.29 cusr  0.71 csys =  2.01 CPU)
[18:08:09] t/014_binary.pl  ok 2564 ms ( 0.00 usr  0.00 
sys +  0.72 cusr  0.28 csys =  1.00 CPU)
[18:08:12] t/015_stream.pl  ok 2531 ms ( 0.01 usr  0.00 
sys +  0.73 cusr  0.27 csys =  1.01 CPU)
[18:08:14] t/016_stream_subxact.pl  ok 1590 ms ( 0.00 usr  0.00 
sys +  0.70 cusr  0.24 csys =  0.94 CPU)
[18:08:16] t/017_stream_ddl.pl  ok 1610 ms ( 0.00 usr  0.00 
sys +  0.72 cusr  0.25 csys =  0.97 CPU)
[18:08:17] t/018_stream_subxact_abort.pl .. ok 1827 ms ( 0.00 usr  0.00 
sys +  0.73 cusr  0.24 csys =  0.97 CPU)
[18:08:19] t/019_stream_subxact_ddl_abort.pl .. ok 1474 ms ( 0.00 usr  0.00 
sys +  0.71 cusr  0.24 csys =  0.95 CPU)
[18:08:21] t/020_messages.pl .. ok 2423 ms ( 0.01 usr  0.00 
sys +  0.74 cusr  0.25 csys =  1.00 CPU)
[18:08:23] t/021_twophase.pl .. ok 4799 ms ( 0.00 usr  0.00 
sys +  0.82 cusr  0.39 csys =  1.21 CPU)
[18:08:28] t/022_twophase_cascade.pl .. ok 4346 ms ( 0.00 usr  0.00 
sys +  1.12 cusr  0.54 csys =  1.66 CPU)
[18:08:32] t/023_twophase_stream.pl ... ok 3656 ms ( 0.01 usr  0.00 
sys +  0.78 cusr  0.32 csys =  1.11 CPU)
[18:08:36] t/024_add_drop_pub.pl .. ok 3585 ms ( 0.00 usr  0.00 
sys +  0.73 cusr  0.29 csys =  1.02 CPU)
[18:08:39] t/025_rep_changes_for_schema.pl  ok 3631 ms ( 0.00 usr  0.00 
sys +  0.77 cusr  0.34 csys =  1.11 CPU)
[18:08:43] t/026_stats.pl . ok 4096 ms ( 0.00 usr  0.00 
sys +  0.77 cusr  0.32 csys =  1.09 CPU)
[18:08:47] t/027_nosuperuser.pl ... ok 4824 ms ( 0.01 usr  0.00 
sys +  0.77 cusr  0.39 csys =  1.17 CPU)
[18:08:52] t/028_row_filter.pl  ok 5321 ms ( 0.00 usr  0.00 
sys +  0.90 cusr  0.50 csys =  1.40 CPU)
[18:08:57] t/029_on_error.pl .. ok 3748 ms ( 0.00 usr  0.00 
sys +  0.75 cusr  0.32 csys =  1.07 CPU)
[18:09:01] t/030_origin.pl  ok 4496 ms ( 0.00 usr  0.00 
sys +  1.09 cusr  0.45 csys =  1.54 CPU)
[18:09:06] t/031_column_list.pl ... ok 13802 ms ( 0.01 usr  0.00 
sys +  1.00 cusr  0.69 csys =  1.70 CPU)

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-07 Thread Nathan Bossart
On Wed, Dec 07, 2022 at 02:07:11PM +0300, Melih Mutlu wrote:
> Do we also need to wake up all sync workers too? Even if not, I'm not
> actually sure whether doing that would harm anything though.
> Just asking since currently the patch wakes up all workers including sync
> workers if any still exists.

After sleeping on this, I think we can do better.  IIUC we can simply check
for AllTablesyncsReady() at the end of process_syncing_tables_for_apply()
and wake up the logical replication workers (which should just consist of
setting the current process's latch) if we are ready for two_phase mode.
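
To make the condition concrete, here is a minimal, self-contained sketch of
the check described above.  It does not use the PostgreSQL headers: the state
codes, all_tablesyncs_ready(), and should_wake_for_two_phase() are
illustrative stand-ins (assumptions), not the functions in the patch, and
treating a subscription with no tables as "not ready" is also an assumption
of this model.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Single-character codes used by this model only (assumed values). */
#define TWOPHASE_PENDING	'p'
#define TWOPHASE_ENABLED	'e'
#define REL_SYNCDONE		's'
#define REL_READY			'r'

/*
 * Stand-in for AllTablesyncsReady(): true only if there is at least one
 * table and every table is in the READY state.
 */
bool
all_tablesyncs_ready(const char *rel_states, size_t nrels)
{
	if (nrels == 0)
		return false;

	for (size_t i = 0; i < nrels; i++)
	{
		if (rel_states[i] != REL_READY)
			return false;
	}
	return true;
}

/*
 * Decide whether the apply worker should request a wakeup of the
 * subscription's workers: only when two_phase is still pending and
 * every table has reached READY.
 */
bool
should_wake_for_two_phase(char twophase_state,
						  const char *rel_states, size_t nrels)
{
	assert(rel_states != NULL || nrels == 0);

	return twophase_state == TWOPHASE_PENDING &&
		all_tablesyncs_ready(rel_states, nrels);
}
```

The point of the sketch is that the wakeup is requested only in the one
situation where a restart for two_phase mode can actually happen, which
should keep unnecessary wakeups to a minimum.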

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 2457eca47c92b29409d5972fc5a3e1c8c038fa6d Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v8 1/1] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c   |  3 ++
 src/backend/commands/alter.c|  7 
 src/backend/commands/subscriptioncmds.c |  4 ++
 src/backend/replication/logical/tablesync.c | 10 +
 src/backend/replication/logical/worker.c| 46 +
 src/include/replication/logicalworker.h |  3 ++
 6 files changed, 73 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..d095cd3ced 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..d6993c26e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..509fe2eb19 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -619,6 +620,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	if (started_tx)
 	{
+		/*
+		 * If we are ready to enable two_phase mode, wake up the logical
+		 * replication workers to handle this change quickly.
+		 */
+		CommandCounterIncrement();
+		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+			AllTablesyncsReady())
+			LogicalRepWorkersWakeupAtCommit(MyLogicalRepWorker->subid);
+
 		CommitTransactionCommand();
 		pgstat_report_stat(true);
 	}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-07 Thread Melih Mutlu
Hi,


> Actually, that's not quite right.  The sync worker will wake up the apply
> worker to change the state from SYNCDONE to READY.  AllTablesyncsReady()
> checks that all tables are READY, so we need to wake up all the workers
> when an apply worker changes the state to READY.  Each worker will then
> evaluate whether to restart for two_phase mode.
>

Right. I didn't think about the two phase case thoroughly. Waking up all
apply workers can help.

Do we need to wake up all sync workers as well? Even if not, I'm not
sure whether doing that would actually harm anything.
Just asking since the patch currently wakes up all workers, including sync
workers if any still exist.

Best,
--
Melih Mutlu
Microsoft


Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-06 Thread Nathan Bossart
On Tue, Dec 06, 2022 at 11:25:51AM -0800, Nathan Bossart wrote:
> On Tue, Dec 06, 2022 at 07:44:46PM +0300, Melih Mutlu wrote:
>> - When the state is SYNCDONE and the apply worker has to wake up to change
>> the state to READY.
>> 
>> I think we already call logicalrep_worker_wakeup_ptr wherever it's needed
>> for the above cases? What am I missing here?
> 
> IIUC we must restart all the apply workers for a subscription to enable
> two_phase mode.  It looks like finish_sync_worker() only wakes up its own
> apply worker.  I moved this logic to where the sync worker marks the state
> as SYNCDONE and added a check that two_phase mode is pending.  Even so,
> there can still be unnecessary wakeups, but this adjustment should limit
> them.

Actually, that's not quite right.  The sync worker will wake up the apply
worker to change the state from SYNCDONE to READY.  AllTablesyncsReady()
checks that all tables are READY, so we need to wake up all the workers
when an apply worker changes the state to READY.  Each worker will then
evaluate whether to restart for two_phase mode.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 1ca15608be05229b3349ba5840abceebb1497fe1 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v7 1/1] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c   |  3 ++
 src/backend/commands/alter.c|  7 
 src/backend/commands/subscriptioncmds.c |  4 ++
 src/backend/replication/logical/tablesync.c |  8 
 src/backend/replication/logical/worker.c| 46 +
 src/include/replication/logicalworker.h |  3 ++
 6 files changed, 71 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..d095cd3ced 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..d6993c26e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..e253db371a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -517,6 +518,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-06 Thread Nathan Bossart
Thanks for reviewing!

On Tue, Dec 06, 2022 at 07:44:46PM +0300, Melih Mutlu wrote:
> Is it really necessary to wake logical workers up when renaming other than
> subscription or publication? address.objectId will be a valid subid only
> when renaming a subscription.

Oops, that is a mistake.  I only meant to wake up the workers for ALTER
SUBSCRIPTION RENAME.  I think I've fixed this in v6.

> - When the state is SYNCDONE and the apply worker has to wake up to change
> the state to READY.
> 
> I think we already call logicalrep_worker_wakeup_ptr wherever it's needed
> for the above cases? What am I missing here?

IIUC we must restart all the apply workers for a subscription to enable
two_phase mode.  It looks like finish_sync_worker() only wakes up its own
apply worker.  I moved this logic to where the sync worker marks the state
as SYNCDONE and added a check that two_phase mode is pending.  Even so,
there can still be unnecessary wakeups, but this adjustment should limit
them.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 4be842c8a857a13b0bec2ebcbd9f8addf8b8562a Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v6 1/1] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c   |  3 ++
 src/backend/commands/alter.c|  7 
 src/backend/commands/subscriptioncmds.c |  4 ++
 src/backend/replication/logical/tablesync.c |  8 
 src/backend/replication/logical/worker.c| 46 +
 src/include/replication/logicalworker.h |  3 ++
 6 files changed, 71 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..d095cd3ced 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		if (strncmp(new_name, "regress_", 8) != 0)
 			elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+		/*
+		 * Wake up the logical replication workers to handle this change
+		 * quickly.
+		 */
+		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
 	else if (nameCacheId >= 0)
 	{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..d6993c26e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	return myself;
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..ca556f7145 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -334,6 +335,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 */
 		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
 
+		/*
+		 * We might be ready to enable two_phase mode.  

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-06 Thread Melih Mutlu
Hi Nathan,

> @@ -410,6 +411,12 @@ ExecRenameStmt(RenameStmt *stmt)
> stmt->newname);
>   table_close(catalog, RowExclusiveLock);
>
> + /*
> + * Wake up the logical replication workers to handle this
> + * change quickly.
> + */
> + LogicalRepWorkersWakeupAtCommit(address.objectId);


Is it really necessary to wake up the logical workers when renaming objects
other than subscriptions or publications? address.objectId will be a valid
subid only when renaming a subscription.

> @@ -322,6 +323,9 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
>
>   /* Cleanup. */
>   table_close(rel, NoLock);
> +
> + /* Wake up the logical replication workers to handle this change quickly. */
> + LogicalRepWorkersWakeupAtCommit(subid);


I wonder why a wakeup call is needed every time a subscription relation is
updated.
It seems to me that there are two places where UpdateSubscriptionRelState
is called and we need another worker to wake up:
- When a relation is in SYNCWAIT state, it waits for the apply worker to
wake up and change the relation state to CATCHUP. Then tablesync worker
needs to wake up to continue from CATCHUP state.
- When the state is SYNCDONE and the apply worker has to wake up to change
the state to READY.

I think we already call logicalrep_worker_wakeup_ptr wherever it's needed
for the above cases? What am I missing here?
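
To spell out the two handoffs listed above, here is a small standalone
sketch.  The enum names and wake_target_after() are hypothetical and exist
only for illustration; the sketch models just the cases described in this
mail and deliberately ignores any additional wakeups that two_phase mode
might need.

```c
#include <assert.h>

/* Relation sync states referred to in this mail (model only). */
typedef enum
{
	REL_SYNCWAIT,
	REL_CATCHUP,
	REL_SYNCDONE,
	REL_READY
} RelState;

/* Which kind of worker has to act next. */
typedef enum
{
	WAKE_NONE,
	WAKE_APPLY_WORKER,
	WAKE_SYNC_WORKER
} WakeTarget;

/*
 * Given the state a relation has just been moved to, report which
 * worker must wake up to make further progress.
 */
WakeTarget
wake_target_after(RelState new_state)
{
	switch (new_state)
	{
		case REL_SYNCWAIT:
			/* the apply worker must move the relation to CATCHUP */
			return WAKE_APPLY_WORKER;
		case REL_CATCHUP:
			/* the tablesync worker continues from CATCHUP */
			return WAKE_SYNC_WORKER;
		case REL_SYNCDONE:
			/* the apply worker must move the relation to READY */
			return WAKE_APPLY_WORKER;
		case REL_READY:
			/* nothing left to hand off in this simplified model */
			return WAKE_NONE;
	}

	assert(0 && "unknown relation state");
	return WAKE_NONE;
}
```

If each of these transitions already has a targeted wakeup, then a blanket
wakeup on every UpdateSubscriptionRelState() call adds nothing for them,
which is the question raised above.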

Best,
--
Melih Mutlu
Microsoft


Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-02 Thread Nathan Bossart
On Thu, Dec 01, 2022 at 04:21:30PM -0800, Nathan Bossart wrote:
> Okay, here is a new version of the patch.  This seems to clear up
> everything that I could find via the tests.

I cleaned up the patch a bit.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 72adfb291ee84b8a20f6634639b15eff6d48cf9a Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v5 1/1] wake up logical workers as needed instead of relying
 on periodic wakeups

---
 src/backend/access/transam/xact.c|  3 ++
 src/backend/catalog/pg_subscription.c|  4 +++
 src/backend/commands/alter.c |  7 
 src/backend/commands/subscriptioncmds.c  |  4 +++
 src/backend/replication/logical/worker.c | 46 
 src/include/replication/logicalworker.h  |  3 ++
 6 files changed, 67 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a506fc3ec8..ab75506094 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "replication/logicalworker.h"
 #include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -322,6 +323,9 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 
 	/* Cleanup. */
 	table_close(rel, NoLock);
+
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
 }
 
 /*
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..da14e91552 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -410,6 +411,12 @@ ExecRenameStmt(RenameStmt *stmt)
 										   stmt->newname);
 				table_close(catalog, RowExclusiveLock);
 
+				/*
+				 * Wake up the logical replication workers to handle this
+				 * change quickly.
+				 */
+				LogicalRepWorkersWakeupAtCommit(address.objectId);
+
 				return address;
 			}
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..e29cdcbff9 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1358,6 +1359,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f9efe6c4c6..d735be4f10 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -253,6 +253,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wakeup the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && 

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-12-01 Thread Nathan Bossart
On Tue, Nov 29, 2022 at 09:04:41PM -0800, Nathan Bossart wrote:
> I don't mind fixing it!  There are a couple more I'd like to track down
> before posting another revision.

Okay, here is a new version of the patch.  This seems to clear up
everything that I could find via the tests.

Thanks to this effort, I discovered that we need to include
wal_retrieve_retry_interval in our wait time calculations after failed
tablesyncs (for the suppress-unnecessary-wakeups patch).  I'll make that
change and post that patch in a new thread.
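
As a rough illustration of what including the retry interval in the wait
time calculation could look like, here is a standalone sketch.  It is not
taken from either patch: compute_wait_ms(), its parameters, and the use of a
negative timestamp to mean "no failure recorded" are all assumptions made
for the example.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Return how long (in ms) a worker may sleep.  Normally that is the
 * default timeout, but after a failed tablesync the worker must wake up
 * no later than last_failure_ms + retry_interval_ms so the sync can be
 * retried on time.  A negative last_failure_ms means no failure.
 */
int64_t
compute_wait_ms(int64_t now_ms, int64_t last_failure_ms,
				int64_t retry_interval_ms, int64_t default_timeout_ms)
{
	int64_t		retry_at;
	int64_t		until_retry;

	assert(retry_interval_ms >= 0 && default_timeout_ms >= 0);

	if (last_failure_ms < 0)
		return default_timeout_ms;

	retry_at = last_failure_ms + retry_interval_ms;
	if (retry_at <= now_ms)
		return 0;				/* retry is already due */

	until_retry = retry_at - now_ms;
	return until_retry < default_timeout_ms ? until_retry : default_timeout_ms;
}
```

Without the retry term, a worker that no longer wakes up periodically could
sleep for the full default timeout and delay the retry well past the
configured interval.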

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 79db8837e5b3054fb6007326d230aef50f3cda87 Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v4 1/1] wake up logical workers after ALTER SUBSCRIPTION and
 when two_phase mode can be enabled

---
 src/backend/access/transam/xact.c|  3 ++
 src/backend/catalog/pg_subscription.c|  7 
 src/backend/commands/alter.c |  7 
 src/backend/commands/subscriptioncmds.c  |  4 +++
 src/backend/replication/logical/worker.c | 46 
 src/include/replication/logicalworker.h  |  3 ++
 6 files changed, 70 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a506fc3ec8..d9f21b7dcf 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "replication/logicalworker.h"
 #include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -320,6 +321,12 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	/* Update the catalog. */
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
+	/*
+	 * We might be ready to enable two_phase mode, which requires the workers
+	 * to restart.  Wake them up at commit time to check the status.
+	 */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	/* Cleanup. */
 	table_close(rel, NoLock);
 }
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..da14e91552 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -410,6 +411,12 @@ ExecRenameStmt(RenameStmt *stmt)
 										   stmt->newname);
 				table_close(catalog, RowExclusiveLock);
 
+				/*
+				 * Wake up the logical replication workers to handle this
+				 * change quickly.
+				 */
+				LogicalRepWorkersWakeupAtCommit(address.objectId);
+
 				return address;
 			}
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..e29cdcbff9 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1358,6 +1359,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
+	/* Wake up the logical replication workers to handle this change quickly. */
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e48a3f589a..c39ca5a3cb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -253,6 +253,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List 

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-29 Thread Nathan Bossart
On Wed, Nov 30, 2022 at 05:27:40PM +1300, Thomas Munro wrote:
> On Wed, Nov 30, 2022 at 5:23 PM Thomas Munro  wrote:
>> On Wed, Nov 30, 2022 at 5:10 PM Nathan Bossart  
>> wrote:
>> > I spent some more time on the prevent-unnecessary-wakeups patch for
>> > logical/worker.c that I've been alluding to in this thread, and I found a
>> > few more places where we depend on the worker periodically waking up.  This
>> > seems to be a common technique, so I'm beginning to wonder whether these
>> > changes are worthwhile.  I think there's a good chance it would become a
>> > game of whac-a-mole.
>>
>> Aren't they all bugs, though, making our tests and maybe even real
>> systems slower than they need to be?

Yeah, you're right, it's probably worth proceeding with this particular
thread even if we don't end up porting the suppress-unnecessary-wakeups
patch to logical/worker.c.

> (Which isn't to suggest that it's your job to fix them, but please do
> share what you have if you run out of whack-a-mole steam, since we
> seem to have several people keen to finish those moles off.)

I don't mind fixing it!  There are a couple more I'd like to track down
before posting another revision.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




RE: wake up logical workers after ALTER SUBSCRIPTION

2022-11-29 Thread Hayato Kuroda (Fujitsu)
Dear Nathan,

> I spent some more time on the prevent-unnecessary-wakeups patch for
> logical/worker.c that I've been alluding to in this thread, and I found a
> few more places where we depend on the worker periodically waking up.  This
> seems to be a common technique, so I'm beginning to wonder whether these
> changes are worthwhile.  I think there's a good chance it would become a
> game of whac-a-mole.

I think this feature is needed at least for waking up workers that are
sleeping due to min_apply_delay.
The author supposed this patch and pinned our thread[1].

[1]: 
https://www.postgresql.org/message-id/TYCPR01MB8373775ECC6972289AF8CB30ED0F9%40TYCPR01MB8373.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-29 Thread Thomas Munro
On Wed, Nov 30, 2022 at 5:23 PM Thomas Munro  wrote:
> On Wed, Nov 30, 2022 at 5:10 PM Nathan Bossart  
> wrote:
> > I spent some more time on the prevent-unnecessary-wakeups patch for
> > logical/worker.c that I've been alluding to in this thread, and I found a
> > few more places where we depend on the worker periodically waking up.  This
> > seems to be a common technique, so I'm beginning to wonder whether these
> > changes are worthwhile.  I think there's a good chance it would become a
> > game of whac-a-mole.
>
> Aren't they all bugs, though, making our tests and maybe even real
> systems slower than they need to be?

(Which isn't to suggest that it's your job to fix them, but please do
share what you have if you run out of whack-a-mole steam, since we
seem to have several people keen to finish those moles off.)




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-29 Thread Thomas Munro
On Wed, Nov 30, 2022 at 5:10 PM Nathan Bossart  wrote:
> I spent some more time on the prevent-unnecessary-wakeups patch for
> logical/worker.c that I've been alluding to in this thread, and I found a
> few more places where we depend on the worker periodically waking up.  This
> seems to be a common technique, so I'm beginning to wonder whether these
> changes are worthwhile.  I think there's a good chance it would become a
> game of whac-a-mole.

Aren't they all bugs, though, making our tests and maybe even real
systems slower than they need to be?




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-29 Thread Nathan Bossart
I spent some more time on the prevent-unnecessary-wakeups patch for
logical/worker.c that I've been alluding to in this thread, and I found a
few more places where we depend on the worker periodically waking up.  This
seems to be a common technique, so I'm beginning to wonder whether these
changes are worthwhile.  I think there's a good chance it would become a
game of whac-a-mole.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-27 Thread Nathan Bossart
On Thu, Nov 24, 2022 at 05:26:27AM +, Hayato Kuroda (Fujitsu) wrote:
> I have no comments for the v3 patch.

Thanks for reviewing!  Does anyone else have thoughts on the patch?

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




RE: wake up logical workers after ALTER SUBSCRIPTION

2022-11-23 Thread Hayato Kuroda (Fujitsu)
Dear Nathan,

Thank you for updating the patch!

> In v3, I moved the call to LogicalRepWorkersWakeupAtCommit() to the end of
> the function.  This should avoid waking up workers in some cases where it's
> unnecessary (e.g., if ALTER SUBSCRIPTION ERRORs in a subtransaction), but
> there are still cases where we'll wake up the workers unnecessarily.  I
> think this is unlikely to cause any real problems in practice.

I understand that you accept false-positive wakeups in order to avoid missing
necessary ones, such as after ALTER SUBSCRIPTION REFRESH. +1.

> >> 02. LogicalRepWorkersWakeupAtCommit()
> >>
> >> ```
> >> +  oldcxt = MemoryContextSwitchTo(TopTransactionContext);
> >> +  on_commit_wakeup_workers_subids = lappend_oid(on_commit_wakeup_workers_subids,
> >> +                                                subid);
> >> ```
> >>
> >> If the subscription is altered twice in the same transaction, the same
> >> subid will be recorded twice.
> >> I'm not sure whether this would cause any issues, but list_member_oid()
> >> can be used to avoid that.
> >
> > +1, list_append_unique_oid might be better.
> 
> Done in v3.

I have no comments for the v3 patch.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-23 Thread Nathan Bossart
On Tue, Nov 22, 2022 at 04:59:28PM +0530, Amit Kapila wrote:
> On Tue, Nov 22, 2022 at 6:11 AM Nathan Bossart wrote:
>> While working on avoiding unnecessary wakeups in logical/worker.c (as was
>> done for walreceiver.c in 05a7be9), I noticed that the tests began taking
>> much longer.  This seems to be caused by the reduced frequency of calls to
>> maybe_reread_subscription() in LogicalRepApplyLoop().
> 
> I think it would be interesting to know why tests started taking more
> time after a reduced frequency of calls to
> maybe_reread_subscription(). IIRC, we anyway call
> maybe_reread_subscription for each xact.

At the moment, commands like ALTER SUBSCRIPTION don't wake up the logical
workers for the target subscription, so the next call to
maybe_reread_subscription() may not happen for a while.  Presently, we'll
only sleep up to a second in the apply loop, but with my new
prevent-unnecessary-wakeups patch, we may sleep for much longer.  This
causes wait_for_subscription_sync to take more time after some ALTER
SUBSCRIPTION commands.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-23 Thread Nathan Bossart
On Tue, Nov 22, 2022 at 07:25:36AM +, houzj.f...@fujitsu.com wrote:
> On Tuesday, November 22, 2022 2:49 PM Hayato Kuroda (Fujitsu) wrote:
>> Thanks for updating! It becomes better. Further comments:
>> 
>> 01. AlterSubscription()
>> 
>> ```
>> +LogicalRepWorkersWakeupAtCommit(subid);
>> +
>> ```
>> 
>> Currently subids will be recorded even if the subscription is not modified.
>> I think LogicalRepWorkersWakeupAtCommit() should be called inside the if
>> (update_tuple).
> 
> I think an exception would be REFRESH PUBLICATION, in which case update_tuple
> is false, but it seems better to wake up the apply worker in this case as
> well, because the apply worker is also responsible for starting table sync
> workers for newly subscribed tables (in process_syncing_tables()).
> 
> Besides, it does not seem necessary to wake up the apply worker for ALTER
> SUBSCRIPTION ... SKIP, although there is probably no harm in waking it up in
> this case.

In v3, I moved the call to LogicalRepWorkersWakeupAtCommit() to the end of
the function.  This should avoid waking up workers in some cases where it's
unnecessary (e.g., if ALTER SUBSCRIPTION ERRORs in a subtransaction), but
there are still cases where we'll wake up the workers unnecessarily.  I
think this is unlikely to cause any real problems in practice.

>> 02. LogicalRepWorkersWakeupAtCommit()
>> 
>> ```
>> +	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
>> +	on_commit_wakeup_workers_subids = lappend_oid(on_commit_wakeup_workers_subids,
>> +	                                              subid);
>> ```
>> 
>> If the subscription is altered twice in the same transaction, the same
>> subid will be recorded twice.
>> I'm not sure whether this would cause any issues, but list_member_oid() can
>> be used to avoid that.
> 
> +1, list_append_unique_oid might be better.

Done in v3.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From b80c5934b40034dfbbde411721db1d4171c86d5c Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v3 1/1] wake up logical workers after ALTER SUBSCRIPTION

---
 src/backend/access/transam/xact.c|  3 ++
 src/backend/commands/subscriptioncmds.c  |  3 ++
 src/backend/replication/logical/worker.c | 46 
 src/include/replication/logicalworker.h  |  3 ++
 4 files changed, 55 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..c761785947 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1358,6 +1359,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e48a3f589a..d101cddf6c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -253,6 +253,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wakeup the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell  

Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-22 Thread Amit Kapila
On Tue, Nov 22, 2022 at 6:11 AM Nathan Bossart  wrote:
>
> While working on avoiding unnecessary wakeups in logical/worker.c (as was
> done for walreceiver.c in 05a7be9), I noticed that the tests began taking
> much longer.  This seems to be caused by the reduced frequency of calls to
> maybe_reread_subscription() in LogicalRepApplyLoop().
>

I think it would be interesting to know why tests started taking more
time after a reduced frequency of calls to
maybe_reread_subscription(). IIRC, we anyway call
maybe_reread_subscription for each xact.

-- 
With Regards,
Amit Kapila.




RE: wake up logical workers after ALTER SUBSCRIPTION

2022-11-21 Thread houzj.f...@fujitsu.com
On Tuesday, November 22, 2022 2:49 PM Hayato Kuroda (Fujitsu) wrote:
> 
> Dear Nathan,
> 
> > I think you are correct.  I did it this way in v2.  I've also moved
> > the bulk of the logic to logical/worker.c.
> 
> Thanks for updating! It becomes better. Further comments:
> 
> 01. AlterSubscription()
> 
> ```
> + LogicalRepWorkersWakeupAtCommit(subid);
> +
> ```
> 
> Currently subids will be recorded even if the subscription is not modified.
> I think LogicalRepWorkersWakeupAtCommit() should be called inside the if
> (update_tuple).

I think an exception would be REFRESH PUBLICATION, in which case update_tuple
is false, but it seems better to wake up the apply worker in this case as well,
because the apply worker is also responsible for starting table sync workers
for newly subscribed tables (in process_syncing_tables()).

Besides, it does not seem necessary to wake up the apply worker for ALTER
SUBSCRIPTION ... SKIP, although there is probably no harm in waking it up in
this case.

> 
> 02. LogicalRepWorkersWakeupAtCommit()
> 
> ```
> + oldcxt = MemoryContextSwitchTo(TopTransactionContext);
> + on_commit_wakeup_workers_subids = lappend_oid(on_commit_wakeup_workers_subids,
> +                                               subid);
> ```
> 
> If the subscription is altered twice in the same transaction, the same subid
> will be recorded twice.
> I'm not sure whether this would cause any issues, but list_member_oid() can
> be used to avoid that.

+1, list_append_unique_oid might be better.
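
To make the suggestion concrete, here is a minimal standalone sketch of an
append that skips duplicates, which is the behavior list_append_unique_oid()
provides for PostgreSQL OID lists.  It does not use the PostgreSQL List API:
the PendingSubids type, MAX_PENDING, and both helper functions are
hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid type */

#define MAX_PENDING 16			/* arbitrary capacity for this sketch */

/* Hypothetical fixed-size replacement for a transaction-lifetime OID list. */
typedef struct PendingSubids
{
	Oid			subids[MAX_PENDING];
	size_t		count;
} PendingSubids;

/* Return true if subid has already been recorded. */
static bool
pending_contains(const PendingSubids *p, Oid subid)
{
	for (size_t i = 0; i < p->count; i++)
	{
		if (p->subids[i] == subid)
			return true;
	}
	return false;
}

/* Record subid unless it is already present (append-unique semantics). */
static void
pending_append_unique(PendingSubids *p, Oid subid)
{
	if (pending_contains(p, subid))
		return;
	assert(p->count < MAX_PENDING);
	p->subids[p->count++] = subid;
}
```

With this shape, altering the same subscription twice in one transaction
leaves a single entry, so its workers would be woken once at commit.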

Best regards,
Hou zj




RE: wake up logical workers after ALTER SUBSCRIPTION

2022-11-21 Thread Takamichi Osumi (Fujitsu)
On Tuesday, November 22, 2022 1:39 PM Nathan Bossart wrote:
> On Tue, Nov 22, 2022 at 03:03:52AM +, Hayato Kuroda (Fujitsu) wrote:
> > Just one comment: IIUC, ALTER SUBSCRIPTION can be executed inside a
> > transaction block.  So if two subscriptions are altered in the same
> > transaction, only one of them will be woken up.  Is that the expected
> > behavior?
> >
> > I think we can keep a list of subscription OIDs, record an OID whenever a
> > subscription is altered, and then have the backend process consume all of
> > the list cells at the end of the transaction.
> 
> I think you are correct.  I did it this way in v2.  I've also moved the bulk
> of the logic to logical/worker.c.
Hi, thanks for updating.


I just quickly had a look at your patch and had one minor question.

With this patch, if we execute ALTER SUBSCRIPTION in a subtransaction and
then roll back to the savepoint, is there any possibility that we'll wake up
workers that don't need to be woken?

I'm not sure whether this causes any substantial issue; I'm just wondering
whether there is any need for improvement here.
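
The scenario can be sketched with a toy model, assuming (as the v2 patch
appears to do) that the pending list lives for the whole top-level
transaction and is not trimmed when a subtransaction is rolled back.  This is
not PostgreSQL code: every name below is hypothetical, and at_eoxact() only
reports how many wakeups would be issued.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid type */

#define MAX_PENDING 16			/* arbitrary capacity for this sketch */

static Oid	pending[MAX_PENDING];	/* subids recorded in this transaction */
static size_t npending = 0;

/* Called by the (modelled) ALTER SUBSCRIPTION: remember the subid. */
static void
request_wakeup_at_commit(Oid subid)
{
	assert(npending < MAX_PENDING);
	pending[npending++] = subid;
}

/*
 * Modelled ROLLBACK TO SAVEPOINT.  The catalog change would be undone, but
 * nothing here removes the entry recorded inside the subtransaction.
 */
static void
rollback_to_savepoint(void)
{
	/* intentionally empty: the pending list is left untouched */
}

/* End of top-level transaction: report wakeups issued, then reset. */
static size_t
at_eoxact(bool is_commit)
{
	size_t		woken = is_commit ? npending : 0;

	npending = 0;
	return woken;
}
```

In this model, an ALTER inside a rolled-back subtransaction still produces
one wakeup when the outer transaction commits, while an aborted top-level
transaction produces none.  The extra wakeup is spurious but harmless in the
sketch, since a woken worker would simply find nothing changed.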



Best Regards,
Takamichi Osumi





RE: wake up logical workers after ALTER SUBSCRIPTION

2022-11-21 Thread Hayato Kuroda (Fujitsu)
Dear Nathan,

> I think you are correct.  I did it this way in v2.  I've also moved the
> bulk of the logic to logical/worker.c.

Thanks for updating! It becomes better. Further comments:

01. AlterSubscription()

```
+   LogicalRepWorkersWakeupAtCommit(subid);
+
```

Currently subids will be recorded even if the subscription is not modified.
I think LogicalRepWorkersWakeupAtCommit() should be called inside the
if (update_tuple) block.

02. LogicalRepWorkersWakeupAtCommit()

```
+   oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+   on_commit_wakeup_workers_subids = lappend_oid(on_commit_wakeup_workers_subids,
+                                                 subid);
```

If the subscription is altered twice in the same transaction, the same subid
will be recorded twice.
I'm not sure whether this would cause any issues, but list_member_oid() can be
used to avoid that.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED





Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-21 Thread Nathan Bossart
On Tue, Nov 22, 2022 at 03:03:52AM +, Hayato Kuroda (Fujitsu) wrote:
> Just one comment: IIUC, ALTER SUBSCRIPTION can be executed inside a
> transaction block.  So if two subscriptions are altered in the same
> transaction, only one of them will be woken up.  Is that the expected
> behavior?
> 
> I think we can keep a list of subscription OIDs, record an OID whenever a
> subscription is altered, and then have the backend process consume all of
> the list cells at the end of the transaction.

I think you are correct.  I did it this way in v2.  I've also moved the
bulk of the logic to logical/worker.c.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 57bbd9e8d9da72c50d9a41704f3f42b37aba33ab Mon Sep 17 00:00:00 2001
From: Nathan Bossart 
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v2 1/1] wake up logical workers after ALTER SUBSCRIPTION

---
 src/backend/access/transam/xact.c|  3 ++
 src/backend/commands/subscriptioncmds.c  |  3 ++
 src/backend/replication/logical/worker.c | 46 
 src/include/replication/logicalworker.h  |  3 ++
 4 files changed, 55 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..9f225008c4 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1031,6 +1032,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	form = (Form_pg_subscription) GETSTRUCT(tup);
 	subid = form->oid;
 
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	/* must be owner */
 	if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e48a3f589a..f61cce7abf 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -253,6 +253,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wakeup the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction.
+ *
+ * This is used to ensure that the workers reread the subscription info as soon
+ * as possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = lappend_oid(on_commit_wakeup_workers_subids,
+												  subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index cd1b6e8afc..2c2340d758 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void 

RE: wake up logical workers after ALTER SUBSCRIPTION

2022-11-21 Thread Hayato Kuroda (Fujitsu)
Hi Nathan,

I have done almost the same thing locally for [1], but I think your code is
better.

Just one comment: IIUC, ALTER SUBSCRIPTION can be executed inside a
transaction block.  So if two subscriptions are altered in the same
transaction, only one of them will be woken up.  Is that the expected
behavior?

I think we can keep a list of subscription OIDs, record an OID whenever a
subscription is altered, and then have the backend process consume all of the
list cells at the end of the transaction.

What do you think?

[1]: https://commitfest.postgresql.org/40/3581/

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-21 Thread Nathan Bossart
On Tue, Nov 22, 2022 at 03:16:05PM +1300, Thomas Munro wrote:
> Maybe a comment to explain why a single variable is enough?

This crossed my mind shortly after sending my previous message.  Looking
closer, I see that several types of ALTER SUBSCRIPTION do not call
PreventInTransactionBlock(), so a single variable might not be enough.
Perhaps we can put a list in TopTransactionContext.  I'll do some more
investigation and report back.
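
As a rough illustration of the concern, the standalone sketch below compares
a single remembered OID with a small list.  It assumes the "single variable"
holds one subscription OID, which is a guess about the first patch version,
and none of the names are PostgreSQL APIs.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid type */

#define InvalidOid	((Oid) 0)
#define MAX_PENDING 16			/* arbitrary capacity for this sketch */

/* Variant 1: one variable, so a later ALTER overwrites an earlier one. */
static Oid	single_pending = InvalidOid;

static void
remember_single(Oid subid)
{
	single_pending = subid;
}

/* Variant 2: a list, so every altered subscription is remembered. */
static Oid	list_pending[MAX_PENDING];
static size_t list_count = 0;

static void
remember_in_list(Oid subid)
{
	assert(list_count < MAX_PENDING);
	list_pending[list_count++] = subid;
}
```

If one transaction alters subscriptions 16384 and 16385, the single variable
ends up holding only 16385, so the workers for 16384 would not be woken at
commit, whereas the list keeps both.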

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com




Re: wake up logical workers after ALTER SUBSCRIPTION

2022-11-21 Thread Thomas Munro
On Tue, Nov 22, 2022 at 1:41 PM Nathan Bossart  wrote:
> On my machine, the attached patch
> improved 'check-world -j8' run time by ~12 seconds (from 3min 8sec to 2min
> 56 sec) and src/test/subscription test time by ~17 seconds (from 139
> seconds to 122 seconds).

Nice!

Maybe a comment to explain why a single variable is enough?  And an
assertion that it wasn't already set?  And a note to future self: this
would be a candidate user of the nearby SetLatches() patch (which is
about moving SetLatch() syscalls out from under LWLocks, though this
one may not be very hot).
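
The general idea of moving wakeups out from under a lock can be sketched as
follows.  This is a toy model and not the SetLatches() patch or its API: the
lock is a plain flag, and Worker, set_latch(), and wake_workers_for() are
hypothetical names chosen for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_WORKERS 8			/* arbitrary capacity for this sketch */

typedef struct Worker
{
	unsigned int subid;			/* subscription this worker serves */
	bool		latch_set;		/* models the worker's latch */
} Worker;

static Worker workers[MAX_WORKERS];
static size_t nworkers = 0;
static bool lock_held = false;	/* toy stand-in for an LWLock */

/* Models the syscall-bearing wakeup; must not run while the lock is held. */
static void
set_latch(Worker *w)
{
	assert(!lock_held);
	w->latch_set = true;
}

/* Collect matching workers under the lock, then wake them after release. */
static size_t
wake_workers_for(unsigned int subid)
{
	Worker	   *to_wake[MAX_WORKERS];
	size_t		n = 0;

	lock_held = true;
	for (size_t i = 0; i < nworkers; i++)
	{
		if (workers[i].subid == subid)
			to_wake[n++] = &workers[i];
	}
	lock_held = false;

	for (size_t i = 0; i < n; i++)
		set_latch(to_wake[i]);

	return n;
}
```

A real implementation would also have to make sure that whatever it collects
remains valid after the lock is released, which this sketch ignores.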