On Thu, 8 Aug 2024 at 16:24, Shlok Kyal <shlok.kyal....@gmail.com> wrote:
>
> On Wed, 31 Jul 2024 at 11:17, Shlok Kyal <shlok.kyal....@gmail.com> wrote:
> >
> > On Wed, 31 Jul 2024 at 09:36, Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Wed, Jul 31, 2024 at 3:27 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > > >
> > > > On Wed, Jul 24, 2024 at 9:53 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > > >
> > > > > On Wed, Jul 17, 2024 at 5:25 PM vignesh C <vignes...@gmail.com> wrote:
> > > > > >
> > > > > > On Wed, 17 Jul 2024 at 11:54, Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > > > > >
> > > > > > > On Tue, Jul 16, 2024 at 6:54 PM vignesh C <vignes...@gmail.com> wrote:
> > > > > > >
> > > > > > > BTW, I noticed that we don't take any table-level locks for CREATE PUBLICATION ... FOR ALL TABLES (and DROP PUBLICATION). Can that create a similar problem? I haven't tested, so I am not sure, but even if there is a problem for the CREATE case, it should lead to some ERROR like a missing publication.
> > > > > >
> > > > > > I tested these scenarios, and as you expected, it throws an error for the create publication case:
> > > > > > 2024-07-17 14:50:01.145 IST [481526] 481526 ERROR:  could not receive data from WAL stream: ERROR:  publication "pub1" does not exist
> > > > > > CONTEXT:  slot "sub1", output plugin "pgoutput", in the change callback, associated LSN 0/1510CD8
> > > > > > 2024-07-17 14:50:01.147 IST [481450] 481450 LOG:  background worker "logical replication apply worker" (PID 481526) exited with exit code 1
> > > > > >
> > > > > > The steps for this process are as follows:
> > > > > > 1) Create tables in both the publisher and subscriber.
> > > > > > 2) On the publisher: create a replication slot.
> > > > > > 3) On the subscriber: create a subscription using the slot created by the publisher.
> > > > > > 4) On the publisher:
> > > > > > 4.a) Session 1: BEGIN; INSERT INTO T1;
> > > > > > 4.b) Session 2: CREATE PUBLICATION ... FOR ALL TABLES
> > > > > > 4.c) Session 1: COMMIT;
> > > > > >
> > > > > > Since we throw a "publication does not exist" error, there is no inconsistency issue here.
> > > > > >
> > > > > > However, an issue persists with DROP PUBLICATION of an ALL TABLES publication, where data continues to replicate even after the publication is dropped. This happens because the open transaction consumes the invalidation, causing the publications to be revalidated using an old snapshot. As a result, both the open transaction and the subsequent transactions are getting replicated.
> > > > > >
> > > > > > We can reproduce this issue by following these steps in a logical replication setup with an "ALL TABLES" publication:
> > > > > > On the publisher:
> > > > > > Session 1: BEGIN; INSERT INTO T1 VALUES (val1);
> > > > > > In another session on the publisher:
> > > > > > Session 2: DROP PUBLICATION
> > > > > > Back in Session 1 on the publisher:
> > > > > > COMMIT;
> > > > > > Finally, in Session 1 on the publisher:
> > > > > > INSERT INTO T1 VALUES (val2);
> > > > > >
> > > > > > Even after dropping the publication, both val1 and val2 are still being replicated to the subscriber. This means that both the in-progress concurrent transaction and the subsequent transactions are being replicated.
> > > > > >
> > > > > > I don't think locking all tables is a viable solution in this case, as it would require asking the user to refrain from performing any operations on any of the tables in the database while creating a publication.
> > > > > >
> > > > > Indeed, locking all tables in the database to prevent concurrent DMLs for this scenario also looks odd to me. The other alternative previously suggested by Andres is to distribute catalog-modifying transactions to all concurrent in-progress transactions [1], but as mentioned, this could add overhead. One possibility to reduce the overhead is to selectively distribute invalidations only for publication-related catalogs, but I haven't analyzed the feasibility.
> > > > >
> > > > > We need more opinions to decide here, so let me summarize the problem and solutions discussed. As explained with an example in an email [1], the problem related to logical decoding is that it doesn't process invalidations corresponding to DDLs for the already in-progress transactions. We discussed preventing DMLs in the first place when concurrent DDLs like ALTER PUBLICATION ... ADD TABLE ... are in progress. The solution discussed was to acquire ShareUpdateExclusiveLock on all the tables being added via such commands. Further analysis revealed that the same handling is required for ALTER PUBLICATION ... ADD TABLES IN SCHEMA, which means locking all the tables in the specified schemas. Then, DROP PUBLICATION also seems to have similar symptoms, which means in the worst case (where the publication is FOR ALL TABLES) we would have to lock all the tables in the database. We are not sure if that is good, so the other alternative we can pursue is to distribute invalidations in the logical decoding infrastructure [1], which has its own downsides.
> > > > >
> > > > > Thoughts?
> > > >
> > > > Thank you for summarizing the problem and solutions!
> > > >
> > > > I think it's worth trying the idea of distributing invalidation messages, and we will see if there could be overheads or any further obstacles. IIUC this approach would resolve another issue we discussed before too [1].
> > > >
> > > Yes, and we also discussed having a similar solution at the time when that problem was reported. So, it is clear that even though locking tables can work for commands like ALTER PUBLICATION ... ADD TABLE ..., we need a solution for distributing invalidations to the in-progress transactions during logical decoding for the other cases you reported previously.
> > >
> > > Thanks for looking into this.
> >
> > Thanks, I am working on implementing a solution for distributing invalidations. Will share a patch for the same.
>
> Created a patch for distributing invalidations.
> Here we collect the invalidation messages for the current transaction and distribute them to all the in-progress transactions, whenever we are distributing the snapshots. Thoughts?
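IIUC, the idea is roughly like the sketch below. To be clear, this is only an illustration of the approach and not the patch itself: the loop mirrors what SnapBuildDistributeNewCatalogSnapshot() already does for snapshots, and curxid, ninvalidations and invalidations are placeholder names for the DDL transaction's xid and its collected invalidation messages.

/*
 * Sketch only (not the actual patch): while distributing a new catalog
 * snapshot to the other in-progress transactions, also queue the
 * invalidation messages collected for the catalog-modifying transaction,
 * so that those transactions are decoded with up-to-date publication
 * information.  "lsn" is the LSN of the record being decoded.
 */
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
{
    ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);

    /* The DDL transaction itself already has these invalidations queued. */
    if (txn->xid == curxid)
        continue;

    /* Without a base snapshot there are no changes to decode yet. */
    if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
        continue;

    /* Existing behaviour: queue the new catalog snapshot. */
    SnapBuildSnapIncRefcount(builder->snapshot);
    ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn, builder->snapshot);

    /* New behaviour: also queue the DDL's invalidation messages. */
    if (ninvalidations > 0)
        ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
                                      ninvalidations, invalidations);
}

If that is the shape of it, the overhead would be roughly proportional to the number of concurrent in-progress transactions times the size of the invalidation list queued into each of them.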
Since we are applying the invalidations to all in-progress transactions, the publisher will replicate only the part of such a transaction's data up to the point of the invalidation, while the remaining changes will not be replicated. For example:
Session1: BEGIN; INSERT INTO tab_conc VALUES (1);
Session2: ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc;
Session1: INSERT INTO tab_conc VALUES (2);
          INSERT INTO tab_conc VALUES (3);
          COMMIT;

After the above, the subscriber data looks like:
postgres=# select * from tab_conc ;
 a
---
 1
(1 row)

You can reproduce the issue using the attached test.

I'm not sure if this behavior is ok. At present, we have replicated the first record of the transaction, but the second and third records are being skipped. Would it be better to apply the invalidations only after the transaction that is already underway has finished?

Thoughts?

Regards,
Vignesh
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index cb36ca7b16..b5d9749bdf 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -487,6 +487,145 @@ $result =
 is( $result, qq(2|f
 3|t), 'check replicated update on subscriber');
+# cleanup
+$node_publisher->safe_psql('postgres', qq(DROP PUBLICATION pub1;));
+$node_subscriber->safe_psql('postgres', qq(DROP SUBSCRIPTION sub1;));
+
+# =============================================================================
+# The bug was that incremental data synchronization was skipped when a new
+# table was added to the publication while a concurrent transaction performing
+# DML on the same table was still open.
+# =============================================================================
+
+# Initial setup.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE TABLE tab_conc(a int);
+	CREATE PUBLICATION regress_pub1;
+));
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE TABLE tab_conc(a int);
+	CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1;
+
+));
+
+# Bump the query timeout to avoid false negatives on slow test systems.
+my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
+
+# Initiate a background session that keeps a transaction active.
+my $background_psql1 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+# Maintain an active transaction with the table.
+$background_psql1->set_query_timer_restart();
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (1);
+]);
+
+# Add the table to the publication using background_psql, as the ALTER
+# PUBLICATION command will wait for a lock held by the open transaction and
+# can complete only after that transaction commits.
+my $background_psql2 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+$background_psql2->set_query_timer_restart();
+
+# This operation will wait because the open transaction holds a lock.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc;\n");
+
+# Complete the old transaction.
+$background_psql1->query_safe(qq[COMMIT]);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (2);
+));
+
+# Refresh the publication.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION regress_sub1 REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2),
+	'Ensure that the data from the tab_conc table is synchronized to the subscriber after the subscription is refreshed'
+);
+
+# Perform an insert.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (3);
+));
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert is replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3),
+	'Verify that the incremental data for table tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+# =============================================================================
+# The same bug is present with ALTER PUBLICATION ... DROP TABLE.
+# =============================================================================
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (4);
+]);
+
+# Wait for WAL to be generated.
+sleep(1);
+
+# This operation will wait because the open transaction holds a lock.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc;\n");
+
+# Wait for WAL to be generated.
+sleep(1);
+
+# Complete the old transaction.
+$background_psql1->query_safe(
+	qq[
+	INSERT INTO tab_conc VALUES (5);
+	INSERT INTO tab_conc VALUES (6);
+	COMMIT;
+]);
+$background_psql1->quit;
+
+# Wait until the table is dropped from the publication.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 0 FROM pg_publication_rel WHERE prrelid IN ('tab_conc'::regclass);"
+)
+  or die
+  "Timed out while waiting for the table to be removed from the publication";
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the inserts made before the table was removed from the
+# publication are replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3
+4
+5
+6),
+	'Verify that the incremental data for table tab_conc before removing table from publication is replicated to the subscriber'
+);
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');