Hi,

Have you checked how high the overhead of XLogReadDetermineTimeline is? A non-local function call, especially into a different translation unit (no partial inlining), for every single page might end up being noticeable. That's fine in the cases where it actually adds functionality, but for a master streaming out data it isn't adding anything.
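A minimal sketch of one way to keep that per-page cost down, assuming the reader can cache the position up to which the current timeline is known to be valid: an inline fast path compares against the cached bound and only falls through to the out-of-line lookup when a page crosses it. Everything here (PageReader, valid_until, switch_point, determine_timeline_slow, read_pages) is hypothetical and merely stands in for the real xlogreader state; it is not the patch's code.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical reader state; not a PostgreSQL structure. */
typedef struct PageReader
{
    uint32_t cur_tli;      /* timeline the reader is currently on */
    uint64_t valid_until;  /* pages ending at or before this are known to be on cur_tli */
    uint64_t switch_point; /* where cur_tli ends; UINT64_MAX if it is the latest timeline */
    unsigned slow_calls;   /* counts slow-path invocations, for illustration only */
} PageReader;

/* Out-of-line slow path: a real system would consult timeline history here. */
static void
determine_timeline_slow(PageReader *r, uint64_t page_start)
{
    r->slow_calls++;
    if (page_start >= r->switch_point)
    {
        /* Crossed the switch: move to the next timeline, assumed to be the latest. */
        r->cur_tli++;
        r->switch_point = UINT64_MAX;
    }
    /* Cache how far the answer stays valid so later pages can skip this call. */
    r->valid_until = r->switch_point;
}

/* Inline fast path: one comparison per page in the common case. */
static inline void
determine_timeline(PageReader *r, uint64_t page_start, uint32_t page_len)
{
    assert(page_len > 0);
    if (page_start + page_len <= r->valid_until)
        return;
    determine_timeline_slow(r, page_start);
}

/* Read npages consecutive pages; returns how often the slow path ran. */
static unsigned
read_pages(PageReader *r, uint64_t start, unsigned npages, uint32_t page_len)
{
    unsigned i;

    for (i = 0; i < npages; i++)
        determine_timeline(r, start + (uint64_t) i * page_len, page_len);
    return r->slow_calls;
}
```

With no timeline switch ahead (switch_point set to UINT64_MAX), read_pages() over 1000 pages takes the slow path only once, on the first page; a reader that crosses one switch takes it twice.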
Did you check whether your changes to read_local_xlog_page could cause issues with twophase.c? Because that now also uses it.

Did you check whether ThisTimeLineID is actually always valid in the processes logical decoding could run in? IIRC it's not consistently updated during recovery in any process but the startup process.

On 2017-03-19 21:12:23 +0800, Craig Ringer wrote:
> From 2fa891a555ea4fb200d75b8c906c6b932699b463 Mon Sep 17 00:00:00 2001
> From: Craig Ringer <cr...@2ndquadrant.com>
> Date: Thu, 1 Sep 2016 10:16:55 +0800
> Subject: [PATCH 2/3] Follow timeline switches in logical decoding

FWIW, the title doesn't really seem accurate to me.

> Logical slots cannot actually be created on a replica without use of
> the low-level C slot management APIs so this is mostly foundation work
> for subsequent changes to enable logical decoding on standbys.

Every time I read references to anything like this my blood starts to boil. I kind of regret not having plastered RecoveryInProgress() errors all over this code.

> From 8854d44e2227b9d076b0a25a9c8b9df9270b2433 Mon Sep 17 00:00:00 2001
> From: Craig Ringer <cr...@2ndquadrant.com>
> Date: Mon, 5 Sep 2016 15:30:53 +0800
> Subject: [PATCH 3/3] Logical decoding on standby
>
> * Make walsender aware of ProcSignal and recovery conflicts, make walsender
>   exit with recovery conflict on upstream drop database when it has an active
>   logical slot on that database.
> * Allow GetOldestXmin to omit catalog_xmin, be called already locked.

"be called already locked"?

> * Send catalog_xmin separately in hot_standby_feedback messages.
> * Store catalog_xmin separately on a physical slot if received in
>   hot_standby_feedback

What does separate mean?

> * Separate the catalog_xmin used by vacuum from ProcArray's replication_slot_catalog_xmin,
>   requiring that xlog be emitted before vacuum can remove no longer needed catalogs, store
>   it in checkpoints, make vacuum and bgwriter advance it.

I can't parse that sentence.
> * Add a new recovery conflict type for conflict with catalog_xmin. Abort
>   in-progress logical decoding sessions with conflict with recovery where needed
>   catalog_xmin is too old

Are we retaining WAL for slots broken in that way?

> * Make extra efforts to reserve master's catalog_xmin during decoding startup
>   on standby.

What does that mean?

> * Remove checks preventing starting logical decoding on standby

To me that's too many different things in one commit. For a bunch of them it seems like it'd be good if they got independent buildfarm cycles too.

> diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
> index d7f65a5..36bbb98 100644
> --- a/src/backend/access/heap/rewriteheap.c
> +++ b/src/backend/access/heap/rewriteheap.c
> @@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
>      if (!state->rs_logical_rewrite)
>          return;
>
> -    ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
> +    /* Use the catalog_xmin being retained by vacuum */
> +    ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);

What does that comment mean? Vacuum isn't the only thing that prunes old records.

> +/*
> + * Set the global oldest catalog_xmin used to determine when tuples
> + * may be removed from catalogs and user-catalogs accessible from logical
> + * decoding.
> + *
> + * Only to be called from the startup process or by UpdateOldestCatalogXmin(),
> + * which ensures the update is properly written to xlog first.
> + */
> +void
> +SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
> +{
> +    Assert(InRecovery || !IsUnderPostmaster || AmStartupProcess() || LWLockHeldByMe(ProcArrayLock));

Uh, that's long-ish. And doesn't agree with the comment above (s/startup process/process performing recovery/?). This is a long enough list that I'd consider just dropping the assert.
> +    else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
> +    {
> +        xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
> +
> +        /*
> +         * Unless logical decoding is possible on this node, we don't care about
> +         * this record.
> +         */
> +        if (!XLogLogicalInfoActive() || max_replication_slots == 0)
> +            return;

Too many negatives for my taste, but whatever.

> +        /*
> +         * Apply the new catalog_xmin limit immediately. New decoding sessions
> +         * will refuse to start if their slot is past it, and old ones will
> +         * notice when we signal them with a recovery conflict. There's no
> +         * effect on the catalogs themselves yet, so it's safe for backends
> +         * with older catalog_xmins to still exist.
> +         *
> +         * We don't have to take ProcArrayLock since only the startup process
> +         * is allowed to change oldestCatalogXmin when we're in recovery.
> +         */
> +        SetOldestCatalogXmin(xlrec->new_catalog_xmin);

Which seems to rely on ResolveRecoveryConflictWithLogicalDecoding's lwlock acquisition for barriers?

> +/*
> + * Record when we advance the catalog_xmin used for tuple removal
> + * so standbys find out before we remove catalog tuples they might
> + * need for logical decoding.
> + */
> +XLogRecPtr
> +XactLogCatalogXminUpdate(TransactionId new_catalog_xmin)
> +{
> +    XLogRecPtr ptr = InvalidXLogRecPtr;
> +
> +    if (XLogInsertAllowed())
> +    {
> +        xl_xact_catalog_xmin_advance xlrec;
> +
> +        xlrec.new_catalog_xmin = new_catalog_xmin;
> +
> +        XLogBeginInsert();
> +        XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance);
> +
> +        ptr = XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV);
> +    }

Huh, why is this test needed and ok?

> @@ -9449,6 +9456,16 @@ XLogReportParameters(void)
>          XLogFlush(recptr);
>      }
>
> +    /*
> +     * If wal_level was lowered from WAL_LEVEL_LOGICAL we no longer
> +     * require oldestCatalogXmin in checkpoints and it no longer
> +     * makes sense, so update shmem and xlog the change.
> +     * This will get written out in the next checkpoint.
> +     */
> +    if (ControlFile->wal_level >= WAL_LEVEL_LOGICAL &&
> +        wal_level < WAL_LEVEL_LOGICAL)
> +        UpdateOldestCatalogXmin(true);

What if we crash before this happens?

> diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
> index ff633fa..2d16bf0 100644
> --- a/src/backend/commands/vacuum.c
> +++ b/src/backend/commands/vacuum.c
> @@ -518,6 +518,15 @@ vacuum_set_xid_limits(Relation rel,
>      MultiXactId safeMxactLimit;
>
>      /*
> +     * When logical decoding is enabled, we must write any advance of
> +     * catalog_xmin to xlog before we allow VACUUM to remove those tuples.
> +     * This ensures that any standbys doing logical decoding can cancel
> +     * decoding sessions and invalidate slots if we remove tuples they
> +     * still need.
> +     */
> +    UpdateOldestCatalogXmin(false);

I'm on a first read-through of this, but it appears you don't do anything similar in heap_page_prune()? And we can't just start emitting loads of additional records there, because it's called much more often...

>  /*
>   * Make sure the current settings & environment are capable of doing logical
>   * decoding.
> @@ -87,23 +95,53 @@ CheckLogicalDecodingRequirements(void)
>                  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>                   errmsg("logical decoding requires a database connection")));
>
> -    /* ----
> -     * TODO: We got to change that someday soon...
> -     *
> -     * There's basically three things missing to allow this:
> -     * 1) We need to be able to correctly and quickly identify the timeline a
> -     *    LSN belongs to
> -     * 2) We need to force hot_standby_feedback to be enabled at all times so
> -     *    the primary cannot remove rows we need.
> -     * 3) support dropping replication slots referring to a database, in
> -     *    dbase_redo. There can't be any active ones due to HS recovery
> -     *    conflicts, so that should be relatively easy.
> -     * ----
> -     */
>      if (RecoveryInProgress())
> -        ereport(ERROR,
> -                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> -                 errmsg("logical decoding cannot be used while in recovery")));
> +    {
> +        bool walrcv_running, walrcv_has_slot;
> +
> +        SpinLockAcquire(&WalRcv->mutex);
> +        walrcv_running = WalRcv->pid != 0;
> +        walrcv_has_slot = WalRcv->slotname[0] != '\0';
> +        SpinLockRelease(&WalRcv->mutex);
> +
> +        /*
> +         * The walreceiver should be running when we try to create a slot. If
> +         * we're unlucky enough to catch the walreceiver just as it's
> +         * restarting after an error, well, the client can just retry. We don't
> +         * bother to sleep and re-check.
> +         */
> +        if (!walrcv_running)
> +            ereport(ERROR,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("streaming replication is not active"),
> +                     errhint("Logical decoding on standby requires that streaming replication be configured and active. Ensure that primary_conninfo is correct in recovery.conf and check for streaming replication errors in the logs.")));

That seems quite problematic. What if there's a momentary connection failure? This also has the issue that just because you checked walrcv_running at some point doesn't guarantee anything by the time you actually check. Seems like life would be easier if recovery.conf were guc-ified already - checking for primary_conninfo/primary_slot_name etc wouldn't have that issue (and can't be changed while running).

Usage of a slot doesn't actually guarantee much in cascaded setups, does it?

> @@ -266,7 +306,9 @@ CreateInitDecodingContext(char *plugin,
>       * xmin horizons by other backends, get the safe decoding xid, and inform
>       * the slot machinery about the new limit. Once that's done the
>       * ProcArrayLock can be released as the slot machinery now is
> -     * protecting against vacuum.
> +     * protecting against vacuum - if we're on the master.
> +     * If we're running on
> +     * a replica, we have to wait until hot_standby_feedback locks in our
> +     * needed catalogs, per details on WaitForMasterCatalogXminReservation().
>       * ----
>       */
>      LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
> @@ -276,6 +318,12 @@ CreateInitDecodingContext(char *plugin,
>
>      ReplicationSlotsComputeRequiredXmin(true);
>
> +    if (RecoveryInProgress())
> +        WaitForMasterCatalogXminReservation(slot);
> +
> +    Assert(TransactionIdPrecedesOrEquals(ShmemVariableCache->oldestCatalogXmin,
> +                                         slot->data.catalog_xmin));
> +
>      LWLockRelease(ProcArrayLock);

I think it's quite a bad idea to do a blocking operation like WaitForMasterCatalogXminReservation while holding ProcArrayLock.

> +/*
> + * Wait until the master's catalog_xmin is set, advancing our catalog_xmin
> + * if needed. Caller must hold exclusive ProcArrayLock, which this function will
> + * temporarily release while sleeping but will re-acquire.

Ah. I see. Hm :(.

> + * We're pretty much just hoping that, if someone else already has a
> + * catalog_xmin reservation affecting the master, it stays where we want it
> + * until our own hot_standby_feedback can pin it down.

Hm.

> + * When we're creating a slot on a standby we can't directly set the
> + * master's catalog_xmin; the catalog_xmin is set locally, then relayed
> + * over hot_standby_feedback. The master may remove the catalogs we
> + * asked to reserve between when we set a local catalog_xmin and when
> + * hs feedback makes that take effect on the master. We need a feedback
> + * reply mechanism here, where:
> + *
> + * - we tentatively reserve catalog_xmin locally

Will that already trigger recovery conflicts?

> + * - we wake the walreceiver by setting its latch
> + * - walreceiver sends hs_feedback
> + * - upstream walsender sends a new 'hs_feedback reply' message with
> + *   actual (xmin, catalog_xmin) reserved.
> + * - walreceiver sees reply and updates ShmemVariableCache or some other
> + *   handy bit of shmem with hs feedback reservations from reply

"or some other handy bit"?

> + * - we poll the reservations while we wait
> + * - we set our catalog_xmin to that value, which might be later if
> + *   we missed our requested reservation, or might be earlier if
> + *   someone else is holding down catalog_xmin on master. We got a hs
> + *   feedback reply so we know it's reserved.
> + *
> + * For cascading, the actual reservation will need to cascade up the
> + * chain by walsender setting its own walreceiver's latch in turn, etc.
> + *
> + * For now, we just set the local slot catalog_xmin and sleep until
> + * oldestCatalogXmin equals or passes our reservation. This is fine if we're
> + * the only decoding session, but it is vulnerable to races if slots on the
> + * master or other decoding sessions on other standbys connected to the same
> + * master exist. They might advance their reservation before our hs_feedback
> + * locks it down, allowing vacuum to remove tuples we need. So we might start
> + * decoding on our slot then error with a conflict with recovery when we see
> + * catalog_xmin advance.
> + */

I was about to list some of these issues. That's a bit unsatisfying.

Pondering this for a bit, but I'm ~9h into a flight, so maybe not tonight^Wthis morning^Wwhaaaa.

> +static void
> +WaitForMasterCatalogXminReservation(ReplicationSlot *slot)
> +{

This comment seems to duplicate some of the function header comment. Such duplication usually leads to either or both getting out of date rather quickly.

Not commenting line-by-line on the code here, but I'm extremely doubtful that this approach is stable enough, and that the effect of holding ProcArrayLock exclusively over prolonged amounts of time is acceptable.

> +    ReplicationSlotsComputeRequiredXmin(true);
>

Why do we need this? The caller does it too, no?
> +    /* Tell the master what catalog_xmin we settled on */
> +    WalRcvForceReply();
> +
> +    /* Reset ps display if we changed it */
> +    if (new_status)
> +    {
> +        set_ps_display(new_status, false);
> +        pfree(new_status);
> +    }

We really shouldn't do stuff like this while holding ProcArrayLock.

> +/*
> + * Test to see if the active logical slot is usable.
> + */
> +static void
> +EnsureActiveLogicalSlotValid()
> +{

Missing (void).

> +/*
> + * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
> + * passed database oid. The caller should hold an exclusive lock on the database
> + * to ensure no replication slots on the database are in use.

Stuff like this really should be its own commit. It can trivially be tested on its own, is useful on its own (just have DROP DATABASE do it), ...

> + * If we fail here we'll leave the in-memory state of replication slots
> + * inconsistent with its on-disk state, so we need to PANIC.

We worked quite hard to make it extremely unlikely for that to happen in practice. I also don't see why there should be any new PANICs in this code.

> + * This routine isn't as efficient as it could be - but we don't drop databases
> + * often, especially databases with lots of slots.

That seems fine.

> +void
> +ReplicationSlotsDropDBSlots(Oid dboid)
> +{
> +    int i;
> +
> +    if (max_replication_slots <= 0)
> +        return;
> +
> +    /*
> +     * We only need a shared lock here even though we activate slots,
> +     * because we have an exclusive lock on the database we're dropping
> +     * slots on and don't touch other databases' slots.
> +     */
> +    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

Hm? Acquiring a slot always only takes a shared lock, no?

I don't really see how "database is locked" guarantees enough for your logic - it's already possible to drop slots from other databases, and dropping a slot acquires it temporarily?
> +    for (i = 0; i < max_replication_slots; i++)
> +    {
> +        ReplicationSlot *s;
> +        NameData slotname;
> +        int active_pid;
> +
> +        s = &ReplicationSlotCtl->replication_slots[i];
> +
> +        /* cannot change while ReplicationSlotCtlLock is held */
> +        if (!s->in_use)
> +            continue;
> +
> +        /* only logical slots are database specific, skip */
> +        if (!SlotIsLogical(s))
> +            continue;
> +
> +        /* not our database, skip */
> +        if (s->data.database != dboid)
> +            continue;
> +
> +        /* Claim the slot, as if ReplicationSlotAcquire()ing */
> +        SpinLockAcquire(&s->mutex);
> +        strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
> +        NameStr(slotname)[NAMEDATALEN-1] = '\0';
> +        active_pid = s->active_pid;
> +        if (active_pid == 0)
> +        {
> +            MyReplicationSlot = s;
> +            s->active_pid = MyProcPid;
> +        }
> +        SpinLockRelease(&s->mutex);
> +
> +        /*
> +         * The caller should have an exclusive lock on the database so
> +         * we'll never have any in-use slots, but just in case...
> +         */
> +        if (active_pid)
> +            elog(PANIC, "replication slot %s is in use by pid %d",
> +                 NameStr(slotname), active_pid);

So, yea, this doesn't seem ok. Why don't we just ERROR out, instead of PANICing? There seems to be absolutely no correctness reason for a PANIC here?

> +        /*
> +         * To avoid largely duplicating ReplicationSlotDropAcquired() or
> +         * complicating it with already_locked flags for ProcArrayLock,
> +         * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
> +         * just release our ReplicationSlotControlLock to drop the slot.
> +         *
> +         * There's no race here: we acquired this slot, and no slot "behind"
> +         * our scan can be created or become active with our target dboid due
> +         * to our exclusive lock on the DB.
> +         */
> +        LWLockRelease(ReplicationSlotControlLock);
> +        ReplicationSlotDropAcquired();
> +        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

I don't see much problem with this, but I'd change the code so you simply do a goto restart; if you released the slot.
Then there's a lot less chance of complications around temporarily releasing ReplicationSlotControlLock.

> +         *
> +         * If logical decoding information is enabled, we also
> +         * send immediate hot standby feedback so as to reduce
> +         * the delay before our needed catalogs are locked in.

"logical decoding information ... enabled" and "catalogs are locked in" are a bit too imprecise descriptions for my taste.

> @@ -1175,8 +1181,8 @@ XLogWalRcvSendHSFeedback(bool immed)
>  {
>      TimestampTz now;
>      TransactionId nextXid;
> -    uint32 nextEpoch;
> -    TransactionId xmin;
> +    uint32 xmin_epoch, catalog_xmin_epoch;
> +    TransactionId xmin, catalog_xmin;
>      static TimestampTz sendTime = 0;
>      /* initially true so we always send at least one feedback message */
>      static bool master_has_standby_xmin = true;
> @@ -1221,29 +1227,57 @@ XLogWalRcvSendHSFeedback(bool immed)
>       * everything else has been checked.
>       */
>      if (hot_standby_feedback)
> -        xmin = GetOldestXmin(NULL, false);
> +    {
> +        /*
> +         * Usually GetOldestXmin() would include the catalog_xmin in its
> +         * calculations, but we don't want to hold upstream back from vacuuming
> +         * normal user table tuples just because they're within the
> +         * catalog_xmin horizon of logical replication slots on this standby.
> +         * Instead we report the catalog_xmin to the upstream separately.
> +         */

I again don't think it's good to refer to vacuum as it's not the only thing that can remove tuple versions.

> +        xmin = GetOldestXmin(NULL,
> +                             false, /* don't ignore vacuum */
> +                             true /* ignore catalog xmin */);
> +
> +        /*
> +         * The catalog_Xmin reported by GetOldestXmin is the effective
> +         * catalog_xmin used by vacuum, as set by xl_xact_catalog_xmin_advance
> +         * records from the master. Sending it back to the master would be
> +         * circular and prevent its catalog_xmin ever advancing once set.
> +         * We should only send the catalog_xmin we actually need for slots.
> +         */
> +        ProcArrayGetReplicationSlotXmin(NULL, NULL, &catalog_xmin);

Given that you don't have catalog_xmin set by GetOldestXmin, that comment is a bit misleading.

> @@ -1427,19 +1436,93 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
>          NormalTransactionIdPrecedes(replication_slot_xmin, result))
>          result = replication_slot_xmin;
>
> +    if (!ignoreCatalogXmin && (rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)))
> +    {
> +        /*
> +         * After locks have been released and defer_cleanup_age has been applied,
> +         * check whether we need to back up further to make logical decoding
> +         * safe. We need to do so if we're computing the global limit (rel =
> +         * NULL) or if the passed relation is a catalog relation of some kind.
> +         */
> +        if (TransactionIdIsValid(replication_slot_catalog_xmin) &&
> +            NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
> +            result = replication_slot_catalog_xmin;
> +    }

The nesting of these checks, and the comments about them, is a bit weird.

> +/*
> + * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated
> + * to reflect an advance in procArray->replication_slot_catalog_xmin or
> + * it becoming newly set or unset.
> + *
> + */
> +static bool
> +CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin)
> +{
> +    return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin)
> +        || (TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin)));
> +}

Your lines are really long - pgindent (which you really should run) will mangle this. I think it'd be better to rephrase this.

> +/*
> + * If necessary, copy the current catalog_xmin needed by repliation slots to

Typo: repliation

> + * the effective catalog_xmin used for dead tuple removal.
> + *
> + * When logical decoding is enabled we write a WAL record before advancing the
> + * effective value so that standbys find out if catalog tuples they still need
> + * get removed, and can properly cancel decoding sessions and invalidate slots.
> + *
> + * The 'force' option is used when we're turning WAL_LEVEL_LOGICAL off
> + * and need to clear the shmem state, since we want to bypass the wal_level
> + * check and force xlog writing.
> + */
> +void
> +UpdateOldestCatalogXmin(bool force)

I'm a bit confused by this function and variable name. What does

+    TransactionId oldestCatalogXmin; /* oldest xid where complete catalog state
+                                      * is guaranteed to still exist */

mean? I complained about the overall justification in the commit already, but looking at this commit alone, the justification for this part of the change is quite hard to understand.

> +{
> +    TransactionId vacuum_catalog_xmin;
> +    TransactionId slots_catalog_xmin;
> +
> +    /*
> +     * If we're not recording logical decoding information, catalog_xmin
> +     * must be unset and we don't need to do any work here.

If we don't need to do any work, shouldn't we return early?

> +    if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin) || force)
> +    {
> +        XactLogCatalogXminUpdate(slots_catalog_xmin);
> +
> +        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
> +        /*
> +         * A concurrent updater could've changed these values so we need to re-check
> +         * under ProcArrayLock before updating.
> +         */
> +        vacuum_catalog_xmin = *((volatile TransactionId*)&ShmemVariableCache->oldestCatalogXmin);
> +        slots_catalog_xmin = *((volatile TransactionId*)&procArray->replication_slot_catalog_xmin);

Why are there volatile reads here?

> +        if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
> +            SetOldestCatalogXmin(slots_catalog_xmin);

Why don't we check force here, but above?
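To make the early-return and force questions concrete, here is a standalone sketch of the check / log / re-check-under-lock pattern in which one predicate, including force, is evaluated in both places. It is not PostgreSQL code and not the patch author's method: Xid, state_lock, effective_xmin, slots_xmin, records_logged and update_effective_xmin() are hypothetical stand-ins, a pthread mutex replaces ProcArrayLock, and plain integer comparison replaces the wraparound-aware TransactionIdPrecedes().

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;               /* hypothetical; no wraparound handling */
#define INVALID_XID ((Xid) 0)

static pthread_mutex_t state_lock = PTHREAD_MUTEX_INITIALIZER;
static Xid effective_xmin = INVALID_XID; /* value used for tuple removal */
static Xid slots_xmin = INVALID_XID;     /* value currently required by slots */
static unsigned records_logged = 0;      /* stand-in for emitted log records */

/* Single predicate, so both checks agree on what "needs update" means. */
static bool
needs_update(Xid effective, Xid wanted, bool force)
{
    return force ||
        effective < wanted ||
        ((effective != INVALID_XID) != (wanted != INVALID_XID));
}

/* Returns true if the effective value was (re)applied. */
static bool
update_effective_xmin(bool force)
{
    Xid  wanted;
    bool need;
    bool changed = false;

    /* First check: take a consistent snapshot and return early if idle. */
    pthread_mutex_lock(&state_lock);
    wanted = slots_xmin;
    need = needs_update(effective_xmin, wanted, force);
    pthread_mutex_unlock(&state_lock);
    if (!need)
        return false;

    /* Log the new value before it takes effect (not thread-safe; demo only). */
    records_logged++;

    /* Second check under the lock, with the same predicate and same force. */
    pthread_mutex_lock(&state_lock);
    if (needs_update(effective_xmin, wanted, force))
    {
        effective_xmin = wanted;    /* apply exactly the value that was logged */
        changed = true;
    }
    pthread_mutex_unlock(&state_lock);

    assert(changed || !force);      /* a forced call always applies */
    return changed;
}
```

Applying the logged value rather than re-reading slots_xmin under the lock is a design choice of this sketch: it keeps the logged record and the applied value identical, at the cost of possibly needing another call if the requirement moved in between.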
> @@ -2167,14 +2250,20 @@ GetOldestSafeDecodingTransactionId(void)
>      oldestSafeXid = ShmemVariableCache->nextXid;
>
>      /*
> -     * If there's already a slot pegging the xmin horizon, we can start with
> -     * that value, it's guaranteed to be safe since it's computed by this
> -     * routine initially and has been enforced since.
> +     * If there's already an effectiveCatalogXmin held down by vacuum
> +     * it's definitely safe to start there, and it can't advance
> +     * while we hold ProcArrayLock.

What does "held down by vacuum" mean?

>  /*
> + * Notify a logical decoding session that it conflicts with a
> + * newly set catalog_xmin from the master.
> + */
> +void
> +CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid)
> +{
> +    ProcArrayStruct *arrayP = procArray;
> +    int index;
> +
> +    /*
> +     * We have to scan ProcArray to find the process and set a pending recovery
> +     * conflict even though we know the pid. At least we can get the BackendId
> +     * and void a ProcSignal scan later.
> +     *
> +     * The pid might've gone away, in which case we got the desired
> +     * outcome anyway.
> +     */
> +    LWLockAcquire(ProcArrayLock, LW_SHARED);
> +
> +    for (index = 0; index < arrayP->numProcs; index++)
> +    {
> +        int pgprocno = arrayP->pgprocnos[index];
> +        volatile PGPROC *proc = &allProcs[pgprocno];
> +
> +        if (proc->pid == session_pid)
> +        {
> +            VirtualTransactionId procvxid;
> +
> +            GET_VXID_FROM_PGPROC(procvxid, *proc);
> +
> +            proc->recoveryConflictPending = true;
> +
> +            /*
> +             * Kill the pid if it's still here. If not, that's what we
> +             * wanted so ignore any errors.
> +             */
> +            (void) SendProcSignal(session_pid,
> +                PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, procvxid.backendId);
> +
> +            break;
> +        }
> +    }
> +
> +    LWLockRelease(ProcArrayLock);

Doesn't seem ok to do this while holding ProcArrayLock.
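One possible shape for avoiding that, sketched standalone and under stated assumptions: find and mark the target while holding the lock, remember the identifier needed for signalling, release the lock, and only then send the signal. This is not PostgreSQL code; ProcEntry, procs, proc_lock, send_signal() and cancel_session() are hypothetical, and the proc_lock_held flag exists only so the sketch can assert that signalling happens outside the lock. A real implementation would also have to tolerate the target exiting between the lookup and the signal.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical process table entry; not PGPROC. */
typedef struct ProcEntry
{
    int  pid;
    int  backend_id;
    bool conflict_pending;
} ProcEntry;

static pthread_mutex_t proc_lock = PTHREAD_MUTEX_INITIALIZER;
static bool proc_lock_held = false;     /* tracked only for the assertion below */
static ProcEntry procs[] = {{101, 1, false}, {102, 2, false}, {103, 3, false}};
static int last_signalled_backend = -1; /* records the last "signal", for illustration */

/* Stand-in for delivering a signal; must never run with proc_lock held. */
static void
send_signal(int pid, int backend_id)
{
    assert(!proc_lock_held);
    (void) pid;
    last_signalled_backend = backend_id;
}

/* Returns true if a matching process was found and signalled. */
static bool
cancel_session(int pid)
{
    int    backend_id = -1;
    size_t i;

    /* Under the lock: only look up the entry and flag the pending conflict. */
    pthread_mutex_lock(&proc_lock);
    proc_lock_held = true;
    for (i = 0; i < sizeof(procs) / sizeof(procs[0]); i++)
    {
        if (procs[i].pid == pid)
        {
            procs[i].conflict_pending = true;
            backend_id = procs[i].backend_id;
            break;
        }
    }
    proc_lock_held = false;
    pthread_mutex_unlock(&proc_lock);

    /* Process already gone: that is the desired outcome anyway. */
    if (backend_id < 0)
        return false;

    /* Outside the lock: deliver the signal. */
    send_signal(pid, backend_id);
    return true;
}
```

Calling cancel_session(102) flags the second entry and signals backend 2; calling it with an unknown pid returns false without signalling anything.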
> +/*
> + * Scan to see if any clients are using replication slots that are below the
> + * new catalog_xmin theshold and sigal them to terminate with a recovery
> + * conflict.
> + *
> + * We already applied the new catalog_xmin record and updated the shmem
> + * catalog_xmin state, so new clients that try to use a replication slot
> + * whose on-disk catalog_xmin is below the new threshold will ERROR, and we
> + * don't have to guard against them here.
> + *
> + * Replay can only continue safely once every slot that needs the catalogs
> + * we're going to free for removal is gone. So if any conflicting sessions
> + * exist, wait for any standby conflict grace period then signal them to exit.
> + *
> + * The master might clear its reserved catalog_xmin if all upstream slots are
> + * removed or clear their feedback reservations, sending us
> + * InvalidTransactionId. If we're concurrently trying to create a new slot and
> + * reserve catalogs the InvalidXid reservation report might come in while we
> + * have a slot waiting for hs_feedback confirmation of its reservation. That
> + * would cause the waiting process to get canceled with a conflict with
> + * recovery here since its tentative reservation conflicts with the master's
> + * report of 'nothing reserved'. To allow it to continue to seek a startpoint
> + * we ignore slots whose catalog_xmin is >= nextXid, indicating that they're
> + * still looking for where to start. We'll sometimes notice a conflict but the
> + * slot will advance its catalog_xmin to a more recent nextXid and cease to
> + * conflict when we re-check. (The alternative is to track slots being created
> + * differently to slots actively decoding in shmem, which seems unnecessary. Or
> + * to separate the 'tentative catalog_xmin reservation' of a slot from its
> + * actual needed catalog_xmin.)
> + *
> + * We can't use ResolveRecoveryConflictWithVirtualXIDs() here because
> + * walsender-based logical decoding sessions won't have any virtualxid for much
> + * of their life and the end of their virtualxids doesn't mean the end of a
> + * potential conflict. It would also cancel too aggressively, since it cares
> + * about the backend's xmin and logical decoding only needs the catalog_xmin.
> + */

The use of "we" seems confusing here, because it's not the same process.

Generally I think your comments need to be edited a bit for brevity and preciseness.

> +void
> +ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin)
> +{
> +    int i;
> +
> +    if (!InHotStandby)
> +        /* nobody can be actively using logical slots */
> +        return;
> +
> +    /* Already applied new limit, can't have replayed later one yet */
> +    Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin);
> +
> +    /*
> +     * Find the first conflicting active slot and wait for it to be free,
> +     * signalling it if necessary, then repeat until there are no more
> +     * conflicts.
> +     */
> +    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> +    for (i = 0; i < max_replication_slots; i++)
> +    {

I'm pretty strongly against any code outside of slot.c doing this.

> @@ -2789,12 +2797,13 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>          Assert(RecoveryConflictPending && (QueryCancelPending || ProcDiePending));
>
>          /*
> -         * All conflicts apart from database cause dynamic errors where the
> -         * command or transaction can be retried at a later point with some
> -         * potential for success. No need to reset this, since non-retryable
> -         * conflict errors are currently FATAL.
> +         * All conflicts apart from database and catalog_xmin cause dynamic
> +         * errors where the command or transaction can be retried at a later
> +         * point with some potential for success. No need to reset this, since
> +         * non-retryable conflict errors are currently FATAL.
>           */
> -        if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE)
> +        if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE ||
> +            reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)
>              RecoveryConflictRetryable = false;
>      }

Hm. Why is this a non-retryable error?

Ok, landing soon. Gotta finish here.

0002 should be doable as a whole this release, but I have severe doubts that 0003 as a whole has a chance for 10 - the code is in quite a raw shape, and there's a significant number of open ends. I'd suggest breaking off bits that are independently useful, and working on getting those committed.

- Andres

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)