Hi, here are more review comments for patch v20240720-0003.

======
src/backend/catalog/pg_subscription.c

(Numbers are starting at #4 because this is a continuation of the docs review)

4. GetSubscriptionRelations

nitpick - rearranged the function header comment

~

5.
TBH, I'm thinking that just passing 2 parameters:
- bool get_tables
- bool get_sequences
where one or both can be true, would have resulted in simpler code,
instead of introducing this new enum SubscriptionRelKind.

~

6.
The 'not_all_relations' parameter/logic feels really awkward. IMO it
needs a better name, and the meaning should be reversed to remove all
the "nots".

For example, commenting it and calling it like below could be much simpler.

'all_relations'
If returning sequences, if all_relations=true get all sequences,
otherwise only get sequences that are in 'init' state.
If returning tables, if all_relations=true get all tables, otherwise
only get tables that have not reached 'READY' state.

======
src/backend/commands/subscriptioncmds.c

AlterSubscription_refresh:
nitpick - this function comment is difficult to understand. I've
rearranged it a bit but it could still do with some further
improvement.
nitpick - move some code comments
nitpick - I adjusted the "stop worker" comment slightly. Please check
it is still correct.
nitpick - add a blank line

~

7.
The logic seems over-complicated. For example, why is the sequence
list *always* fetched, but the tables list is only sometimes fetched?
Furthermore, this 'refresh_all_sequences' parameter seems to have a
strange interference with tables (e.g. even though it is possible to
refresh all tables and sequences at the same time). It is as if the
meaning is 'refresh_publication_sequences' yet it is not called that
(???)

These gripes may be related to my other thread [1] about the new ALTER
syntax. (I feel that there should be the ability to refresh ALL TABLES
or ALL SEQUENCES independently if the user wants to). IIUC, it would
simplify this function logic as well as being more flexible. Anyway, I
will leave the discussion about syntax to that other thread.

~

8.
+ if (relkind != RELKIND_SEQUENCE)
+     logicalrep_worker_stop(sub->oid, relid);

  /*
   * For READY state, we would have already dropped the
   * tablesync origin.
   */
- if (state != SUBREL_STATE_READY)
+ if (state != SUBREL_STATE_READY && relkind != RELKIND_SEQUENCE)

It might be better to have a single "if (relkind != RELKIND_SEQUENCE)"
here and combine both of these code fragments under it.

~

9.
  ereport(DEBUG1,
- (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+ (errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name,
+ get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table")));

IIUC prior conditions mean get_rel_relkind(relid) == RELKIND_SEQUENCE
will be impossible here.

~~~

10. AlterSubscription

+ PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES");

IIUC the docs page for ALTER SUBSCRIPTION was missing this information
about "REFRESH PUBLICATION SEQUENCES" in transactions. Docs need more
updates.

======
src/backend/replication/logical/launcher.c

logicalrep_worker_find:
nitpick - tweak comment to say "or" instead of "and"

~~~

11.
+/*
+ * Return the pid of the apply worker for one that matches given
+ * subscription id.
+ */
+static LogicalRepWorker *
+logicalrep_apply_worker_find(Oid subid, bool only_running)

The function comment is wrong. This is not returning a PID.

~~~

12.
+ if (is_sequencesync_worker)
+     Assert(!OidIsValid(relid));

Should we change the Assert to something more like:
Assert(!is_sequencesync_worker || !OidIsValid(relid));

Otherwise, in a NODEBUG build the current code will compile into an
empty condition statement, which is a bit odd.

~~~

logicalrep_seqsyncworker_failuretime:
nitpick - tweak function comment
nitpick - add blank line

======
.../replication/logical/sequencesync.c

13. fetch_remote_sequence_data

The "current state" mentioned in the function comment is a bit vague.
I can't tell from this comment what it is returning without looking
deeper into the function code.

~

nitpick - typo "scenarios" in comment

~~~

copy_sequence:
nitpick - typo "withe" in function comment
nitpick - typo /retreived/retrieved/
nitpick - add/remove blank lines

~~~

LogicalRepSyncSequences:
nitpick - move a comment.
nitpick - remove blank line

14.
+ /*
+  * Verify whether the current batch of sequences is synchronized or if
+  * there are no remaining sequences to synchronize.
+  */
+ if ((((curr_seq + 1) % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) ||
+     (curr_seq + 1) == seq_count)

All this "curr_seq + 1" maths seems unnecessarily tricky. Can't we
just increment curr_seq before this calculation?

~

nitpick - simplify the comment about batching
nitpick - added a comment to the commit

======
src/backend/replication/logical/tablesync.c

finish_sync_worker:
nitpick - added an Assert so the if/else is less risky.
nitpick - modify the comment about failure time when it is a clean exit

~~~

15. process_syncing_sequences_for_apply

+ /* We need up-to-date sync state info for subscription sequences here. */
+ FetchTableStates(&started_tx, SUB_REL_KIND_ALL);

Should that say SUB_REL_KIND_SEQUENCE?

~

16.
+ /*
+  * If there are free sync worker slot(s), start a new sequence
+  * sync worker, and break from the loop.
+  */
+ if (nsyncworkers < max_sync_workers_per_subscription)

Should this "if" have some "else" code to log a warning if we have run
out of free workers? Otherwise, how will the user know that the system
may need tuning?

~~~

17. FetchTableStates

  /* Fetch all non-ready tables. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true);
+ rstates = GetSubscriptionRelations(MySubscription->oid, rel_type, true);

This feels risky. IMO there needs to be some prior Assert about the
rel_type. For example, if it happened to be SUB_REL_KIND_SEQUENCE then
this function code doesn't seem to make sense.

~~~

======
src/backend/replication/logical/worker.c

18. 
SetupApplyOrSyncWorker

+
+ if (isSequenceSyncWorker(MyLogicalRepWorker))
+     before_shmem_exit(logicalrep_seqsyncworker_failuretime, (Datum) 0);

Probably that should be using macro am_sequencesync_worker(), right?

======
src/include/catalog/pg_subscription_rel.h

19.
+typedef enum
+{
+ SUB_REL_KIND_TABLE,
+ SUB_REL_KIND_SEQUENCE,
+ SUB_REL_KIND_ALL,
+} SubscriptionRelKind;
+

I was not sure how helpful this is; it might not be needed. e.g. see
review comment #5 for GetSubscriptionRelations.

~~~

20.
+extern List *GetSubscriptionRelations(Oid subid, SubscriptionRelKind reltype,
+ bool not_ready);

There is a mismatch between the 'not_ready' parameter name here and
the name used in the function implementation.

======
src/test/subscription/t/034_sequences.pl

nitpick - removed a blank line

======
99.
Please also see the attached diffs patch which implements all the
nitpicks mentioned above.

======
[1] syntax - https://www.postgresql.org/message-id/CAHut%2BPuFH1OCj-P1UKoRQE2X4-0zMG%2BN1V7jdn%3DtOQV4RNbAbw%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
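
P.S. In case it helps to picture what #5 and #6 are suggesting, here is a rough standalone sketch of the filtering rule with two bools plus a positive 'all_relations' flag. Nothing here is from the patch: relation_wanted(), the KIND_* / STATE_* constants and their values are hypothetical stand-ins, and the real GetSubscriptionRelations works on pg_subscription_rel tuples rather than plain chars.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for relkind and sync-state codes (illustration only). */
#define KIND_TABLE		'r'
#define KIND_SEQUENCE	'S'
#define STATE_INIT		'i'
#define STATE_READY		'r'

/*
 * Decide whether one subscription relation should be returned.
 *
 * get_tables / get_sequences: which kinds the caller wants (one or both).
 * all_relations: if true, return every relation of the wanted kinds;
 * otherwise return only sequences that are in 'init' state and only
 * tables that have not reached 'ready' state.
 */
bool
relation_wanted(char relkind, char state,
				bool get_tables, bool get_sequences, bool all_relations)
{
	/* Asking for neither kind would be a caller bug. */
	assert(get_tables || get_sequences);

	if (relkind == KIND_SEQUENCE)
	{
		if (!get_sequences)
			return false;
		return all_relations || state == STATE_INIT;
	}

	/* Anything that is not a sequence is treated as a table here. */
	if (!get_tables)
		return false;
	return all_relations || state != STATE_READY;
}
```

With this shape there is no "not" to untangle at the call sites: a caller that wants only the sequences still needing sync would pass (false, true, false), and one that wants everything would pass (true, true, true).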
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 5610c07..04d322a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -494,14 +494,18 @@ HasSubscriptionRelations(Oid subid)
 /*
  * Get the relations for the subscription.
  *
- * If rel_type is SUB_REL_KIND_SEQUENCE, get only the sequences. If rel_type is
- * SUB_REL_KIND_TABLE, get only the tables. If rel_type is SUB_REL_KIND_ALL,
- * get both tables and sequences.
+ * rel_type:
+ * If SUB_REL_KIND_SEQUENCE, return only the sequences.
+ * If SUB_REL_KIND_TABLE, return only the tables.
+ * If SUB_REL_KIND_ALL, return both tables and sequences.
+ *
+ * not_all_relations:
  * If not_all_relations is true for SUB_REL_KIND_TABLE and SUB_REL_KIND_ALL,
  * return only the relations that are not in a ready state, otherwise return all
  * the relations of the subscription. If not_all_relations is true for
  * SUB_REL_KIND_SEQUENCE, return only the sequences that are in init state,
  * otherwise return all the sequences of the subscription.
+ *
  * The returned list is palloc'ed in the current memory context.
  */
 List *
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d23901a..2f9ff8b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -879,11 +879,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
  * Update the subscription to refresh both the publication and the publication
  * objects associated with the subscription.
  *
- * If the copy_data parameter is true, the function will set the state
- * to "init"; otherwise, it will set the state to "ready". When the
- * validate_publications is provided with a publication list, the function
- * checks that the specified publications exist on the publisher. If
- * refresh_all_sequences is true, it will mark all sequences with "init" state
+ * If 'copy_data' parameter is true, the function will set the state
+ * to "init"; otherwise, it will set the state to "ready".
+ *
+ * When 'validate_publications' is provided with a publication list, the function
+ * checks that the specified publications exist on the publisher.
+ *
+ * If 'refresh_all_sequences' is true, it will mark all sequences with "init" state
  * for re-synchronization; otherwise, only the newly added relations and
  * sequences will be updated based on the copy_data parameter.
  */
@@ -932,8 +934,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		if (validate_publications)
 			check_publications(wrconn, validate_publications);
 
+		/* Get the table list from publisher. */
 		if (reltype == SUB_REL_KIND_ALL)
-			/* Get the table list from publisher. */
 			pubrel_names = fetch_table_list(wrconn, sub->publications);
 
 		/* Get the sequence list from publisher. */
@@ -1050,9 +1052,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 				sub_remove_rels[remove_rel_len++].state = state;
 
 				/*
-				 * Since one sequence sync workers synchronizes all the
-				 * sequences, stop the worker only if relation kind is not
-				 * sequence.
+				 * A single sequence-sync worker synchronizes all sequences,
+				 * so only stop workers when relation kind is not sequence.
 				 */
 				if (relkind != RELKIND_SEQUENCE)
 					logicalrep_worker_stop(sub->oid, relid);
@@ -1088,6 +1089,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 										 sub->name,
 										 get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table")));
 			}
+
 			/*
 			 * In case of REFRESH PUBLICATION SEQUENCES, the existing sequences
 			 * should be re-synchronized.
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 17759c2..86be218 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -237,7 +237,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
  * Walks the workers array and searches for one that matches given
  * subscription id and relid.
  *
- * We are only interested in the leader apply worker, table sync worker and
+ * We are only interested in the leader apply worker, table sync worker, or
  * sequence sync worker.
  */
 LogicalRepWorker *
@@ -877,7 +877,7 @@ logicalrep_launcher_onexit(int code, Datum arg)
 }
 
 /*
- * Update the failure time for the sequence sync worker in the subscription's
+ * Update the failure time of the sequence sync worker in the subscription's
  * apply worker.
  *
  * This function is invoked when the sequence sync worker exits due to a
@@ -889,6 +889,7 @@ logicalrep_seqsyncworker_failuretime(int code, Datum arg)
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
 	worker = logicalrep_apply_worker_find(MyLogicalRepWorker->subid, true);
 	if (worker)
 		worker->sequencesync_failure_time = GetCurrentTimestamp();
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index 7b1d071..45782c6 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -49,7 +49,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, XLogRecPtr *lsn)
 
 	/*
 	 * In the event of crash we can lose (skip over) as many values as we
-	 * pre-logged. We might get duplicate values in this kind of scenarios. So
+	 * pre-logged. We might get duplicate values in this kind of scenario. So
 	 * use (last_value + log_cnt) to avoid it.
 	 */
 	appendStringInfo(&cmd, "SELECT (last_value + log_cnt), page_lsn "
@@ -87,7 +87,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, XLogRecPtr *lsn)
  * Copy existing data of a sequence from publisher.
  *
  * Fetch the sequence value from the publisher and set the subscriber sequence
- * withe the retreived value. Caller is responsible for locking the local
+ * withe the retrieved value. Caller is responsible for locking the local
  * relation.
  */
 static XLogRecPtr
@@ -115,9 +115,9 @@ copy_sequence(WalReceiverConn *conn, Relation rel)
 					 " AND c.relname = %s",
 					 quote_literal_cstr(nspname),
 					 quote_literal_cstr(relname));
+
 	res = walrcv_exec(conn, cmd.data, lengthof(tableRow), tableRow);
 
-
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONNECTION_FAILURE),
@@ -176,8 +176,9 @@ LogicalRepSyncSequences(void)
 	 */
 #define MAX_SEQUENCES_SYNC_PER_BATCH 100
 
-	/* Get the sequences that should be synchronized. */
 	StartTransactionCommand();
+
+	/* Get the sequences that should be synchronized. */
 	sequences = GetSubscriptionRelations(subid, SUB_REL_KIND_SEQUENCE, true);
 
 	/* Allocate the tracking info in a permanent memory context. */
@@ -191,7 +192,6 @@ LogicalRepSyncSequences(void)
 	}
 
 	MemoryContextSwitchTo(oldctx);
-
 	CommitTransactionCommand();
 
 	/* Is the use of a password mandatory? */
@@ -272,8 +272,8 @@ LogicalRepSyncSequences(void)
 		table_close(sequence_rel, NoLock);
 
 		/*
-		 * Verify whether the current batch of sequences is synchronized or if
-		 * there are no remaining sequences to synchronize.
+		 * Have we reached the end of the current batch of sequences,
+		 * or last remaining sequences to synchronize?
 		 */
 		if ((((curr_seq + 1) % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) ||
 			(curr_seq + 1) == seq_count)
@@ -292,6 +292,7 @@ LogicalRepSyncSequences(void)
 									get_subscription_name(subid, false),
 									get_rel_name(done_seq->relid)));
 			}
+			/* Commit this batch, and prepare for next batch. */
 			CommitTransactionCommand();
 			start_txn = true;
 		}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 313e5eb..9f77a78 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -143,6 +143,8 @@ void
 pg_attribute_noreturn()
 finish_sync_worker(LogicalRepWorkerType wtype)
 {
+	Assert(wtype == WORKERTYPE_TABLESYNC || wtype == WORKERTYPE_SEQUENCESYNC);
+
 	/*
 	 * Commit any outstanding transaction. This is the usual case, unless
 	 * there was nothing to do for the table.
@@ -171,7 +173,7 @@ finish_sync_worker(LogicalRepWorkerType wtype)
 	/* Find the leader apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 
-	/* No need to set the sequence failure time when it is a clean exit */
+	/* This is a clean exit, so no need to set a sequence failure time. */
 	if (wtype == WORKERTYPE_SEQUENCESYNC)
 		cancel_before_shmem_exit(logicalrep_seqsyncworker_failuretime, 0);
 
diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl
index 8f4871d..ecc17f5 100644
--- a/src/test/subscription/t/034_sequences.pl
+++ b/src/test/subscription/t/034_sequences.pl
@@ -127,7 +127,6 @@ $result = $node_subscriber->safe_psql(
 	'postgres', qq(
 	ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES
 ));
 
-
 $node_subscriber->poll_query_until('postgres', $synced_query)
   or die "Timed out while waiting for subscriber to synchronize data";