Hi Jie,
> Most importantly, if we filter out unpublished relationship-related
> changes after
> constructing the changes but before queuing the changes into a transaction,
> will it reduce the workload of logical decoding and avoid disk or memory
> growth
> as much as possible?
Thanks for the report!
Discarding the unused changes as soon as possible looks like a valid
optimization for me, but I pretty like more experienced people have a
double check.
> Attached is the patch I used to implement this optimization.
After a quick look at the patch, I found FilterByTable is too expensive
because of the StartTransaction and AbortTransaction. With your above
setup and run the below test:
insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',
i),repeat('dfds', i) from generate_series(0,999100) i;
perf the wal sender of mypub for 30 seconds, then I get:
- 22.04% 1.53% postgres postgres [.] FilterByTable
- 20.51% FilterByTable
AbortTransaction
ResourceOwnerReleaseInternal
LockReleaseAll
hash_seq_search
The main part comes from AbortTransaction, and the 20% is not trivial.
>From your patch:
+
+ /*
+ * Decoding needs access to syscaches et al., which in turn use
+ * heavyweight locks and such. Thus we need to have enough state around
to
+ * keep track of those. The easiest way is to simply use a transaction
+ * internally.
+ ....
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("filter change by table");
+ else
+ StartTransactionCommand();
Acutally FilterByTable here is simpler than "decoding", we access
syscache only when we find an entry in get_rel_sync_entry and the
replicate_valid is false, and the invalid case should rare.
What I'm thinking now is we allow the get_rel_sync_sync_entry build its
own transaction state *only when it find a invalid entry*. if the caller
has built it already, like the existing cases in master, nothing will
happen except a simple transaction state check. Then in the
FilterByTable case we just leave it for get_rel_sync_sync_entry. See the
attachemnt for the idea.
--
Best Regards
Andy Fan
>From 90b8df330df049bd1d7e881dc6e9b108c17b0924 Mon Sep 17 00:00:00 2001
From: "yizhi.fzh" <[email protected]>
Date: Wed, 21 Feb 2024 18:40:03 +0800
Subject: [PATCH v1 1/1] Make get_rel_sync_entry less depending on transaction
state.
get_rel_sync_entry needs transaction only a replicate_valid = false
entry is found, this should be some rare case. However the caller can't
know if a entry is valid, so they have to prepare the transaction state
before calling this function. Such preparation is expensive.
This patch makes the get_rel_sync_entry can manage a transaction stage
only if necessary. so the callers don't need to prepare it blindly.
---
src/backend/replication/pgoutput/pgoutput.c | 60 ++++++++++++++++++++-
1 file changed, 59 insertions(+), 1 deletion(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..25e55590a2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/tupconvert.h"
+#include "access/relation.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
@@ -214,6 +215,11 @@ static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static RelationSyncEntry *get_rel_sync_entry_by_relid(PGOutputData *data,
+ Oid relid);
+static RelationSyncEntry *get_rel_sync_entry_internal(PGOutputData *data,
+ Relation relation,
+ Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
@@ -1962,11 +1968,29 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
*/
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Relation relation)
+{
+ return get_rel_sync_entry_internal(data, relation, InvalidOid);
+}
+
+static RelationSyncEntry *
+__attribute__ ((unused))
+get_rel_sync_entry_by_relid(PGOutputData *data, Oid relid)
+{
+ return get_rel_sync_entry_internal(data, NULL, relid);
+}
+
+static RelationSyncEntry *
+get_rel_sync_entry_internal(PGOutputData *data, Relation relation, Oid oid)
{
RelationSyncEntry *entry;
bool found;
MemoryContext oldctx;
- Oid relid = RelationGetRelid(relation);
+ Oid relid = OidIsValid(oid) ? oid: RelationGetRelid(relation);
+ bool started_xact = false, using_subtxn;
+
+ /* either oid or relation is provided. */
+ Assert(OidIsValid(oid) || RelationIsValid(relation));
+ Assert(!(OidIsValid(oid) && RelationIsValid(relation)));
Assert(RelationSyncCache != NULL);
@@ -1993,6 +2017,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->attrmap = NULL;
}
+ if (!entry->replicate_valid && !IsTransactionOrTransactionBlock())
+ {
+ /*
+ * Validating the entry needs to access syscache, which must
+ * be in a transaction state, if that's not ready, start one.
+ */
+
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
+ BeginInternalSubTransaction(__func__);
+ else
+ StartTransactionCommand();
+
+ started_xact = true;
+ }
+
/* Validate the entry */
if (!entry->replicate_valid)
{
@@ -2198,9 +2239,19 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
entry->pubactions.pubdelete)
{
+ bool rel_opened = false;
+
+ if (!RelationIsValid(relation))
+ {
+ relation = relation_open(oid, AccessShareLock);
+ rel_opened = true;
+ }
/* Initialize the tuple slot and map */
init_tuple_slot(data, relation, entry);
+ if (rel_opened)
+ relation_close(relation, AccessShareLock);
+
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
@@ -2215,6 +2266,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->replicate_valid = true;
}
+ if (started_xact)
+ {
+ AbortCurrentTransaction();
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+ }
+
return entry;
}
--
2.34.1