On Thu, Aug 17, 2017 at 2:19 AM, Craig Ringer <[email protected]> wrote:
> On 17 August 2017 at 07:30, Michael Paquier <[email protected]>
> wrote:
>
>>
>> Definitely agreed on that. Any move function would need to check if
>> the WAL position given by caller is already newer than what's
>> available in the local pg_wal (minimum of all other slots), with a
>> shared lock that would need to be taken by xlog.c when recycling past
>> segments. A forward function works on a single entry, which should be
>> disabled at the moment of the update. It looks dangerous to me to do
>> such an operation if there is a consumer of a slot currently on it.
>>
>>
> pg_advance_replication_slot(...)
>
> ERROR's on logical slot, for now. Physical slots only.
>
> Forward-only.
>
> Future work to allow it to use the logical decoding infrastructure to
> fast-forward a slot by reading only catalog change information and
> invalidations, either via a dummy output plugin that filters out all xacts,
> or by lower level use of the decoding code.
>
> Reasonable?
>
>
PFA an updated and rebased patch.
Rebased. Now named pg_advance_replication_slot. ERROR on logical slots.
Forward only.
I think that, in the end, covered all the comments?
--
Magnus Hagander
Me: https://www.hagander.net/ <http://www.hagander.net/>
Work: https://www.redpill-linpro.com/ <http://www.redpill-linpro.com/>
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 641b3b8f4e..452559f260 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19030,6 +19030,24 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
be called when connected to the same database the slot was created on.
</entry>
</row>
+ <row>
+ <entry>
+ <indexterm>
+ <primary>pg_advance_replication_slot</primary>
+ </indexterm>
+ <literal><function>pg_advance_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>position</parameter> <type>pg_lsn</type>)</function></literal>
+ </entry>
+ <entry>
+ <type>bool</type>
+ </entry>
+ <entry>
+ Advances the current restart position of a physical
+ replication slot named <parameter>slot_name</parameter>.
+ The slot will not be moved backwards, and it will not be
+ moved beyond the current insert location. Logical replication
+ slots cannot be moved. Returns true if the position was changed.
+ </entry>
+ </row>
<row>
<entry>
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d4cbd83bde..0e27e739bf 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -177,6 +177,73 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
}
/*
+ * SQL function for moving the position in a replication slot.
+ */
+Datum
+pg_advance_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name slotname = PG_GETARG_NAME(0);
+ XLogRecPtr moveto = PG_GETARG_LSN(1);
+ bool changed = false;
+ bool backwards = false;
+
+ Assert(!MyReplicationSlot);
+
+ check_permissions();
+
+ if (XLogRecPtrIsInvalid(moveto))
+ ereport(ERROR,
+ (errmsg("Invalid target xlog position ")));
+
+ /* Temporarily acquire the slot so we "own" it */
+ ReplicationSlotAcquire(NameStr(*slotname), true);
+
+ /* Only physical slots can be moved */
+ if (MyReplicationSlot->data.database != InvalidOid)
+ {
+ ReplicationSlotRelease();
+ ereport(ERROR,
+ (errmsg("Only physical replication slots can be advanced.")));
+ }
+
+ if (moveto > GetXLogWriteRecPtr())
+ /* Can't move past current position, so truncate there */
+ moveto = GetXLogWriteRecPtr();
+
+ /* Now adjust it */
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ if (MyReplicationSlot->data.restart_lsn != moveto)
+ {
+ /* Never move backwards, because bad things can happen */
+ if (MyReplicationSlot->data.restart_lsn > moveto)
+ backwards = true;
+ else
+ {
+ MyReplicationSlot->data.restart_lsn = moveto;
+ changed = true;
+ }
+ }
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ if (backwards)
+ ereport(WARNING,
+ (errmsg("Not moving replication slot backwards!")));
+
+
+ if (changed)
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredLSN();
+ ReplicationSlotSave();
+ }
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_BOOL(changed);
+}
+
+
+/*
* pg_get_replication_slots - SQL SRF showing active replication slots.
*/
Datum
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 8b33b4e0ea..3495447cf9 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5293,6 +5293,8 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
+DATA(insert OID = 3998 ( pg_advance_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 16 "19 3220" _null_ _null_ _null_ _null_ _null_ pg_advance_replication_slot _null_ _null_ _null_ ));
+DESCR("move a replication slot position");
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers