Hi,

On 2023-04-05 17:56:14 +0200, Drouvot, Bertrand wrote:

> @@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
>               /* Update our copy of the parameters in pg_control */
>               memcpy(&xlrec, XLogRecGetData(record), 
> sizeof(xl_parameter_change));
>
> +             /*
> +              * Invalidate logical slots if we are in hot standby and the 
> primary
> +              * does not have a WAL level sufficient for logical decoding. 
> No need
> +              * to search for potentially conflicting logically slots if 
> standby is
> +              * running with wal_level lower than logical, because in that 
> case, we
> +              * would have either disallowed creation of logical slots or
> +              * invalidated existing ones.
> +              */
> +             if (InRecovery && InHotStandby &&
> +                     xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +                     wal_level >= WAL_LEVEL_LOGICAL)
> +             {
> +                     TransactionId ConflictHorizon = InvalidTransactionId;
> +
> +                     InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, 
> InvalidOid, &ConflictHorizon);
> +             }

I mentioned this before, but I still don't understand why
InvalidateObsoleteReplicationSlots() accepts ConflictHorizon as a
pointer. It's not even modified, as far as I can see?


>  /*
>   * Report shared-memory space needed by ReplicationSlotsShmemInit.
>   */
> @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>               SpinLockAcquire(&s->mutex);
>               effective_xmin = s->effective_xmin;
>               effective_catalog_xmin = s->effective_catalog_xmin;
> -             invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> -                                        
> XLogRecPtrIsInvalid(s->data.restart_lsn));
> +             invalidated = ObsoleteSlotIsInvalid(s, true) || 
> LogicalReplicationSlotIsInvalid(s);
>               SpinLockRelease(&s->mutex);

I don't understand why we need to have two different functions for this.


>               /* invalidated slots need not apply */
> @@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
>       }
>  }
>
> +
> +/*
> + * Report terminating or conflicting message.
> + *
> + * For both, logical conflict on standby and obsolete slot are handled.
> + */
> +static void
> +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
> +                                                       NameData slotname, 
> TransactionId *xid,
> +                                                       XLogRecPtr 
> restart_lsn, XLogRecPtr oldestLSN)
> +{
> +     StringInfoData err_msg;
> +     StringInfoData err_detail;
> +     bool            hint = false;
> +
> +     initStringInfo(&err_detail);
> +
> +     if (check_on_xid)
> +     {
> +             if (!terminating)
> +             {
> +                     initStringInfo(&err_msg);
> +                     appendStringInfo(&err_msg, _("invalidating replication 
> slot \"%s\" because it conflicts with recovery"),
> +                                                      NameStr(slotname));

I still don't think the main error message should differ between invalidating
a slot due to a recovery conflict and due to max_slot_wal_keep_size.

> +
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim 
> (and
>   * in that case we're not holding the lock at return, otherwise we are).
>   *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched 
> otherwise.)

What's the point of making this specific to "obsolete slots"?


>   * This is inherently racy, because we release the LWLock
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
>  InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> -                                                        bool *invalidated)
> +                                                        bool *invalidated, 
> TransactionId *xid)
>  {
>       int                     last_signaled_pid = 0;
>       bool            released_lock = false;
> +     bool            check_on_xid;
> +
> +     check_on_xid = xid ? true : false;
>
>       for (;;)
>       {
>               XLogRecPtr      restart_lsn;
> +
>               NameData        slotname;
>               int                     active_pid = 0;
>
> @@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
> XLogRecPtr oldestLSN,
>                * Check if the slot needs to be invalidated. If it needs to be
>                * invalidated, and is not currently acquired, acquire it and 
> mark it
>                * as having been invalidated.  We do this with the spinlock 
> held to
> -              * avoid race conditions -- for example the restart_lsn could 
> move
> -              * forward, or the slot could be dropped.
> +              * avoid race conditions -- for example the restart_lsn (or the
> +              * xmin(s) could) move forward or the slot could be dropped.
>                */
>               SpinLockAcquire(&s->mutex);
>
>               restart_lsn = s->data.restart_lsn;
>
>               /*
> -              * If the slot is already invalid or is fresh enough, we don't 
> need to
> -              * do anything.
> +              * If the slot is already invalid or is a non conflicting slot, 
> we
> +              * don't need to do anything.
>                */
> -             if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= 
> oldestLSN)
> +             if (DoNotInvalidateSlot(s, xid, &oldestLSN))

DoNotInvalidateSlot() seems odd to me, and makes the code harder to
understand. I'd make it something like:

if (!SlotIsInvalid(s) && (
      LogicalSlotConflictsWith(s, xid) ||
      SlotConflictsWithLSN(s, lsn)))


>  /*
> - * Mark any slot that points to an LSN older than the given segment
> - * as invalid; it requires WAL that's about to be removed.
> + * Invalidate Obsolete slots or resolve recovery conflicts with logical 
> slots.

I don't like that this spreads "obsolete slots" around further - it's very
unspecific. A logical slot that needs to be removed due to an xid conflict is
just as obsolete as one that needs to be removed due to max_slot_wal_keep_size.

I'd rephrase this to be about required resources getting removed or such; one
case of that is WAL, another case is xids.

>  restart:
>       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> @@ -1414,21 +1505,35 @@ restart:
>               if (!s->in_use)
>                       continue;
>
> -             if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
> +             if (xid)
>               {
> -                     /* if the lock was released, start from scratch */
> -                     goto restart;
> +                     /* we are only dealing with *logical* slot conflicts */
> +                     if (!SlotIsLogical(s))
> +                             continue;
> +
> +                     /*
> +                      * not the database of interest and we don't want all 
> the
> +                      * database, skip
> +                      */
> +                     if (s->data.database != dboid && 
> TransactionIdIsValid(*xid))
> +                             continue;

ISTM that this should be in InvalidatePossiblyObsoleteSlot().


>       /*
> -      * If any slots have been invalidated, recalculate the resource limits.
> +      * If any slots have been invalidated, recalculate the required xmin and
> +      * the required lsn (if appropriate).
>        */
>       if (invalidated)
>       {
>               ReplicationSlotsComputeRequiredXmin(false);
> -             ReplicationSlotsComputeRequiredLSN();
> +             if (!xid)
> +                     ReplicationSlotsComputeRequiredLSN();
>       }

Why make this conditional? If we invalidated a logical slot, we also don't
require as much WAL anymore, no?


> @@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
> snapshotConflictHorizon,
>                                                                               
>    PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
>                                                                               
>    WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
>                                                                               
>    true);
> +
> +     if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
> +             InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, 
> locator.dbOid, &snapshotConflictHorizon);
>  }

Hm. Is there a reason for doing this before resolving conflicts with existing
sessions?


Another issue: ResolveRecoveryConflictWithVirtualXIDs() takes
WaitExceedsMaxStandbyDelay() into account, but
InvalidateObsoleteReplicationSlots() does not. I think that's ok, because the
setup should prevent this case from being reached in normal paths, but at
least there should be a comment documenting this.



> +static inline bool
> +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
> +{
> +     TransactionId slot_effective_xmin;
> +     TransactionId slot_catalog_xmin;
> +
> +     slot_effective_xmin = s->effective_xmin;
> +     slot_catalog_xmin = s->data.catalog_xmin;
> +
> +     return (((TransactionIdIsValid(slot_effective_xmin) && 
> TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
> +                      (TransactionIdIsValid(slot_catalog_xmin) && 
> TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
> +}

return -ETOOMANYPARENS


> +static inline bool
> +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
> +{
> +     return (s->data.restart_lsn >= oldestLSN);
> +}
> +
> +static inline bool
> +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
> +{
> +     return (TransactionIdIsValid(*xid) && 
> !LogicalReplicationSlotXidsConflict(s, *xid));
> +}
> +
> +static inline bool
> +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr 
> *oldestLSN)
> +{
> +     if (xid)
> +             return (LogicalReplicationSlotIsInvalid(s) || 
> LogicalSlotIsNotConflicting(s, xid));
> +     else
> +             return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, 
> *oldestLSN));
> +
> +}

See above for some more comments. But please don't accept stuff via pointer if
you don't have a reason for it. There's no reason for it for xid and oldestLSN
afaict.


> diff --git a/src/backend/access/transam/xlogrecovery.c 
> b/src/backend/access/transam/xlogrecovery.c
> index dbe9394762..186e4ef600 100644
> --- a/src/backend/access/transam/xlogrecovery.c
> +++ b/src/backend/access/transam/xlogrecovery.c
> @@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord 
> *record, TimeLineID *repl
>       XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
>       SpinLockRelease(&XLogRecoveryCtl->info_lck);
>
> +     /*
> +      * Wakeup walsenders:
> +      *
> +      * On the standby, the WAL is flushed first (which will only wake up
> +      * physical walsenders) and then applied, which will only wake up 
> logical
> +      * walsenders.
> +      * Indeed, logical walsenders on standby can't decode and send data 
> until
> +      * it's been applied.
> +      *
> +      * Physical walsenders don't need to be waked up during replay unless

s/waked/woken/

> +      * cascading replication is allowed and time line change occured (so 
> that
> +      * they can notice that they are on a new time line).
> +      *
> +      * That's why the wake up conditions are for:
> +      *
> +      *  - physical walsenders in case of new time line and cascade
> +      *  replication is allowed.
> +      *  - logical walsenders in case of new time line or recovery is in 
> progress
> +      *  (logical decoding on standby).
> +      */
> +     WalSndWakeup(switchedTLI && AllowCascadeReplication(),
> +                              switchedTLI || RecoveryInProgress());

I don't think it's possible to get here without RecoveryInProgress() being
true. So we don't need that condition.


> @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
>               /* Signal the startup process and walsender that new WAL has 
> arrived */
>               WakeupRecovery();
>               if (AllowCascadeReplication())
> -                     WalSndWakeup();
> +                     WalSndWakeup(true, !RecoveryInProgress());

Same comment as earlier.


>               /* Report XLOG streaming progress in PS display */
>               if (update_process_title)
> diff --git a/src/backend/replication/walsender.c 
> b/src/backend/replication/walsender.c
> index 2d908d1de2..5c68ebb79e 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
>                       walsnd->sync_standby_priority = 0;
>                       walsnd->latch = &MyProc->procLatch;
>                       walsnd->replyTime = 0;
> +
> +                     /*
> +                      * The kind assignment is done here and not in 
> StartReplication()
> +                      * and StartLogicalReplication(). Indeed, the logical 
> walsender
> +                      * needs to read WAL records (like snapshot of running
> +                      * transactions) during the slot creation. So it needs 
> to be woken
> +                      * up based on its kind.
> +                      *
> +                      * The kind assignment could also be done in 
> StartReplication(),
> +                      * StartLogicalReplication() and 
> CREATE_REPLICATION_SLOT but it
> +                      * seems better to set it on one place.
> +                      */

Doesn't that mean we'll wake up logical walsenders even if they're doing
normal query processing?


> +                     if (MyDatabaseId == InvalidOid)
> +                             walsnd->kind = REPLICATION_KIND_PHYSICAL;
> +                     else
> +                             walsnd->kind = REPLICATION_KIND_LOGICAL;
> +
>                       SpinLockRelease(&walsnd->mutex);
>                       /* don't need the lock anymore */
>                       MyWalSnd = (WalSnd *) walsnd;
> @@ -3310,30 +3327,39 @@ WalSndShmemInit(void)
>  }
>
>  /*
> - * Wake up all walsenders
> + * Wake up physical, logical or both walsenders kind
> + *
> + * The distinction between physical and logical walsenders is done, because:
> + * - physical walsenders can't send data until it's been flushed
> + * - logical walsenders on standby can't decode and send data until it's been
> + * applied
> + *
> + * For cascading replication we need to wake up physical
> + * walsenders separately from logical walsenders (see the comment before 
> calling
> + * WalSndWakeup() in ApplyWalRecord() for more details).
>   *
>   * This will be called inside critical sections, so throwing an error is not
>   * advisable.
>   */
>  void
> -WalSndWakeup(void)
> +WalSndWakeup(bool physical, bool logical)
>  {
>       int                     i;
>
>       for (i = 0; i < max_wal_senders; i++)
>       {
>               Latch      *latch;
> +             ReplicationKind kind;
>               WalSnd     *walsnd = &WalSndCtl->walsnds[i];
>
> -             /*
> -              * Get latch pointer with spinlock held, for the unlikely case 
> that
> -              * pointer reads aren't atomic (as they're 8 bytes).
> -              */
> +             /* get latch pointer and kind with spinlock helds */
>               SpinLockAcquire(&walsnd->mutex);
>               latch = walsnd->latch;
> +             kind = walsnd->kind;
>               SpinLockRelease(&walsnd->mutex);
>
> -             if (latch != NULL)
> +             if (latch != NULL && ((physical && kind == 
> REPLICATION_KIND_PHYSICAL) ||
> +                                                       (logical && kind == 
> REPLICATION_KIND_LOGICAL)))
>                       SetLatch(latch);
>       }
>  }

I'd consider rewriting this to something like:

if (latch == NULL)
    continue;

if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
    (logical && kind == REPLICATION_KIND_LOGICAL))
    SetLatch(latch);



Greetings,

Andres Freund


Reply via email to