On Wednesday, August 10, 2022 5:40 PM Peter Smith <smithpb2...@gmail.com> wrote:
>
> Here are some review comments for the patch v20-0001:
> ======
>
> 1. doc/src/sgml/catalogs.sgml
>
> +       <literal>p</literal> = apply changes directly using a background
> +       worker, if available, otherwise, it behaves the same as 't'
>
> The different char values 'f','t','p' are separated by comma (,) in
> the list, which is normal for the pgdocs AFAIK. However, because of
> this I don't think it is a good idea to use those other commas within
> the description for 'p', I suggest you remove those ones to avoid
> ambiguity with the separators.

Changed.

> ======
>
> 2. doc/src/sgml/protocol.sgml
>
> @@ -3096,7 +3096,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
>         <listitem>
>          <para>
>           Protocol version. Currently versions <literal>1</literal>, <literal>2</literal>,
> -         and <literal>3</literal> are supported.
> +         <literal>3</literal> and <literal>4</literal> are supported.
>          </para>
>
> Put a comma after the penultimate value like it had before.

Changed.

> ======
>
> 3. src/backend/replication/logical/applybgworker.c - <general>
>
> There are multiple function comments and other code comments in this
> file that are missing a terminating period (.)
>
> ======

Changed.

> 4. src/backend/replication/logical/applybgworker.c - apply_bgworker_start
>
> +/*
> + * Try to get a free apply background worker.
> + *
> + * If there is at least one worker in the free list, then take one. Otherwise,
> + * try to start a new apply background worker. If successful, cache it in
> + * ApplyBgworkersHash keyed by the specified xid.
> + */
> +ApplyBgworkerState *
> +apply_bgworker_start(TransactionId xid)
>
> SUGGESTION (for function comment)
> Return the apply background worker that will be used for the specified xid.
>
> If an apply background worker is found in the free list then re-use
> it, otherwise start a fresh one. Cache the worker ApplyBgworkersHash
> keyed by the specified xid.
>
> ~~~

Changed.

> 5.
>
> + /* Try to get a free apply background worker */
> + if (list_length(ApplyBgworkersFreeList) > 0)
>
> if (list_length(ApplyBgworkersFreeList) > 0)
>
> AFAIK a non-empty list is guaranteed to be not NIL, and an empty list
> is guaranteed to be NIL. So if you want to the above can simply be
> written as:
>
> if (ApplyBgworkersFreeList)

Both ways are fine to me, so I kept the current style.

> ~~~
>
> 6. src/backend/replication/logical/applybgworker.c - apply_bgworker_find
>
> +/*
> + * Try to look up worker assigned before (see function apply_bgworker_get_free)
> + * inside ApplyBgworkersHash for requested xid.
> + */
> +ApplyBgworkerState *
> +apply_bgworker_find(TransactionId xid)
>
> SUGGESTION (for function comment)
> Find the worker previously assigned/cached for this xid. (see function
> apply_bgworker_start)

Changed.

> ~~~
>
> 7.
>
> + Assert(status == APPLY_BGWORKER_BUSY);
> +
> + return entry->wstate;
> + }
> + else
> + return NULL;
>
> IMO here it is better to just remove that 'else' and unconditionally
> return NULL at the end of this function.

Changed.

> ~~~
>
> 8. src/backend/replication/logical/applybgworker.c - apply_bgworker_subxact_info_add
>
> + * Inside apply background worker we can figure out that new subtransaction was
> + * started if new change arrived with different xid. In that case we can define
> + * named savepoint, so that we were able to commit/rollback it separately
> + * later.
> + * Special case is if the first change comes from subtransaction, then
> + * we check that current_xid differs from stream_xid.
> + */
> +void
> +apply_bgworker_subxact_info_add(TransactionId current_xid)
>
> It is not quite English. Can you improve it a bit?
>
> SUGGESTION (maybe like this?)
> The apply background worker can figure out if a new subtransaction was
> started by checking if the new change arrived with different xid. In
> that case define a named savepoint, so that we are able to
> commit/rollback it separately later. A special case is when the first
> change comes from subtransaction – this is determined by checking if
> the current_xid differs from stream_xid.

Changed.

> ======
>
> 9. src/backend/replication/logical/launcher.c - WaitForReplicationWorkerAttach
>
> + *
> + * Return false if the attach fails. Otherwise return true.
>   */
> -static void
> +static bool
>  WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
>
> Why not just say "Return whether the attach was successful."

Changed.

> ~~~
>
> 10. src/backend/replication/logical/launcher.c - logicalrep_worker_stop
>
> + /* Found the main worker, then try to stop it. */
> + if (worker)
> + logicalrep_worker_stop_internal(worker);
>
> IMO the comment is kind of pointless because it only says what the
> code is clearly doing. If you really wanted to reinforce this worker
> is a main apply worker then you can do that with code like:
>
> if (worker)
> {
> Assert(!worker->subworker);
> logicalrep_worker_stop_internal(worker);
> }

Changed.

> ~~~
>
> 11. src/backend/replication/logical/launcher.c - logicalrep_worker_detach
>
> @@ -599,6 +632,29 @@ logicalrep_worker_attach(int slot)
>  static void
>  logicalrep_worker_detach(void)
>  {
> + /*
> + * This is the main apply worker, stop all the apply background workers we
> + * started before.
> + */
> + if (!MyLogicalRepWorker->subworker)
>
> SUGGESTION (for comment)
> This is the main apply worker. Stop all apply background workers
> previously started from here.

Changed.

> ~~~
>
> 12. src/backend/replication/logical/launcher.c - logicalrep_apply_bgworker_count
>
> +/*
> + * Count the number of registered (not necessarily running) apply background
> + * workers for a subscription.
> + */
> +int
> +logicalrep_apply_bgworker_count(Oid subid)
>
> SUGGESTION
> Count the number of registered (but not necessarily running) apply
> background workers for a subscription.

Changed.

> ~~~
>
> 13.
>
> + /* Search for attached worker for a given subscription id. */
> + for (i = 0; i < max_logical_replication_workers; i++)
>
> SUGGESTION
> Scan all attached apply background workers, only counting those which
> have the given subscription id.

Changed.

> ======
>
> 14. src/backend/replication/logical/worker.c - apply_error_callback
>
> + {
> + if (errarg->remote_attnum < 0)
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->remote_xid,
> +    LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + else
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> +    errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> +    errarg->remote_xid,
> +    LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + }
>
> There is quite a lot of common code here:
>
> "processing remote data for replication origin \"%s\" during \"%s\"
> for replication target relation \"%s.%s\"
>
> errarg->origin_name,
> logicalrep_message_type(errarg->command),
> errarg->rel->remoterel.nspname,
> errarg->rel->remoterel.relname,
>
> Is it worth trying to extract that common part to keep this code
> shorter? E.g. It could be easily done just with some #defines

I am not sure we have a clean way to change this. Any suggestions?

> ======
>
> 15. src/include/replication/worker_internal.h
>
> + /* proto version of publisher. */
> + uint32 proto_version;
>
> SUGGESTION
> Protocol version of publisher
>
> ~~~

Changed.

> 16.
>
> + /* id of apply background worker */
> + uint32 worker_id;
>
> Uppercase comment

Changed.

>
> 17.
>
> +/*
> + * Struct for maintaining an apply background worker.
> + */
> +typedef struct ApplyBgworkerState
>
> I'm not sure what this comment means. Perhaps there are some words missing?

I renamed the struct to ApplyBgworkerInfo, which sounds better to me, and
changed the comments.

Best regards,
Hou zj