On Mon, 3 Mar 2025 at 16:41, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Mar 3, 2025 at 2:30 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > On Tue, 25 Feb 2025 at 15:32, vignesh C <vignes...@gmail.com> wrote:
> > >
> > > The attached script has the script that was used for testing. Here the
> > > NUM_RECORDS count should be changed accordingly for each of the tests
> > > and while running the test with the patch change uncomment the drop
> > > publication command.
> >
> > I have done further analysis on the test and changed the test to
> > compare it better with HEAD. The execution time is in milliseconds.
> > Branch/records  |  100     |  1000   |  10000    |  100000  |  1000000
> > Head               |   10.43  |  15.86  |   64.44    |  550.56  |   8991.04
> > Patch              |   11.35  |  17.26   |   73.50    |  640.21  |  10104.72
> > % diff              |   -8.82  |  -8.85    |   -14.08   |   -16.28  |   -12.38
> >
> > There is a performance degradation in the range of 8.8 to 16.3 percent.
> >
>
> - /* Validate the entry */
> - if (!entry->replicate_valid)
> + /*
> + * If the publication is invalid, check for updates.
> + * This optimization ensures that the next block, which queries the system
> + * tables and builds the relation entry, runs only if a new publication was
> + * created.
> + */
> + if (!publications_valid && data->publications)
> + {
> + bool skipped_pub = false;
> + List    *publications;
> +
> + publications = LoadPublications(data->publication_names, &skipped_pub);
>
> The publications_valid flag indicates whether the publications cache
> is valid or not; the flag is set to false for any invalidation in the
> pg_publication catalog. I wonder that instead of using the same flag
> what if we use a separate publications_skipped flag? If that works,
> you don't even need to change the current location where we
> LoadPublications.

There is an almost negligible dip with the above suggested way; the test
results for the same are given below (execution time is in
milliseconds):
Branch/records  |  100     |  1000   |  10000    |  100000 |  1000000
Head               |   10.25  |  15.85  |   65.53    |  569.15  |  9194.19
Patch              |   10.25  |  15.84  |   65.91    |  571.75  |  9208.66
% diff              |      0.00 |    0.06  |    -0.58    |     -0.46  |    -0.16

There is a performance dip in the range of 0 to 0.58 percent.
The attached patch has the changes for the same. The test script used
is also attached.

Regards,
Vignesh
From 8f2bca5afc03f3e34b2a0955415a702cbe6aef79 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Mon, 3 Mar 2025 11:54:27 +0530
Subject: [PATCH v5] Fix logical replication breakage after ALTER SUBSCRIPTION
 ... SET PUBLICATION

Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION`
could cause logical replication to break under certain conditions. When
the apply worker restarts after executing SET PUBLICATION, it continues
using the existing replication slot and origin. If the replication origin
was not updated before the restart, the WAL start location could point to
a position prior to the existence of the specified publication, causing the
apply worker to restart persistently while reporting errors.

This patch skips loading the publication if the publication does not exist
and loads the publication and updates the relation entry when the publication
gets created.

Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 41 +++++++++++++++++----
 1 file changed, 33 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656aa..46793efe2b5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -81,8 +81,9 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 
 static bool publications_valid;
+static bool publications_updated;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_repl_origin(LogicalDecodingContext *ctx,
@@ -1762,9 +1763,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet. 'skipped'
+ * will be true if we find any publication from the given list that doesn't
+ * exist.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1772,9 +1777,15 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+		{
+			elog(DEBUG1, "skipped loading publication: %s", pubname);
+			*skipped = true;
+		}
 	}
 
 	return result;
@@ -1789,6 +1800,7 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
+	publications_updated = true;
 
 	/*
 	 * Also invalidate per-relation cache so that next time the filtering info
@@ -2053,8 +2065,12 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
-	/* Validate the entry */
-	if (!entry->replicate_valid)
+	/*
+	 * Validate the entry only if the entry is not valid or in case the
+	 * publications have been updated.
+	 */
+	if (!entry->replicate_valid ||
+		(!publications_valid && publications_updated))
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2071,16 +2087,25 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
 		{
+			publications_updated = false;
+
 			MemoryContextReset(data->pubctx);
 
 			oldctx = MemoryContextSwitchTo(data->pubctx);
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names, &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
-- 
2.43.0

#!/bin/bash

# Benchmark logical decoding throughput: for each of $LOOP runs, set up a
# fresh cluster, create a table with two publications, insert $NUM_RECORDS
# rows, then time how long pg_recvlogical takes to decode up to the current
# WAL position. Per-run timings are written to run_<i>.dat.

#####

SLOT_NAME=test
PLUGIN_NAME=pgoutput
NUM_RECORDS=100000
LOOP=10

#####

for i in $(seq 1 "$LOOP")
do
	# Clean up any previous run's cluster and log
	pg_ctl stop -D data
	rm -rf data logfile

	# Initialize an instance with logical decoding enabled
	initdb -D data -U postgres -c wal_level=logical

	# Start the instance
	pg_ctl -D data -l logfile start

	# Create a table and the publications exercised by the decoding test
	psql -U postgres -c "CREATE TABLE foo (id int);"
	psql -U postgres -c "CREATE publication pub1 for table foo;"
	psql -U postgres -c "CREATE publication pub2 for table foo;"
	# Uncomment when testing the patch, to exercise the missing-publication path
	#psql -U postgres -c "drop publication pub1;"

	# Create a replication slot
	psql -U postgres -c "SELECT * FROM pg_create_logical_replication_slot('$SLOT_NAME', '$PLUGIN_NAME')"

	# Insert tuples (this transaction will be decoded)
	psql -U postgres -c "INSERT INTO foo VALUES (generate_series(1, $NUM_RECORDS))"

	# Confirm current WAL location
	WAL_POS=$(psql -qtAX -U postgres -c "SELECT * FROM pg_current_wal_lsn()")

	t1=$(($(date +%s%N)/1000))
	echo "$t1" > "run_${i}.dat"

	# Run pg_recvlogical till the current WAL location. The subshell keeps
	# the 'time' keyword output on the same stderr stream as the command,
	# so both are captured by the &>> redirection into the result file.
	( time pg_recvlogical -d postgres -U postgres --start -S "$SLOT_NAME" -E "$WAL_POS" -f - -o publication_names='pub1,pub2' -o proto_version=4 ) &>> "run_${i}.dat"

	t2=$(($(date +%s%N)/1000))
	echo "$t2" >> "run_${i}.dat"

	# Elapsed wall-clock time in microseconds
	t3=$((t2-t1))
	echo "execution time=$t3" >> "run_${i}.dat"
done

Reply via email to