> On Oct 24, 2025, at 23:22, vignesh C <[email protected]> wrote:
> 
> Regards,
> Vignesh
> <v20251024-0001-Rename-sync_error_count-to-tbl_sync_error_.patch><v20251024-0002-Add-worker-type-argument-to-logicalrep_wor.patch><v20251024-0003-New-worker-for-sequence-synchronization-du.patch><v20251024-0004-Documentation-for-sequence-synchronization.patch>


The changes in 0001 are straightforward and look good. I haven’t reviewed 0004
yet. I have a few comments on 0002 and 0003.

1 - 0002
```
  * We are only interested in the leader apply worker or table sync worker.
+ * For apply workers, the relid should be set to InvalidOid, as they manage
+ * changes across all tables and sequences. For table sync workers, the relid
+ * should be set to the OID of the relation being synchronized.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype,
+                                          bool only_running)
 {
        int                     i;
        LogicalRepWorker *res = NULL;
 
        Assert(LWLockHeldByMe(LogicalRepWorkerLock));
```

The comment says that “for apply workers, the relid should be set to
InvalidOid”, so is it worth adding an assert for that?
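For illustration, a minimal self-contained sketch of where such an assert could sit. The types are simplified stand-ins (only the WORKERTYPE_* names come from the patch; `MockWorker`, `mock_worker_find` and `MAX_WORKERS` are hypothetical). In the real tree the check would presumably be written as `Assert(wtype != WORKERTYPE_APPLY || !OidIsValid(relid));`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins for PostgreSQL types (not the real definitions). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

typedef enum
{
	WORKERTYPE_APPLY,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_SEQUENCESYNC
} LogicalRepWorkerType;

/* Hypothetical worker slot; the real LogicalRepWorker has many more fields. */
typedef struct
{
	bool		in_use;
	Oid			subid;
	Oid			relid;
	LogicalRepWorkerType type;
} MockWorker;

#define MAX_WORKERS 4
static MockWorker workers[MAX_WORKERS];

static MockWorker *
mock_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype)
{
	/* Apply workers cover all relations, so they are never looked up by relid. */
	assert(wtype != WORKERTYPE_APPLY || relid == InvalidOid);

	/* Search for a logical replication worker matching the specified criteria. */
	for (int i = 0; i < MAX_WORKERS; i++)
	{
		MockWorker *w = &workers[i];

		if (w->in_use && w->subid == subid && w->relid == relid &&
			w->type == wtype)
			return w;
	}
	return NULL;
}
```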

2 - 0002
```
-       /* Search for attached worker for a given subscription id. */
+       /* Search for the attached worker matching the specified criteria. */
        for (i = 0; i < max_logical_replication_workers; i++)
        {
```

Minor issues with the comment:

* We are not searching for a specific worker, so “the” should be “a”.
* “attached” is confusing. In the old comment, “attached” was tied to “a given
subscription id”, but now, attached to what?

So, a suggested revision:

“/* Search for a logical replication worker matching the specified criteria */”

3 - 0002
```
 /*
  * Stop the logical replication worker for subid/relid, if any.
+ *
+ * Similar to logicalrep_worker_find, relid should be set to a valid OID only
+ * for table sync workers.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype)
```

The comment should be updated: subid/relid => subid/relid/wtype.

4 - 0002
```
@@ -477,7 +477,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
                        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
                        syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-                                                           rstate->relid, false);
+                                                           rstate->relid,
+                                                           WORKERTYPE_TABLESYNC, true);
```

Why was only_running changed from false to true? This commit adds a new worker
type; it shouldn’t change the existing logic.
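To make the behavioral difference concrete, here is a small sketch of the only_running filter, assuming it works roughly like `w->in_use && ... && (!only_running || w->proc)`. `MockSlot` and `slot_is_found` are hypothetical. With `true`, a table sync worker whose slot has been claimed but whose process has not attached yet is no longer found; if the caller launches a new worker whenever the lookup returns NULL, that could lead to a redundant launch attempt for the same relation.

```c
#include <stdbool.h>

/* Hypothetical stand-in for a LogicalRepWorker slot. */
typedef struct
{
	bool		in_use;			/* slot has been claimed for a worker */
	bool		has_proc;		/* worker process has attached (w->proc != NULL) */
} MockSlot;

/* Simplified version of the only_running filter in the worker lookup. */
static bool
slot_is_found(const MockSlot *slot, bool only_running)
{
	return slot->in_use && (!only_running || slot->has_proc);
}
```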

5 - 0003
```
+/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+       LogicalRepWorker *worker;
+
+       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+       /*
+        * Set the last_seqsync_start_time for the sequence worker in the apply
+        * worker instead of the sequence sync worker, as the sequence sync worker
+        * has finished and is about to exit.
+        */
+       worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid,
+                                       WORKERTYPE_APPLY, true);
+       if (worker)
+               worker->last_seqsync_start_time = 0;
+
+       LWLockRelease(LogicalRepWorkerLock);
+}
```

Two comments on this new function:

* The function comment and the in-code comment are redundant. I suggest moving
the in-code comment into the function comment.
* Why is LW_SHARED used? We are writing worker->last_seqsync_start_time, so
shouldn’t LW_EXCLUSIVE be used?

6 - 0003
```
+       /*
+        * Count running sync workers for this subscription, while we have the
+        * lock.
+        */
+       nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+       LWLockRelease(LogicalRepWorkerLock);
+
+       launch_sync_worker(nsyncworkers, InvalidOid,
+                          &MyLogicalRepWorker->last_seqsync_start_time);
```

I think there could be a race condition here. The lock is acquired in
LW_SHARED mode, so multiple callers may get the same nsyncworkers. Each of
them then launches a sync worker based on a possibly stale nsyncworkers,
because another worker might be started between LWLockRelease() and
launch_sync_worker().

But if that is not the case, and only one caller ever calls
ProcessSyncingSequencesForApply(), then why is the lock needed?
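If the race is real, one general way to close it is to check the count and claim a slot inside a single exclusive critical section, so no two callers can act on the same count. A minimal sketch, with a pthread mutex standing in for LogicalRepWorkerLock; `nsync_running`, `try_claim_sync_slot` and the limit constant are hypothetical, and this is not how the patch is actually structured.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for the max_sync_workers_per_subscription GUC. */
#define MAX_SYNC_WORKERS_PER_SUBSCRIPTION 2

static pthread_mutex_t worker_lock = PTHREAD_MUTEX_INITIALIZER;
static int	nsync_running = 0;	/* stand-in for logicalrep_sync_worker_count() */

/*
 * Check the limit and claim a slot in one critical section, so a concurrent
 * caller cannot observe the same count between the check and the launch.
 * Returns true if a slot was claimed.
 */
static bool
try_claim_sync_slot(void)
{
	bool		claimed = false;

	pthread_mutex_lock(&worker_lock);
	if (nsync_running < MAX_SYNC_WORKERS_PER_SUBSCRIPTION)
	{
		nsync_running++;		/* a real implementation would start the worker here */
		claimed = true;
	}
	pthread_mutex_unlock(&worker_lock);

	return claimed;
}
```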

7 - 0003
```
+       if (insuffperm_seqs->len)
+       {
+               appendStringInfo(combined_error_detail, "Insufficient permission for sequence(s): (%s)",
+                                insuffperm_seqs->data);
+               appendStringInfoString(combined_error_hint, "Grant permissions for the sequence(s).");
+       }
```

“Grant permissions” is unclear. Should it be “Grant UPDATE privilege”?

8 - 0003
```
+                       appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers.");
```

“To have matching parameters as publishers” does not read well grammatically.
Maybe revise it to “to match the publisher’s parameters”.

9 - 0003
```
+               /*
+                * current_indexes is not incremented sequentially because some
+                * sequences may be missing, and the number of fetched rows may not
+                * match the batch size. The `hash_search` with HASH_REMOVE takes care
+                * of the count.
+                */
```

Typo: current_indexes => current_index

10 - 0003
```
-       /* Find the leader apply worker and signal it. */
-       logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+       /*
+        * This is a clean exit of the sequencesync worker; reset the
+        * last_seqsync_start_time.
+        */
+       if (wtype == WORKERTYPE_SEQUENCESYNC)
+               logicalrep_reset_seqsync_start_time();
+       else
+               /* Find the leader apply worker and signal it. */
+               logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
```

The comment “This is a clean exit of the sequencesync worker” is specific to
the “if” branch, so I suggest moving it into the “if”. Also, the “This is a
clean exit of the sequencesync worker” part is not needed; keep the comment
consistent with the one in the “else” branch.
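With the comment moved, the branch could read roughly as follows. The two callees are hypothetical stubs that only count calls, standing in for logicalrep_reset_seqsync_start_time() and logicalrep_worker_wakeup(); `MockWorkerType` and `finish_sync_worker` are likewise made up for the sketch.

```c
typedef enum
{
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_SEQUENCESYNC
} MockWorkerType;

static int	nresets = 0;
static int	nwakeups = 0;

/* Stub for logicalrep_reset_seqsync_start_time(). */
static void
reset_seqsync_start_time(void)
{
	nresets++;
}

/* Stub for logicalrep_worker_wakeup(subid, InvalidOid). */
static void
wakeup_apply_worker(void)
{
	nwakeups++;
}

static void
finish_sync_worker(MockWorkerType wtype)
{
	if (wtype == WORKERTYPE_SEQUENCESYNC)
	{
		/* Reset the last_seqsync_start_time. */
		reset_seqsync_start_time();
	}
	else
	{
		/* Find the leader apply worker and signal it. */
		wakeup_apply_worker();
	}
}
```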

11 - 0003
```
+void
+launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time)
+{
+       /* If there is a free sync worker slot, start a new sync worker */
+       if (nsyncworkers < max_sync_workers_per_subscription)
+       {
```

The entire function body is under an “if”, so we can do “if (!…) return;”
instead, which saves a level of indentation.
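Roughly, with the condition inverted into an early return. The body of the original `if` is not quoted above, so it is represented here by a hypothetical `nlaunches` counter; `Oid`, `TimestampTz` and the GUC variable are simplified stand-ins.

```c
typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid */
typedef long long TimestampTz;	/* stand-in for PostgreSQL's TimestampTz */

static int	max_sync_workers_per_subscription = 2;	/* stand-in for the GUC */
static int	nlaunches = 0;		/* hypothetical: counts how often the body runs */

static void
launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time)
{
	/* If there is no free sync worker slot, there is nothing to do. */
	if (nsyncworkers >= max_sync_workers_per_subscription)
		return;

	/* The former body of the "if" goes here, one indentation level less. */
	(void) relid;
	(void) last_start_time;
	nlaunches++;
}
```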

Best regards,
—
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
