On Fri, 15 Jan 2021 at 15:49, japin <japi...@hotmail.com> wrote:
> On Fri, 15 Jan 2021 at 14:50, Bharath Rupireddy
> <bharath.rupireddyforpostg...@gmail.com> wrote:
>> On Fri, Jan 15, 2021 at 11:33 AM Hou, Zhijie <houzj.f...@cn.fujitsu.com> wrote:
>>>
>>> > On Thu, Jan 14, 2021 at 5:36 PM Li Japin <japi...@hotmail.com> wrote:
>>> > > Do we really need to access PUBLICATIONRELMAP in this patch? What if
>>> > > we just set it to false in the else condition of (if (publish &&
>>> > > (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)))?
>>> > >
>>> > > Thanks for your review. I agree with you: we don't need to access
>>> > > PUBLICATIONRELMAP, since we already get the publication OID in
>>> > > GetRelationPublications(relid) [1], which also accesses
>>> > > PUBLICATIONRELMAP. If the current pub->oid is not in pubids, then
>>> > > publish is false, so we do not need to publish the table.
>>> >
>>> > +1. This is enough.
>>> >
>>> > > I have another question: data->publications is a list; when does it
>>> > > have more than one item?
>>> >
>>> > IIUC, when a single table is associated with multiple publications,
>>> > data->publications will have multiple entries. Though I have not tried
>>> > it, we can try having two or three publications for the same table and
>>> > verify that.
>>> >
>>> > > 0001 - changed as you suggested.
>>> > > 0002 - no change.
>>>
>>> Hi,
>>>
>>> In the 0001 patch, the comment says "Relation is not associated with the
>>> publication anymore". That comment was accurate when the previous patch
>>> checked is_relation_part_of_publication().
>>>
>>> But now that the code is in the else branch, it covers more than the case
>>> the comment describes (the relation being dropped from the publication);
>>> other cases, such as partitioned tables, will hit the else branch too.
>>>
>>> Do you think we need to fix the comment a little to make it accurate?
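[Editor's note: the multiple-publication case discussed above can be set up with something like the following sketch; table and publication names here are made up for illustration. Once a single subscription names both publications, data->publications on the walsender carries two entries.]

```sql
-- On the publisher: the same table in two publications.
CREATE TABLE tab (a int);
CREATE PUBLICATION pub_ins FOR TABLE tab WITH (publish = 'insert');
CREATE PUBLICATION pub_all FOR TABLE tab;

-- On the subscriber (connection string elided):
-- CREATE SUBSCRIPTION sub CONNECTION '...' PUBLICATION pub_ins, pub_all;
```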
>>
>> Thanks, that comment needs a rework; in fact, the else condition does
>> too, because we may fall into the else branch even when publish is true
>> and (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot) is false,
>> whereas our intention (to fix the bug reported here) is to take the else
>> branch only when publish is false. Also, instead of relying only on the
>> publish variable, which can get set even when the relation is not in the
>> publication but ancestor_published is true, we can have something like
>> the below to fix the bug reported here:
>>
>>         if (publish &&
>>             (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
>>         {
>>             entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
>>             entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
>>             entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
>>             entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
>>         }
>>         else if (!list_member_oid(pubids, pub->oid))
>>         {
>>             /*
>>              * Relation is not associated with the publication anymore, i.e.
>>              * it would have been dropped from the publication. So we don't
>>              * need to publish the changes for it.
>>              */
>>             entry->pubactions.pubinsert = false;
>>             entry->pubactions.pubupdate = false;
>>             entry->pubactions.pubdelete = false;
>>             entry->pubactions.pubtruncate = false;
>>         }
>>
>> This way, the fix only focuses on what has been reported here, so it
>> should not cause any regressions, and the existing comment remains
>> appropriate.
>>
>> Thoughts?
>>
> I'm sorry, the 0001 patch is totally wrong. It works if we have only one
> publication, but that is a special case. If we have multiple publications
> in one subscription, it doesn't work. Here is a use case.
>
> Step (1)
> -- On publisher
> CREATE TABLE t1 (a int);
> CREATE TABLE t2 (a int);
> CREATE PUBLICATION mypub1 FOR TABLE t1 WITH (publish='insert');
> CREATE PUBLICATION mypub2 FOR TABLE t2;
>
> Step (2)
> -- On subscriber
> CREATE TABLE t1 (a int);
> CREATE TABLE t2 (a int);
> CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost port=5432
> dbname=postgres' PUBLICATION mypub1, mypub2;
>
> Step (3)
> -- On publisher
> INSERT INTO t1 VALUES (1);
>
> Step (4)
> -- On subscriber
> SELECT * FROM t1;
>
> In step (4), we cannot see the record, because there are two publications
> in data->publications, and we check them one by one. The first publication
> is "mypub1", and entry->pubactions.pubinsert is set to true; however, in
> the second round, the publication is "mypub2", and we cannot find pub->oid
> in pubids (which contains only the OID of "mypub1"), so all
> entry->pubactions are set to false, and nothing is replicated to the
> subscriber.
>
> My apologies!
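[Editor's note: when reproducing the steps above, the publication setup on the publisher can be double-checked with the system catalogues; the queries below are a sketch, and the exact output shape may vary across PostgreSQL versions.]

```sql
-- On the publisher: confirm which table belongs to which publication
-- (after step (1), t1 should appear only under mypub1, t2 only under mypub2).
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
ORDER BY pubname;

-- And the publish actions configured for each publication
-- (mypub1 should show pubinsert = true and the others false).
SELECT pubname, pubinsert, pubupdate, pubdelete, pubtruncate
FROM pg_publication
ORDER BY pubname;
```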
As I mentioned in the previous thread, if there are multiple publications in
a single subscription, it might lead to data loss. When the entry gets
invalidated in rel_sync_cache_publication_cb(), I think we should mark the
pubactions as false and let get_rel_sync_entry() recalculate them.

0001 - modified as described above.
0002 - unchanged.

Any thoughts?

--
Regards,
Japin Li.
ChengDu WenWu Information Technology Co.,Ltd.
From e66d42b6bcd2c31e2ce25035ff9334db2d66dd5f Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupire...@enterprisedb.com>
Date: Wed, 13 Jan 2021 16:35:00 +0530
Subject: [PATCH v3 1/2] Fix ALTER PUBLICATION...DROP TABLE behaviour

Currently, in logical replication, the publisher/walsender publishes
tables even though they are no longer part of the publication, i.e. they
have been dropped from the publication. Because of this, ALTER
PUBLICATION ... DROP TABLE doesn't work as expected.
---
 src/backend/replication/pgoutput/pgoutput.c | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 2f01137b42..118faf6ce3 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1179,5 +1179,18 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
 		entry->replicate_valid = false;
+
+		/*
+		 * Some relations might have been dropped from the publication, and
+		 * we do not need to publish the changes for them. However, we
+		 * cannot get the dropped relations (see above), so we reset the
+		 * pubactions for all entries.
+		 */
+		entry->pubactions.pubinsert = false;
+		entry->pubactions.pubupdate = false;
+		entry->pubactions.pubdelete = false;
+		entry->pubactions.pubtruncate = false;
+	}
 }
-- 
2.30.0
From 63dbb0ea405241346f8ad19d5cbaebbc22473a1e Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupire...@enterprisedb.com>
Date: Wed, 13 Jan 2021 17:23:08 +0530
Subject: [PATCH v3 2/2] Invalidate relation map cache in subscriber syscache
 invalidation callbacks

Currently, in logical replication, the relation map cache in the
subscriber is not invalidated when anything changes in the
pg_subscription_rel or pg_subscription catalogues. So we do not always
end up reading the latest system catalogues in logicalrep_rel_open via
GetSubscriptionRelState.

To fix this, invalidate the relation map cache entries in
invalidate_syncing_table_states and subscription_change_cb, which are
the invalidation callbacks for the pg_subscription_rel and
pg_subscription catalogues respectively.
---
 src/backend/replication/logical/relation.c  | 26 +++++++++++++++++++++
 src/backend/replication/logical/tablesync.c |  3 +++
 src/backend/replication/logical/worker.c    |  3 +++
 src/include/replication/logicalrelation.h   |  1 +
 4 files changed, 33 insertions(+)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e861c0ff80..88ac772444 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -96,6 +96,32 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
 	}
 }
 
+/*
+ * Invalidate the relation map cache whenever the syscache of
+ * pg_subscription_rel or pg_subscription changes.
+ */
+void
+logicalrep_relmap_invalidate(void)
+{
+	LogicalRepRelMapEntry *entry;
+	HASH_SEQ_STATUS status;
+
+	/* Just to be sure. */
+	if (LogicalRepRelMap == NULL)
+		return;
+
+	/*
+	 * There is no way to find the cache entry for which the syscache has
+	 * changed, so we mark the state of all cache entries as unknown.
+	 * Because of this, in logicalrep_rel_open the cache entry state will
+	 * be read from the system catalogue using GetSubscriptionRelState.
+	 */
+	hash_seq_init(&status, LogicalRepRelMap);
+
+	while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		entry->state = SUBREL_STATE_UNKNOWN;
+}
+
 /*
  * Initialize the relation map cache.
  */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 863d196fd7..6b29173a5c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -257,6 +257,9 @@ void
 invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 {
 	table_states_valid = false;
+
+	/* Invalidate the relation map cache. */
+	logicalrep_relmap_invalidate();
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f2b2549a51..347264e4a8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2487,6 +2487,9 @@ static void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	MySubscriptionValid = false;
+
+	/* Invalidate the relation map cache. */
+	logicalrep_relmap_invalidate();
 }
 
 /*
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 3f0b3deefb..f5e70b75b0 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -49,4 +49,5 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern char *logicalrep_typmap_gettypname(Oid remoteid);
 
+extern void logicalrep_relmap_invalidate(void);
 #endif							/* LOGICALRELATION_H */
-- 
2.30.0