Hi Jie,

> Most importantly, if we filter out unpublished relationship-related changes
> after constructing the changes but before queuing the changes into a
> transaction, will it reduce the workload of logical decoding and avoid disk
> or memory growth as much as possible?

Thanks for the report!

Discarding the unused changes as soon as possible looks like a valid
optimization to me, but I would like more experienced people to
double-check it.

> Attached is the patch I used to implement this optimization.

After a quick look at the patch, I found FilterByTable is too expensive
because of the StartTransaction and AbortTransaction calls. With your
setup above, I ran the test below:

insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',
i),repeat('dfds', i) from generate_series(0,999100) i;

I ran perf on the walsender of mypub for 30 seconds, and got:

- 22.04%     1.53%  postgres  postgres           [.] FilterByTable
   - 20.51% FilterByTable
        AbortTransaction
          ResourceOwnerReleaseInternal
            LockReleaseAll
              hash_seq_search

Most of that cost comes from AbortTransaction, and 20% is not trivial.

From your patch:
+
+       /*
+        * Decoding needs access to syscaches et al., which in turn use
+        * heavyweight locks and such. Thus we need to have enough state around to
+        * keep track of those.  The easiest way is to simply use a transaction
+        * internally.
+ ....
+       using_subtxn = IsTransactionOrTransactionBlock();
+
+       if (using_subtxn)
+               BeginInternalSubTransaction("filter change by table");
+       else
+               StartTransactionCommand();

Actually, FilterByTable here is simpler than "decoding": we access the
syscache only when get_rel_sync_entry finds an entry whose replicate_valid
is false, and that invalid case should be rare.

What I'm thinking now is that we let get_rel_sync_entry build its own
transaction state *only when it finds an invalid entry*. If the caller has
already built one, like the existing cases in master, nothing happens
except a simple transaction state check. Then in the FilterByTable case we
just leave it to get_rel_sync_entry. See the attachment for the idea; a
rough sketch of the intended flow is below.
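
In simplified form (this is only a sketch of the idea, not the exact code
in the attached patch; the entry-initialization and rebuild details are
elided), the flow would look like:

/* Simplified sketch only; see the attached patch for the real change. */
static RelationSyncEntry *
get_rel_sync_entry_internal(PGOutputData *data, Relation relation, Oid oid)
{
	RelationSyncEntry *entry;
	bool		found;
	bool		started_xact = false;
	Oid			relid = OidIsValid(oid) ? oid : RelationGetRelid(relation);

	/* Common case: a plain hash probe, no syscache and no transaction. */
	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
											  &relid, HASH_ENTER, &found);
	if (!found)
	{
		/* ... initialize the new entry; replicate_valid starts as false ... */
	}

	if (!entry->replicate_valid && !IsTransactionOrTransactionBlock())
	{
		/*
		 * Rare case: rebuilding the entry reads the catalogs through the
		 * syscache, which needs transaction state.  Start one only when
		 * the caller has not done so already.
		 */
		StartTransactionCommand();
		started_xact = true;
	}

	if (!entry->replicate_valid)
	{
		/* ... rebuild the entry, opening the relation by oid if needed ... */
	}

	if (started_xact)
		AbortCurrentTransaction();

	return entry;
}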

-- 
Best Regards
Andy Fan

From 90b8df330df049bd1d7e881dc6e9b108c17b0924 Mon Sep 17 00:00:00 2001
From: "yizhi.fzh" <yizhi....@alibaba-inc.com>
Date: Wed, 21 Feb 2024 18:40:03 +0800
Subject: [PATCH v1 1/1] Make get_rel_sync_entry less depending on transaction
 state.

get_rel_sync_entry needs a transaction only when a replicate_valid = false
entry is found, which should be a rare case. However, the caller can't know
whether an entry is valid, so it has to prepare the transaction state before
calling this function. Such preparation is expensive.

This patch makes get_rel_sync_entry manage a transaction state only when
necessary, so the callers don't need to prepare it blindly.
---
 src/backend/replication/pgoutput/pgoutput.c | 60 ++++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..25e55590a2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/tupconvert.h"
+#include "access/relation.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
@@ -214,6 +215,11 @@ static void init_rel_sync_cache(MemoryContext cachectx);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 											 Relation relation);
+static RelationSyncEntry *get_rel_sync_entry_by_relid(PGOutputData *data,
+													  Oid relid);
+static RelationSyncEntry *get_rel_sync_entry_internal(PGOutputData *data,
+													  Relation relation,
+													  Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
@@ -1962,11 +1968,29 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Relation relation)
+{
+	return get_rel_sync_entry_internal(data, relation, InvalidOid);
+}
+
+static RelationSyncEntry *
+__attribute__ ((unused))
+get_rel_sync_entry_by_relid(PGOutputData *data, Oid relid)
+{
+	return get_rel_sync_entry_internal(data, NULL, relid);
+}
+
+static RelationSyncEntry *
+get_rel_sync_entry_internal(PGOutputData *data, Relation relation, Oid oid)
 {
 	RelationSyncEntry *entry;
 	bool		found;
 	MemoryContext oldctx;
-	Oid			relid = RelationGetRelid(relation);
+	Oid			relid = OidIsValid(oid) ? oid: RelationGetRelid(relation);
+	bool		started_xact = false, using_subtxn;
+
+	/* either oid or relation is provided. */
+	Assert(OidIsValid(oid) || RelationIsValid(relation));
+	Assert(!(OidIsValid(oid) && RelationIsValid(relation)));
 
 	Assert(RelationSyncCache != NULL);
 
@@ -1993,6 +2017,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
+	if (!entry->replicate_valid && !IsTransactionOrTransactionBlock())
+	{
+		/*
+		 * Validating the entry needs to access the syscache, which
+		 * requires transaction state; if that's not ready, start one.
+		 */
+
+		using_subtxn = IsTransactionOrTransactionBlock();
+
+		if (using_subtxn)
+			BeginInternalSubTransaction(__func__);
+		else
+			StartTransactionCommand();
+
+		started_xact = true;
+	}
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
@@ -2198,9 +2239,19 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
 			entry->pubactions.pubdelete)
 		{
+			bool rel_opened = false;
+
+			if (!RelationIsValid(relation))
+			{
+				relation = relation_open(oid, AccessShareLock);
+				rel_opened = true;
+			}
 			/* Initialize the tuple slot and map */
 			init_tuple_slot(data, relation, entry);
 
+			if (rel_opened)
+				relation_close(relation, AccessShareLock);
+
 			/* Initialize the row filter */
 			pgoutput_row_filter_init(data, rel_publications, entry);
 
@@ -2215,6 +2266,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->replicate_valid = true;
 	}
 
+	if (started_xact)
+	{
+		AbortCurrentTransaction();
+		if (using_subtxn)
+			RollbackAndReleaseCurrentSubTransaction();
+	}
+
 	return entry;
 }
 
-- 
2.34.1
