On Thu, Feb 26, 2026 at 1:07 PM Ajin Cherian <[email protected]> wrote:
>

Few comments:
=============
1.
+ oldctx = MemoryContextSwitchTo(SequenceSyncContext);

- initStringInfo(&app_name);
- appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
- MySubscription->oid, GetSystemIdentifier());
+ /* Process sequences */
+ sequence_copied = copy_sequences(conn, seqinfos);

- /*
- * Establish the connection to the publisher for sequence synchronization.
- */
- LogRepWorkerWalRcvConn =
- walrcv_connect(MySubscription->conninfo, true, true,
-    must_use_password,
-    app_name.data, &err);
- if (LogRepWorkerWalRcvConn == NULL)
- ereport(ERROR,
- errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("sequencesync worker for subscription \"%s\" could not
connect to the publisher: %s",
-    MySubscription->name, err));
-
- pfree(app_name.data);
-
- copy_sequences(LogRepWorkerWalRcvConn);
+ MemoryContextSwitchTo(oldctx);

It is better to switch to SequenceSyncContext at the caller of
LogicalRepSyncSequences similar to what we are doing for
ApplyMessageContext.

2.
@@ -4221,6 +4221,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
  ProcessConfigFile(PGC_SIGHUP);
  }

+
  if (rc & WL_TIMEOUT)

Spurious line addition.

3. Apart from above, the attached patch contains comments and cosmetic changes.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/sequencesync.c 
b/src/backend/replication/logical/sequencesync.c
index ffbbd1257d0..3898b315bb8 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -111,8 +111,8 @@ static MemoryContext SequenceSyncContext = NULL;
  * unsynchronized after it exits, a new worker can be started in the next
  * iteration.
  *
- * The pointer to the sequencesync worker is cached to avoid acquiring an
- * LWLock and scanning the workers array each time via 
logicalrep_worker_find().
+ * The pointer to the sequencesync worker is cached to avoid scanning the
+ * workers array each time via logicalrep_worker_find().
  */
 void
 MaybeLaunchSequenceSyncWorker(void)
@@ -137,7 +137,7 @@ MaybeLaunchSequenceSyncWorker(void)
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
        /*
-        * Return if the sequence sync worker for the current subscription is
+        * Quick exit if the sequence sync worker for the current subscription 
is
         * already alive.
         */
        if (sequencesync_worker &&
@@ -586,9 +586,8 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos)
                        {
                                case COPYSEQ_SUCCESS:
                                        elog(DEBUG1,
-                                                "logical replication 
synchronizatio for subscription \"%s\", sequence \"%s.%s\" has been updated",
-                                                MySubscription->name, 
seqinfo->nspname,
-                                                seqinfo->seqname);
+                                                "logical replication 
synchronization has updated sequence \"%s.%s\" in subscription \"%s\"",
+                                                seqinfo->nspname, 
seqinfo->seqname, MySubscription->name);
                                        batch_succeeded_count++;
                                        sequence_copied = true;
                                        break;
@@ -765,14 +764,12 @@ LogicalRepSyncSequences(WalReceiverConn *conn)
                 * allocate them under SequenceSyncContext.
                 */
                oldctx = MemoryContextSwitchTo(SequenceSyncContext);
-
                seq = palloc0_object(LogicalRepSequenceInfo);
                seq->localrelid = subrel->srrelid;
                seq->nspname = 
get_namespace_name(RelationGetNamespace(sequence_rel));
                seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
                seq->relstate = relstate;
                seqinfos = lappend(seqinfos, seq);
-
                MemoryContextSwitchTo(oldctx);
 
                table_close(sequence_rel, NoLock);
@@ -860,7 +857,6 @@ start_sequence_sync(void)
 
                        /* Process any invalidation messages that might have 
accumulated */
                        AcceptInvalidationMessages();
-
                        maybe_reread_subscription();
 
                        /*
@@ -875,7 +871,6 @@ start_sequence_sync(void)
                        }
                        else
                        {
-
                                /*
                                 * Double the sleep time, but not beyond
                                 * the maximum allowable value.

Reply via email to