Hi Jie,

> Most importantly, if we filter out unpublished relationship-related
> changes after constructing the changes but before queuing the changes
> into a transaction, will it reduce the workload of logical decoding and
> avoid disk or memory growth as much as possible?
Thanks for the report! Discarding the unused changes as early as possible looks like a valid optimization to me, but I would really like more experienced people to double-check it.

> Attached is the patch I used to implement this optimization.

After a quick look at the patch, I found that FilterByTable is too expensive because of the StartTransaction and AbortTransaction calls. With your setup above, I ran the following test:

insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba', i),repeat('dfds', i) from generate_series(0,999100) i;

and profiled the walsender of mypub with perf for 30 seconds, which gave:

- 22.04%   1.53%  postgres  postgres  [.] FilterByTable
   - 20.51% FilterByTable
        AbortTransaction
        ResourceOwnerReleaseInternal
        LockReleaseAll
        hash_seq_search

Most of the cost comes from AbortTransaction, and 20% is not trivial.

From your patch:

+
+	/*
+	 * Decoding needs access to syscaches et al., which in turn use
+	 * heavyweight locks and such. Thus we need to have enough state around to
+	 * keep track of those. The easiest way is to simply use a transaction
+	 * internally.
+ ....
+	using_subtxn = IsTransactionOrTransactionBlock();
+
+	if (using_subtxn)
+		BeginInternalSubTransaction("filter change by table");
+	else
+		StartTransactionCommand();

Actually, FilterByTable is simpler than "decoding" here: we access the syscache only when get_rel_sync_entry finds an entry whose replicate_valid is false, and such invalid entries should be rare.

What I'm thinking now is to let get_rel_sync_entry build its own transaction state *only when it finds an invalid entry*. If the caller has already built one, as the existing callers in master do, nothing happens except a simple transaction state check. Then in the FilterByTable case we can simply leave it to get_rel_sync_entry. See the attachment for the idea.

-- 
Best Regards
Andy Fan
From 90b8df330df049bd1d7e881dc6e9b108c17b0924 Mon Sep 17 00:00:00 2001
From: "yizhi.fzh" <yizhi....@alibaba-inc.com>
Date: Wed, 21 Feb 2024 18:40:03 +0800
Subject: [PATCH v1 1/1] Make get_rel_sync_entry less dependent on transaction
 state.

get_rel_sync_entry needs a transaction only when an entry with
replicate_valid = false is found, which should be rare. However, the
caller can't know whether an entry is valid, so it has to prepare the
transaction state before calling this function, and such preparation is
expensive.

This patch lets get_rel_sync_entry manage a transaction state itself,
only when necessary, so callers don't need to prepare one blindly.
---
 src/backend/replication/pgoutput/pgoutput.c | 60 ++++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..25e55590a2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/tupconvert.h"
+#include "access/relation.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
@@ -214,6 +215,11 @@ static void init_rel_sync_cache(MemoryContext cachectx);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 											 Relation relation);
+static RelationSyncEntry *get_rel_sync_entry_by_relid(PGOutputData *data,
+													  Oid relid);
+static RelationSyncEntry *get_rel_sync_entry_internal(PGOutputData *data,
+													  Relation relation,
+													  Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
@@ -1962,11 +1968,29 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Relation relation)
+{
+	return get_rel_sync_entry_internal(data, relation, InvalidOid);
+}
+
+static RelationSyncEntry *
+__attribute__ ((unused))
+get_rel_sync_entry_by_relid(PGOutputData *data, Oid relid)
+{
+	return get_rel_sync_entry_internal(data, NULL, relid);
+}
+
+static RelationSyncEntry *
+get_rel_sync_entry_internal(PGOutputData *data, Relation relation, Oid oid)
 {
 	RelationSyncEntry *entry;
 	bool		found;
 	MemoryContext oldctx;
-	Oid			relid = RelationGetRelid(relation);
+	Oid			relid = OidIsValid(oid) ? oid: RelationGetRelid(relation);
+	bool		started_xact = false, using_subtxn;
+
+	/* either oid or relation is provided. */
+	Assert(OidIsValid(oid) || RelationIsValid(relation));
+	Assert(!(OidIsValid(oid) && RelationIsValid(relation)));
 
 	Assert(RelationSyncCache != NULL);
 
@@ -1993,6 +2017,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
+	if (!entry->replicate_valid && !IsTransactionOrTransactionBlock())
+	{
+		/*
+		 * Validating the entry needs to access syscache, which must
+		 * be in a transaction state, if that's not ready, start one.
+		 */
+
+		using_subtxn = IsTransactionOrTransactionBlock();
+
+		if (using_subtxn)
+			BeginInternalSubTransaction(__func__);
+		else
+			StartTransactionCommand();
+
+		started_xact = true;
+	}
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
@@ -2198,9 +2239,19 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
 			entry->pubactions.pubdelete)
 		{
+			bool		rel_opened = false;
+
+			if (!RelationIsValid(relation))
+			{
+				relation = relation_open(oid, AccessShareLock);
+				rel_opened = true;
+			}
 			/* Initialize the tuple slot and map */
 			init_tuple_slot(data, relation, entry);
 
+			if (rel_opened)
+				relation_close(relation, AccessShareLock);
+
 			/* Initialize the row filter */
 			pgoutput_row_filter_init(data, rel_publications, entry);
 
@@ -2215,6 +2266,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->replicate_valid = true;
 	}
 
+	if (started_xact)
+	{
+		AbortCurrentTransaction();
+		if (using_subtxn)
+			RollbackAndReleaseCurrentSubTransaction();
+	}
+
 	return entry;
 }
 
-- 
2.34.1
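The core of the proposal, that the expensive transaction setup should happen only when get_rel_sync_entry() has to rebuild an invalid entry, can be illustrated with a small standalone C model. This is a sketch under stated assumptions, not pgoutput code: in_xact, start_xact(), abort_xact(), lookup_published(), get_entry(), filter_eager(), filter_lazy() and the four-slot cache are all hypothetical stand-ins for IsTransactionOrTransactionBlock(), StartTransactionCommand()/AbortCurrentTransaction(), the syscache lookups, get_rel_sync_entry() and RelationSyncCache.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Standalone model of the lazy-transaction idea.  None of these names are
 * PostgreSQL APIs; they are hypothetical stand-ins for illustration only.
 */
static bool in_xact = false;   /* stands in for IsTransactionOrTransactionBlock() */
static int  xact_starts = 0;   /* counts how often we paid for transaction setup */

static void start_xact(void) { assert(!in_xact); in_xact = true; xact_starts++; }
static void abort_xact(void) { assert(in_xact); in_xact = false; }

typedef struct CacheEntry
{
    unsigned int relid;
    bool         valid;        /* stands in for replicate_valid */
    bool         published;    /* result of the catalog lookup */
} CacheEntry;

#define NSLOTS 4
static CacheEntry cache[NSLOTS];   /* tiny direct-mapped cache; collisions evict */

/* Stands in for the syscache lookups, which require a transaction. */
static bool lookup_published(unsigned int relid)
{
    assert(in_xact);
    return relid % 2 == 0;     /* arbitrary rule: even relids are published */
}

/* Starts (and ends) its own transaction only when the entry must be rebuilt. */
static CacheEntry *get_entry(unsigned int relid)
{
    CacheEntry *entry = &cache[relid % NSLOTS];
    bool        started = false;

    if (entry->relid != relid)
    {
        entry->relid = relid;
        entry->valid = false;
    }

    if (!entry->valid)
    {
        if (!in_xact)
        {
            start_xact();
            started = true;
        }
        entry->published = lookup_published(relid);
        entry->valid = true;
        if (started)
            abort_xact();
    }
    return entry;
}

/* Eager variant: pay for a transaction on every call, valid entry or not. */
static bool filter_eager(unsigned int relid)
{
    bool published;

    start_xact();
    published = get_entry(relid)->published;
    abort_xact();
    return published;
}

/* Lazy variant: let get_entry() decide whether a transaction is needed. */
static bool filter_lazy(unsigned int relid)
{
    return get_entry(relid)->published;
}

/* Models an invalidation callback clearing every entry's valid flag. */
static void invalidate_all(void)
{
    for (size_t i = 0; i < NSLOTS; i++)
        cache[i].valid = false;
}

/* Resets the model between experiments. */
static void reset_model(void)
{
    invalidate_all();
    in_xact = false;
    xact_starts = 0;
}
```

In this model, 1000 calls to filter_lazy(2) on a warm cache start a single transaction, while filter_eager(2) starts one per call; after invalidate_all() the next lazy lookup pays for exactly one more. The model has no subtransaction branch on purpose: inside the patch's `if (!entry->replicate_valid && !IsTransactionOrTransactionBlock())` block, IsTransactionOrTransactionBlock() has just returned false, so using_subtxn is always false there.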