Hi,

while messing around with slot code I noticed that the SQL functions for
consuming/moving logical replication slots only move restart_lsn up to
the previously consumed position and not to the currently consumed
position. The reason is that restart_lsn is not moved forward unless the
new value is smaller than the slot's current confirmed_lsn. But we only
update the slot's confirmed_lsn at the end of the SQL functions, so we
can only move restart_lsn up to the position we reached on the previous
call. The same is true for catalog_xmin.

This does not really hurt much functionality-wise, but it means that
every record is needlessly processed twice, as we always restart from
the position that was reached two calls of the function ago, and that we
keep an older catalog_xmin than necessary, which can potentially affect
system catalog bloat.
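
To make the lag concrete, here is a toy model in C. It is only a sketch
and not PostgreSQL code: ToySlot, toy_increase_restart, toy_consume and
toy_run are made-up names, LSNs are plain integers, and it assumes a
restart candidate is produced at every record and accepted only if it is
not ahead of confirmed_lsn. The real candidate handling in the slot code
is more involved.

```c
/* Toy model of a logical slot; not the real ReplicationSlot struct. */
typedef struct ToySlot
{
	long		restart_lsn;	/* where the next call starts decoding */
	long		confirmed_lsn;	/* position confirmed by the consumer */
} ToySlot;

/* Accept a restart candidate only if it is not ahead of confirmed_lsn. */
void
toy_increase_restart(ToySlot *slot, long candidate)
{
	if (candidate <= slot->confirmed_lsn && candidate > slot->restart_lsn)
		slot->restart_lsn = candidate;
}

/*
 * One call of a hypothetical consuming function: decode from restart_lsn
 * up to "upto", then confirm. Returns the number of records processed.
 */
long
toy_consume(ToySlot *slot, long upto, int patched)
{
	long		processed = 0;
	long		lsn;

	for (lsn = slot->restart_lsn + 1; lsn <= upto; lsn++)
	{
		processed++;
		/* confirmed_lsn still holds the value from the previous call */
		toy_increase_restart(slot, lsn);
	}

	/* confirmed_lsn is only updated at the very end of the call */
	slot->confirmed_lsn = upto;

	/* the idea of the fix: retry the restart move after confirming */
	if (patched)
		toy_increase_restart(slot, upto);

	return processed;
}

/* Run "calls" consecutive calls, each consuming "step" new records. */
long
toy_run(int patched, int calls, long step)
{
	ToySlot		slot = {0, 0};
	long		total = 0;
	int			i;

	for (i = 1; i <= calls; i++)
		total += toy_consume(&slot, i * step, patched);

	return total;
}
```

In this model toy_run(0, 5, 10) returns 90 and toy_run(1, 5, 10) returns
50: without the extra move, every call after the first reprocesses the
10 records of the previous call.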

This affects both pg_logical_slot_get_[binary_]changes and
pg_replication_slot_advance.

The attached patch improves things by adding a call to move the slot's
restart_lsn and catalog_xmin to the last serialized snapshot position
right after we update the confirmed_lsn.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 14b3c32bdde1add1842da4805179b795e5a2a778 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Sun, 30 Dec 2018 19:57:19 +0100
Subject: [PATCH] Move restart_lsn and catalog_xmin more aggressively in SQL
 slot functions

The restart_lsn and catalog_xmin are normally moved by the snapshot
builder and are limited to the position set as confirmed_lsn by
replication feedback. However, since SQL functions like
pg_logical_slot_get_changes or pg_replication_slot_advance only update
the confirmed_lsn at the end of the function, the restart_lsn and
catalog_xmin are only moved to the location of the previous call. In
practice this means that each record is processed twice when these
functions are called repeatedly, as they restart decoding from
approximately the LSN reached two calls ago. It also means we keep
catalog_xmin pinned back for longer than necessary.

This patch adds an update call to the end of the affected SQL functions,
right after the confirmed_lsn is updated, which improves the situation
considerably.
---
 src/backend/replication/logical/logicalfuncs.c | 10 ++++++++++
 src/backend/replication/logical/snapbuild.c    |  6 ++++++
 src/backend/replication/slotfuncs.c            | 12 +++++++++++-
 src/include/replication/snapbuild.h            |  1 +
 4 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 45aae71a49..e077e869db 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -45,6 +45,7 @@
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
 #include "replication/message.h"
+#include "replication/snapbuild.h"
 
 #include "storage/fd.h"
 
@@ -331,8 +332,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		 */
 		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
 		{
+			XLogRecPtr		snapshot_lsn;
+			TransactionId	xmin;
+
 			LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
 
+			xmin = ReorderBufferGetOldestXmin(ctx->reorder);
+			snapshot_lsn = SnapBuildGetSnapshotLsn(ctx->snapshot_builder);
+			LogicalIncreaseXminForSlot(snapshot_lsn, xmin);
+			LogicalIncreaseRestartDecodingForSlot(snapshot_lsn,
+												  snapshot_lsn);
+
 			/*
 			 * If only the confirmed_flush_lsn has changed the slot won't get
 			 * marked as dirty by the above. Callers on the walsender
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 363ddf4505..529e3287a3 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -687,6 +687,12 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
 	return builder->snapshot;
 }
 
+XLogRecPtr
+SnapBuildGetSnapshotLsn(SnapBuild *builder)
+{
+	return builder->last_serialized_snapshot;
+}
+
 /*
  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
  * any. Aborts the previously started transaction and resets the resource
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad4a2..e819310c0d 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -21,6 +21,7 @@
 #include "replication/slot.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
+#include "replication/snapbuild.h"
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/pg_lsn.h"
@@ -428,7 +429,16 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 
 		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
 		{
-			LogicalConfirmReceivedLocation(moveto);
+			XLogRecPtr		snapshot_lsn;
+			TransactionId	xmin;
+
+			LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
+
+			xmin = ReorderBufferGetOldestXmin(ctx->reorder);
+			snapshot_lsn = SnapBuildGetSnapshotLsn(ctx->snapshot_builder);
+			LogicalIncreaseXminForSlot(snapshot_lsn, xmin);
+			LogicalIncreaseRestartDecodingForSlot(snapshot_lsn,
+												  snapshot_lsn);
 
 			/*
 			 * If only the confirmed_flush LSN has changed the slot won't get
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 56257430ae..e5a5daca92 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -73,6 +73,7 @@ extern void SnapBuildClearExportedSnapshot(void);
 extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
 extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
 							TransactionId xid);
+extern XLogRecPtr SnapBuildGetSnapshotLsn(SnapBuild *builder);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
 
-- 
2.17.1
