On Mon, Aug 8, 2022 at 11:41 AM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Mon, Aug 8, 2022 at 10:18 AM Dilip Kumar <dilipbal...@gmail.com> wrote:
> >
> > > Based on above, we plan to first introduce the patch to perform streaming
> > > logical transactions by background workers, and then introduce parallel apply
> > > of normal transactions, whose design is different and needs some additional
> > > handling.
> >
> > Yeah, I think that makes sense. Since the streamed transactions are
> > sent to the standby interleaved, we can take advantage of parallelism,
> > and along with that we can also avoid the I/O, so that will also
> > speed things up.
>
> Some review comments on the latest version of the patch.
>
> 1.
> +/* Queue size of DSM, 16 MB for now. */
> +#define DSM_QUEUE_SIZE 160000000
>
> Why don't we directly use 16 * 1024 * 1024? That would be exactly 16 MB,
> so it would match the comment and also be more readable.
>
> 2.
> +/*
> + * There are three fields in message: start_lsn, end_lsn and send_time. Because
> + * we have updated these statistics in apply worker, we could ignore these
> + * fields in apply background worker. (see function LogicalRepApplyLoop)
> + */
> +#define SIZE_STATS_MESSAGE (3 * sizeof(uint64))
>
> Instead of assuming you have 3 uint64s, why don't we directly add
> 2 * sizeof(XLogRecPtr) + sizeof(TimestampTz)? Then, if either data type
> ever changes, we won't have to remember to change this as well.
>
> 3.
> +/*
> + * Entry for a hash table we use to map from xid to our apply background worker
> + * state.
> + */
> +typedef struct ApplyBgworkerEntry
> +{
> +    TransactionId xid;
> +    ApplyBgworkerState *wstate;
> +} ApplyBgworkerEntry;
>
> Mention in the comment of the structure, or for the member, that xid is
> the key of the hash. Refer to other such structures for reference.
>
> I am doing a more detailed review, but this is what I have got so far.
Some more comments

+    /*
+     * Exit if any relation is not in the READY state and if any worker is
+     * handling the streaming transaction at the same time. Because for
+     * streaming transactions that is being applied in apply background
+     * worker, we cannot decide whether to apply the change for a relation
+     * that is not in the READY state (see should_apply_changes_for_rel) as we
+     * won't know remote_final_lsn by that time.
+     */
+    if (list_length(ApplyBgworkersFreeList) != list_length(ApplyBgworkersList) &&
+        !AllTablesyncsReady())
+    {
+        ereport(LOG,
+                (errmsg("logical replication apply workers for subscription \"%s\" will restart",
+                        MySubscription->name),
+                 errdetail("Cannot handle streamed replication transaction by apply "
+                           "background workers until all tables are synchronized")));
+
+        proc_exit(0);
+    }

How can this situation occur? I mean, while starting a background worker
itself, we can check whether all tables are sync-ready or not, right?

+    /* Check the status of apply background worker if any. */
+    apply_bgworker_check_status();
+

What is the need to check each worker's status on every commit? I mean,
if there are a lot of small transactions along with some streaming
transactions, then won't it affect the apply performance for those small
transactions?

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com