On Friday, November 25, 2022 10:54 AM Peter Smith <smithpb2...@gmail.com> wrote:
>
> Here are some review comments for v51-0001.
Thanks for the comments!

> ======
>
> .../replication/logical/applyparallelworker.c
>
> 1. General - Error messages, get_worker_name()
>
> I previously wrote a comment to ask if the get_worker_name() should be used
> in more places but the reply [1, #2b] was:
>
> > 2b.
> > Consider if maybe all of these ought to be calling get_worker_name()
> > which is currently static in worker.c. Doing this means any future
> > changes to get_worker_name won't cause more inconsistencies.
>
> The most error message in applyparallelxx.c can only use "xx parallel worker",
> so I think it's fine not to call get_worker_name
>
> ~
>
> I thought the reply missed the point I was trying to make -- I meant if it was
> arranged now so *every* message would go via get_worker_name() then in future
> somebody wanted to change the names (e.g. from "logical replication parallel
> apply worker" to "LR PA worker") then it would only need to be changed in one
> central place instead of hunting down every hardwired error message.
>

Thanks for the suggestion. I understand your point, but I feel that calling
get_worker_name() in places where the worker type is already decided could make
developers think that any kind of worker can enter this code, and I am not sure
that is better. So I didn't change this.

>
> 2. HandleParallelApplyMessage
>
> + case 'X': /* Terminate, indicating clean exit. */
> + shm_mq_detach(winfo->error_mq_handle);
> + winfo->error_mq_handle = NULL;
> + break;
> + default:
> + elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
> + msgtype, msg->len);
>
> The case 'X' code indentation is too much.

Changed.

> ======
>
> src/backend/replication/logical/origin.c
>
> 3. replorigin_session_setup(RepOriginId node, int acquired_by)
>
> @@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg)
> * array doesn't have to be searched when calling
> * replorigin_session_advance().
> *
> - * Obviously only one such cached origin can exist per process and the current
> + * Normally only one such cached origin can exist per process and the current
> * cached value can only be set again after the previous value is torn down
> * with replorigin_session_reset().
> + *
> + * However, we do allow multiple processes to point to the same origin slot if
> + * requested by the caller by passing PID of the process that has already
> + * acquired it as acquired_by. This is to allow multiple parallel apply
> + * processes to use the same origin, provided they maintain commit order, for
> + * example, by allowing only one process to commit at a time. For the first
> + * process requesting this origin, the acquired_by parameter needs to be set to
> + * 0.
> */
> void
> -replorigin_session_setup(RepOriginId node)
> +replorigin_session_setup(RepOriginId node, int acquired_by)
>
> I think the meaning of the acquired_by=0 is not fully described here:
> "For the first process requesting this origin, the acquired_by parameter needs
> to be set to 0."
> IMO that seems to be describing it only from POV that you are always going to
> want to allow multiple processes. But really this is an optional feature so you
> might pass acquired_by=0, not just because this is the first of multiple, but
> also because you *never* want to allow multiple at all. The comment does not
> convey this meaning.
>
> Maybe something worded like below is better?
>
> SUGGESTION
> Normally only one such cached origin can exist per process so the cached value
> can only be set again after the previous value is torn down with
> replorigin_session_reset(). For this normal case pass
> acquired_by=0 (meaning the slot is not allowed to be already acquired by
> another process).
>
> However, sometimes multiple processes can safely re-use the same origin slot
> (for example, multiple parallel apply processes can safely use the same origin,
> provided they maintain commit order by allowing only one process to commit
> at a time). For this case the first process must pass acquired_by=0, and then
> the other processes sharing that same origin can pass acquired_by=PID of the
> first process.

Changed as suggested.

> ======
>
> src/backend/replication/logical/worker.c
>
> 4. GENERAL - get_worker_name()
>
> If you decide it is OK to hardwire some error messages instead of
> unconditionally calling the get_worker_name() -- see my #1 review comment in
> this post -- then there are some other messages in this file that also seem like
> they can be also hardwired because the type of worker is already known.
>
> Here are some examples:
>
> 4a.
>
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + /* translator: first %s is the name of logical replication worker */
> + errmsg("%s for subscription \"%s\" will stop", get_worker_name(),
> + MySubscription->name),
> + errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
> +
> + return true;
> + }
>
> In the above code from should_apply_changes_for_rel we already know this is a
> parallel apply worker.
>
> ~
>
> 4b.
>
> + if (am_parallel_apply_worker())
> + ereport(LOG,
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\" will stop because of a parameter change",
> + get_worker_name(), MySubscription->name)));
> else
>
> In the above code from maybe_reread_subscription we already know this is a
> parallel apply worker.
>
> 4c.
>
> if (am_tablesync_worker())
> ereport(LOG,
> - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
> - MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\", table \"%s\" has started",
> + get_worker_name(), MySubscription->name,
> + get_rel_name(MyLogicalRepWorker->relid))));
>
> In the above code from ApplyWorkerMain we already know this is a tablesync
> worker

Thanks for checking these, changed.

> ~~~
>
> 5. get_transaction_apply_action
>
> +
> +/*
> + * Return the action to take for the given transaction. *winfo is assigned to
> + * the destination parallel worker info (if the action is
> + * TRANS_LEADER_SEND_TO_PARALLEL, otherwise *winfo is assigned NULL.
> + */
> +static TransApplyAction
> +get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
>
> There is no closing ')' in the function comment.

Added.

> ~~~
>
> 6. apply_worker_clean_exit
>
> + /* Notify the leader apply worker that we have exited cleanly. */
> + if (am_parallel_apply_worker())
> + pq_putmessage('X', NULL, 0);
>
> IMO the comment would be better inside the if block
>
> SUGGESTION
> if (am_parallel_apply_worker())
> {
> /* Notify the leader apply worker that we have exited cleanly. */
> pq_putmessage('X', NULL, 0);
> }

Changed.

Best regards,
Hou zj
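For anyone following item 3, here is a minimal, self-contained sketch of the acquired_by contract as worded in the agreed replorigin_session_setup() comment. OriginSlot and origin_session_setup() are hypothetical names invented for illustration; the model only tracks which PID owns a slot. It is not the code in the patch, and it ignores locking, the origin lookup itself, and the commit-ordering requirement, which the comment leaves to the callers.

```c
#include <assert.h>

/*
 * Hypothetical, simplified model of a replication origin slot.  This is NOT
 * PostgreSQL's ReplicationState; it only records which process owns the slot.
 */
typedef struct OriginSlot
{
    int acquired_by;        /* PID of the owning process, or 0 if free */
} OriginSlot;

/*
 * Hypothetical model of the acquired_by rule for replorigin_session_setup():
 *
 * - acquired_by == 0: the normal, exclusive case.  The slot must not already
 *   be acquired by another process; on success the caller becomes its owner.
 * - acquired_by != 0: the caller shares a slot already acquired by the
 *   process with that PID (e.g. a parallel apply worker reusing the leader's
 *   origin).  Ownership does not change.
 *
 * Returns 0 on success and -1 where the real code would raise an error.
 */
static int
origin_session_setup(OriginSlot *slot, int my_pid, int acquired_by)
{
    assert(my_pid != 0);        /* 0 means "free", so it cannot be a PID */

    if (acquired_by == 0)
    {
        if (slot->acquired_by != 0)
            return -1;          /* already active for some other process */
        slot->acquired_by = my_pid;
        return 0;
    }

    /* Sharing is only allowed with the process that actually holds the slot. */
    if (slot->acquired_by != acquired_by)
        return -1;
    return 0;
}
```

In this model a leader with PID 100 calls origin_session_setup(&slot, 100, 0), and a parallel apply worker then calls origin_session_setup(&slot, 200, 100); any later exclusive request with acquired_by=0, or a request naming the wrong owner PID, is rejected.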