On Mon, 3 Mar 2025 at 16:41, Amit Kapila <amit.kapil...@gmail.com> wrote: > > On Mon, Mar 3, 2025 at 2:30 PM vignesh C <vignes...@gmail.com> wrote: > > > > On Tue, 25 Feb 2025 at 15:32, vignesh C <vignes...@gmail.com> wrote: > > > > > > The attached script has the script that was used for testing. Here the > > > NUM_RECORDS count should be changed accordingly for each of the tests > > > and while running the test with the patch change uncomment the drop > > > publication command. > > > > I have done further analysis on the test and changed the test to > > compare it better with HEAD. The execution time is in milliseconds. > > Brach/records | 100 | 1000 | 10000 | 100000 | 1000000 > > Head | 10.43 | 15.86 | 64.44 | 550.56 | 8991.04 > > Patch | 11.35 | 17.26 | 73.50 | 640.21 | 10104.72 > > % diff | -8.82 | -8.85 | -14.08 | -16.28 | > > -12.38 > > > > There is a performance degradation in the range of 8.8 to 16.2 percent. > > > > - /* Validate the entry */ > - if (!entry->replicate_valid) > + /* > + * If the publication is invalid, check for updates. > + * This optimization ensures that the next block, which queries the system > + * tables and builds the relation entry, runs only if a new publication was > + * created. > + */ > + if (!publications_valid && data->publications) > + { > + bool skipped_pub = false; > + List *publications; > + > + publications = LoadPublications(data->publication_names, &skipped_pub); > > The publications_valid flag indicates whether the publications cache > is valid or not; the flag is set to false for any invalidation in the > pg_publication catalog. I wonder that instead of using the same flag > what if we use a separate publications_skipped flag? If that works, > you don't even need to change the current location where we > LoadPublications.
With the approach suggested above, the dip is almost negligible. The test results are given below (execution time is in milliseconds): Branch/records | 100 | 1000 | 10000 | 100000 | 1000000 Head | 10.25 | 15.85 | 65.53 | 569.15 | 9194.19 Patch | 10.25 | 15.84 | 65.91 | 571.75 | 9208.66 % diff | 0.00 | 0.06 | -0.58 | -0.46 | -0.16 There is a performance dip in the range of 0 to 0.58 percent. The attached patch has the changes for the same. The test script used is also attached. Regards, Vignesh
From 8f2bca5afc03f3e34b2a0955415a702cbe6aef79 Mon Sep 17 00:00:00 2001 From: Vignesh <vignesh21@gmail.com> Date: Mon, 3 Mar 2025 11:54:27 +0530 Subject: [PATCH v5] Fix logical replication breakage after ALTER SUBSCRIPTION ... SET PUBLICATION Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION` could cause logical replication to break under certain conditions. When the apply worker restarts after executing SET PUBLICATION, it continues using the existing replication slot and origin. If the replication origin was not updated before the restart, the WAL start location could point to a position prior to the existence of the specified publication, causing the apply worker to restart repeatedly and report errors each time. This patch skips any publication that does not exist yet when loading publications, and reloads the publications and updates the relation entry once the missing publication gets created. Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 41 +++++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7d464f656aa..46793efe2b5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -81,8 +81,9 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; +static bool publications_updated; -static List *LoadPublications(List *pubnames); +static List *LoadPublications(List *pubnames, bool *skipped); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_repl_origin(LogicalDecodingContext *ctx, @@ -1762,9 +1763,13 @@ 
pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we just skip the publications that don't exist yet. 'skipped' + * will be true if we find any publication from the given list that doesn't + * exist. */ static List * -LoadPublications(List *pubnames) +LoadPublications(List *pubnames, bool *skipped) { List *result = NIL; ListCell *lc; @@ -1772,9 +1777,15 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + { + elog(DEBUG1, "skipped loading publication: %s", pubname); + *skipped = true; + } } return result; @@ -1789,6 +1800,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue) { publications_valid = false; + publications_updated = true; /* * Also invalidate per-relation cache so that next time the filtering info @@ -2053,8 +2065,12 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->attrmap = NULL; } - /* Validate the entry */ - if (!entry->replicate_valid) + /* + * Validate the entry only if the entry is not valid or in case the + * publications have been updated. + */ + if (!entry->replicate_valid || + (!publications_valid && publications_updated)) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); @@ -2071,16 +2087,25 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); List *rel_publications = NIL; + bool skipped_pub = false; /* Reload publications if needed before use. 
*/ if (!publications_valid) { + publications_updated = false; + MemoryContextReset(data->pubctx); oldctx = MemoryContextSwitchTo(data->pubctx); - data->publications = LoadPublications(data->publication_names); + data->publications = LoadPublications(data->publication_names, &skipped_pub); MemoryContextSwitchTo(oldctx); - publications_valid = true; + + /* + * We don't consider the publications to be valid till we have + * information of all the publications. + */ + if (!skipped_pub) + publications_valid = true; } /* -- 2.43.0
#!/bin/bash
#
# Benchmark for decoding one large INSERT through the pgoutput plugin.
#
# Each iteration builds a fresh cluster in ./data, creates table "foo" with
# two publications, inserts NUM_RECORDS rows in a single transaction, and
# then times pg_recvlogical while it decodes up to the current WAL position.
# Results for iteration N are written to run_N.dat in the current directory:
# start timestamp, pg_recvlogical output plus the "time" report, end
# timestamp, and the elapsed time.
#
# Timestamps are taken as nanoseconds since the epoch divided by 1000, so
# the "execution time" value written at the end is in microseconds.

#####
SLOT_NAME=test
PLUGIN_NAME=pgoutput
NUM_RECORDS=100000
LOOP=10
#####

for i in $(seq 1 "$LOOP")
do
    result_file="run_${i}.dat"

    # Cleanup previous result (the stop fails harmlessly when no instance
    # is running yet, e.g. on the first iteration)
    pg_ctl stop -D data
    rm -rf data logfile

    # Initialize an instance
    initdb -D data -U postgres -c wal_level=logical

    # Start the instance
    pg_ctl -D data -l logfile start

    # Create a table and the publications that will be requested below
    psql -U postgres -c "CREATE TABLE foo (id int);"
    psql -U postgres -c "CREATE publication pub1 for table foo;"
    psql -U postgres -c "CREATE publication pub2 for table foo;"

    # Uncomment when testing the patched build, so that one of the requested
    # publications is missing while decoding.
    #psql -U postgres -c "drop publication pub1;"

    # Create a replication slot
    psql -U postgres -c "SELECT * FROM pg_create_logical_replication_slot('$SLOT_NAME', '$PLUGIN_NAME')"

    # Insert tuples (this transaction will be decoded)
    psql -U postgres -c "INSERT INTO foo VALUES (generate_series(1, $NUM_RECORDS))"

    # Confirm current WAL location
    WAL_POS=$(psql -qtAX -U postgres -c "SELECT * FROM pg_current_wal_lsn()")

    t1=$(($(date +%s%N)/1000))
    echo "$t1" > "$result_file"

    # Run pg_recvlogical till the current WAL location. The subshell is
    # required so that the report printed by the "time" keyword is captured
    # by the redirection along with the command's own output.
    ( time pg_recvlogical -d postgres -U postgres --start -S "$SLOT_NAME" -E "$WAL_POS" -f - -o publication_names='pub1,pub2' -o proto_version=4 ) &>> "$result_file"

    t2=$(($(date +%s%N)/1000))
    echo "$t2" >> "$result_file"

    t3=$((t2-t1))
    echo "execution time=$t3" >> "$result_file"
done