On Thu, Feb 26, 2026 at 1:07 PM Ajin Cherian <[email protected]> wrote:
>
Few comments:
=============
1.
+ oldctx = MemoryContextSwitchTo(SequenceSyncContext);
- initStringInfo(&app_name);
- appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
- MySubscription->oid, GetSystemIdentifier());
+ /* Process sequences */
+ sequence_copied = copy_sequences(conn, seqinfos);
- /*
- * Establish the connection to the publisher for sequence synchronization.
- */
- LogRepWorkerWalRcvConn =
- walrcv_connect(MySubscription->conninfo, true, true,
- must_use_password,
- app_name.data, &err);
- if (LogRepWorkerWalRcvConn == NULL)
- ereport(ERROR,
- errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("sequencesync worker for subscription \"%s\" could not
connect to the publisher: %s",
- MySubscription->name, err));
-
- pfree(app_name.data);
-
- copy_sequences(LogRepWorkerWalRcvConn);
+ MemoryContextSwitchTo(oldctx);
It is better to switch to SequenceSyncContext at the caller of
LogicalRepSyncSequences similar to what we are doing for
ApplyMessageContext.
2.
@@ -4221,6 +4221,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
ProcessConfigFile(PGC_SIGHUP);
}
+
if (rc & WL_TIMEOUT)
Spurious line addition.
3. Apart from above, the attached patch contains comments and cosmetic changes.
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/sequencesync.c
b/src/backend/replication/logical/sequencesync.c
index ffbbd1257d0..3898b315bb8 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -111,8 +111,8 @@ static MemoryContext SequenceSyncContext = NULL;
* unsynchronized after it exits, a new worker can be started in the next
* iteration.
*
- * The pointer to the sequencesync worker is cached to avoid acquiring an
- * LWLock and scanning the workers array each time via
logicalrep_worker_find().
+ * The pointer to the sequencesync worker is cached to avoid scanning the
+ * workers array each time via logicalrep_worker_find().
*/
void
MaybeLaunchSequenceSyncWorker(void)
@@ -137,7 +137,7 @@ MaybeLaunchSequenceSyncWorker(void)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
/*
- * Return if the sequence sync worker for the current subscription is
+ * Quick exit if the sequence sync worker for the current subscription
is
* already alive.
*/
if (sequencesync_worker &&
@@ -586,9 +586,8 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos)
{
case COPYSEQ_SUCCESS:
elog(DEBUG1,
- "logical replication
synchronizatio for subscription \"%s\", sequence \"%s.%s\" has been updated",
- MySubscription->name,
seqinfo->nspname,
- seqinfo->seqname);
+ "logical replication
synchronization has updated sequence \"%s.%s\" in subscription \"%s\"",
+ seqinfo->nspname,
seqinfo->seqname, MySubscription->name);
batch_succeeded_count++;
sequence_copied = true;
break;
@@ -765,14 +764,12 @@ LogicalRepSyncSequences(WalReceiverConn *conn)
* allocate them under SequenceSyncContext.
*/
oldctx = MemoryContextSwitchTo(SequenceSyncContext);
-
seq = palloc0_object(LogicalRepSequenceInfo);
seq->localrelid = subrel->srrelid;
seq->nspname =
get_namespace_name(RelationGetNamespace(sequence_rel));
seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
seq->relstate = relstate;
seqinfos = lappend(seqinfos, seq);
-
MemoryContextSwitchTo(oldctx);
table_close(sequence_rel, NoLock);
@@ -860,7 +857,6 @@ start_sequence_sync(void)
/* Process any invalidation messages that might have
accumulated */
AcceptInvalidationMessages();
-
maybe_reread_subscription();
/*
@@ -875,7 +871,6 @@ start_sequence_sync(void)
}
else
{
-
/*
* Double the sleep time, but not beyond
* the maximum allowable value.