From d2438ef88dc6295d8f670f02188e01e38518f49a Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Tue, 14 Oct 2025 15:03:38 +0530
Subject: [PATCH v1] Support large object decoding

Introduce support for decoding changes to large objects in
logical replication.  Changes to 'pg_largeobject' are now
intercepted in 'heap_decode' based on LargeObjectRelationId.
Since a single large object operation (LO_WRITE) can span
multiple physical rows in 'pg_largeobject', each row-level
change is decoded and converted into a dedicated logical
operation: REORDER_BUFFER_CHANGE_LOWRITE.
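
For example, with the test_decoding plugin a 22-byte lowrite() at
offset 0 shows up in the change stream as:

    LO_WRITE: loid: 16970 offset: 0 datalen: 22 data: large object test data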
---
 contrib/test_decoding/Makefile                |   2 +-
 .../expected/decoding_largeobject.out         | 216 ++++++++++++++++++
 .../sql/decoding_largeobject.sql              |  94 ++++++++
 contrib/test_decoding/test_decoding.c         |  39 ++++
 src/backend/replication/logical/decode.c      | 133 +++++++++++
 src/backend/replication/logical/proto.c       |  41 ++++
 .../replication/logical/reorderbuffer.c       |  61 +++++
 src/backend/replication/pgoutput/pgoutput.c   |  23 ++
 src/include/replication/logicalproto.h        |   5 +
 src/include/replication/reorderbuffer.h       |  12 +
 src/include/utils/rel.h                       |   6 +-
 11 files changed, 630 insertions(+), 2 deletions(-)
 create mode 100644 contrib/test_decoding/expected/decoding_largeobject.out
 create mode 100644 contrib/test_decoding/sql/decoding_largeobject.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index acbcaed2feb..d1f02500cf3 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream decoding_largeobject
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
diff --git a/contrib/test_decoding/expected/decoding_largeobject.out b/contrib/test_decoding/expected/decoding_largeobject.out
new file mode 100644
index 00000000000..a2720d82064
--- /dev/null
+++ b/contrib/test_decoding/expected/decoding_largeobject.out
@@ -0,0 +1,216 @@
+-- test that we can write to large objects and decode the changes
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+-- Create a new large object
+CREATE TABLE lotest_stash_values (loid oid, fd integer);
+INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42);
+-- NOTE: large objects require transactions
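+-- x'20000' and x'40000' are INV_WRITE and INV_READ (see libpq/libpq-fs.h)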
+BEGIN;
+UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values;
+ lowrite 
+---------
+      22
+(1 row)
+
+SELECT lo_close(fd) FROM lotest_stash_values;
+ lo_close 
+----------
+        0
+(1 row)
+
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.lotest_stash_values: INSERT: loid[oid]:16970 fd[integer]:null
+ COMMIT
+ BEGIN
+ table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0
+ LO_WRITE: loid: 16970 offset: 0 datalen: 22 data: large object test data
+ COMMIT
+(7 rows)
+
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values;
+ lo_lseek 
+----------
+       10
+(1 row)
+
+SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values;
+ lowrite 
+---------
+      19
+(1 row)
+
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                      data                                       
+---------------------------------------------------------------------------------
+ BEGIN
+ table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0
+ LO_WRITE: loid: 16970 offset: 0 datalen: 29 data: large objeoverwrite some data
+ COMMIT
+(4 rows)
+
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values;
+ lo_lseek 
+----------
+     2048
+(1 row)
+
+SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values;
+ lowrite 
+---------
+      19
+(1 row)
+
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                   data                                   
+--------------------------------------------------------------------------
+ BEGIN
+ table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0
+ LO_WRITE: loid: 16970 offset: 2048 datalen: 19 data: write into new page
+ COMMIT
+(4 rows)
+
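+-- a write crossing a LOBLKSIZE page boundary (2048 bytes by default) is
+-- decoded as one LO_WRITE change per page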
+BEGIN;
+UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lowrite(fd, '
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+') FROM lotest_stash_values;
+ lowrite 
+---------
+    3829
+(1 row)
+
+SELECT lo_close(fd) FROM lotest_stash_values;
+ lo_close 
+----------
+        0
+(1 row)
+
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                              data                                               
+-------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0
+ LO_WRITE: loid: 16970 offset: 0 datalen: 2048 data:                                            +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data m
+ LO_WRITE: loid: 16970 offset: 2048 datalen: 1781 data: ore in 2048 test large data more in 2048+
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ 
+ COMMIT
+(5 rows)
+
+-- Clean up the slot
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/decoding_largeobject.sql b/contrib/test_decoding/sql/decoding_largeobject.sql
new file mode 100644
index 00000000000..ff392de2fba
--- /dev/null
+++ b/contrib/test_decoding/sql/decoding_largeobject.sql
@@ -0,0 +1,94 @@
+-- test that we can write to large objects and decode the changes
+
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Create a new large object
+CREATE TABLE lotest_stash_values (loid oid, fd integer);
+
+INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42);
+
+-- NOTE: large objects require transactions
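+-- x'20000' and x'40000' are INV_WRITE and INV_READ (see libpq/libpq-fs.h)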
+BEGIN;
+UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values;
+SELECT lo_close(fd) FROM lotest_stash_values;
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values;
+SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values;
+END;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values;
+SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values;
+END;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
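+-- a write crossing a LOBLKSIZE page boundary (2048 bytes by default) is
+-- decoded as one LO_WRITE change per page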
+BEGIN;
+UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lowrite(fd, '
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+') FROM lotest_stash_values;
+SELECT lo_close(fd) FROM lotest_stash_values;
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+
+-- Clean up the slot
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 36e77c69e1c..d34a73666f4 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -470,6 +470,38 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Print a large object write (LO_WRITE) change.
+ */
+static void
+pg_decode_lo_write(LogicalDecodingContext *ctx,
+				   ReorderBufferChange *change)
+{
+	TestDecodingData   *data;
+	MemoryContext		old;
+	Oid					loid = change->data.lo_write.loid;
+	int64				offset = change->data.lo_write.offset;
+	Size				datalen = change->data.lo_write.datalen;
+	char			   *lodata = change->data.lo_write.data;
+
+	data = ctx->output_plugin_private;
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out,
+					 "LO_WRITE: loid: %u offset: " INT64_FORMAT " datalen: %zu data: ",
+					 loid, offset, datalen);
+
+	/* Emit the chunk payload verbatim after the header. */
+	appendBinaryStringInfo(ctx->out, lodata, datalen);
+
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
@@ -619,6 +651,13 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	txndata->xact_wrote_changes = true;
 
+	/*
+	 * Handle large object changes independently of table changes; the
+	 * relation argument is NULL for these, so return before it is used.
+	 */
+	if (change->action == REORDER_BUFFER_CHANGE_LOWRITE)
+	{
+		pg_decode_lo_write(ctx, change);
+		return;
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cc03f0706e9..d948df84065 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -26,6 +26,7 @@
  */
 #include "postgres.h"
 
+#include "access/detoast.h"
 #include "access/heapam_xlog.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -33,11 +34,13 @@
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
 #include "catalog/pg_control.h"
+#include "catalog/pg_largeobject.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/message.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
+#include "storage/large_object.h"
 #include "storage/standbydefs.h"
 
 /* individual record(group)'s handlers */
@@ -56,6 +59,10 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						bool two_phase);
 static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						  xl_xact_parsed_prepare *parsed);
+static void DecodeLargeObjectInsert(LogicalDecodingContext *ctx,
+									XLogRecordBuffer *buf);
+static void DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx,
+									 XLogRecordBuffer *buf);
 
 
 /* common function to decode tuples */
@@ -471,6 +478,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
 	SnapBuild  *builder = ctx->snapshot_builder;
+	RelFileLocator target_locator;
+	XLogReaderState *r = buf->record;
 
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
@@ -485,6 +494,22 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
+	/*
+	 * Check whether the WAL record pertains to 'pg_largeobject'.  If it
+	 * does, handle the large object changes separately via
+	 * DecodeLargeObjectChanges, bypassing the standard heap table decoding
+	 * logic that follows.  Not every heap record carries a block reference
+	 * (e.g. XLOG_HEAP_TRUNCATE), so use the variant that reports a missing
+	 * block instead of throwing an error.
+	 *
+	 * XXX: this compares the relfilenumber, which equals
+	 * LargeObjectRelationId only as long as pg_largeobject has never been
+	 * rewritten (e.g. by VACUUM FULL).
+	 */
+	if (XLogRecGetBlockTagExtended(r, 0, &target_locator, NULL, NULL, NULL) &&
+		target_locator.relNumber == LargeObjectRelationId)
+	{
+		if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+			!ctx->fast_forward)
+			DecodeLargeObjectChanges(info, ctx, buf);
+		return;
+	}
+
 	switch (info)
 	{
 		case XLOG_HEAP_INSERT:
@@ -1323,3 +1348,111 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	return false;
 }
+
+/*
+ * Helper function to decode a 'pg_largeobject' INSERT (or UPDATE, whose
+ * new tuple is decoded the same way) into a
+ * 'REORDER_BUFFER_CHANGE_LOWRITE' change.
+ *
+ * Each row in 'pg_largeobject' represents only a small page (or chunk) of
+ * a large object's data. Logically, these individual page-level inserts
+ * are not meaningful on their own to a consumer. Therefore, instead of
+ * treating them as regular heap tuple changes, we convert the physical
+ * page insert into a single, more meaningful logical operation: a
+ * 'LO_WRITE' change, which can be applied as an independent large object
+ * operation.
+ */
+static void
+DecodeLargeObjectInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	ReorderBufferChange *change;
+	Size		datalen;
+	char		*tupledata;
+	HeapTuple	tuple;
+	bytea	   *data_chunk;
+	Oid			loid;
+	int32 		pageno;
+	int64		offset;
+	Size		chunk_datalen;
+	char	   *data_copy;
+	bool		freeit = false;
+	Form_pg_largeobject	largeobject;
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	if (datalen == 0)
+		return;
+
+	tuple = ReorderBufferAllocTupleBuf(ctx->reorder, datalen - SizeOfHeapHeader);
+	DecodeXLogTuple(tupledata, datalen, tuple);
+	largeobject = GETSTRUCT(tuple);
+
+	/* Fetch loid, pageno and actual data from the pg_largeobject tuple. */
+	loid = largeobject->loid;
+	pageno = largeobject->pageno;
+	data_chunk = &(largeobject->data);
+	if (VARATT_IS_EXTENDED(data_chunk))
+	{
+		data_chunk = (bytea *)
+			detoast_attr((struct varlena *) data_chunk);
+		freeit = true;
+	}
+	chunk_datalen = VARSIZE(data_chunk) - VARHDRSZ;
+
+	/*
+	 * Convert the single 'pg_largeobject' row (which represents a data page)
+	 * into a logical 'LOWRITE' operation. The absolute offset for this write
+	 * is computed by multiplying the page number ('pageno') by the fixed
+	 * large object block size (LOBLKSIZE).
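+	 * For example, with the default LOBLKSIZE of 2048 bytes (BLCKSZ / 4),
+	 * a row with pageno 1 describes a write starting at byte offset 2048.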
+	 */
+	offset = (int64) pageno * LOBLKSIZE;
+	data_copy = ReorderBufferAllocRawBuffer(ctx->reorder, chunk_datalen);
+	memcpy(data_copy, VARDATA(data_chunk), chunk_datalen);
+
+	/* Create the LOWRITE change */
+	change = ReorderBufferAllocChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_LOWRITE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	change->data.lo_write.loid = loid;
+	change->data.lo_write.offset = offset;
+	change->data.lo_write.datalen = chunk_datalen;
+	change->data.lo_write.data = data_copy;
+
+	/* Enqueue the change */
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+	ReorderBufferFreeTupleBuf(tuple);
+	if (freeit)
+		pfree(data_chunk);
+}
+
+/*
+ * Process and decode logical changes for large objects (LOs).
+ * Since LO data is spread across 'pg_largeobject' rows, this function
+ * maps each physical row change (INSERT/UPDATE) to a logical 'LO_WRITE'
+ * operation.
+ *
+ * TODO: LO_UNLINK (DELETE) is ignored for now and will be handled in a
+ * later phase.
+ */
+static void
+DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx,
+						 XLogRecordBuffer *buf)
+{
+	switch (info)
+	{
+		case XLOG_HEAP_INSERT:
+		case XLOG_HEAP_HOT_UPDATE:
+		case XLOG_HEAP_UPDATE:
+			DecodeLargeObjectInsert(ctx, buf);
+			break;
+		case XLOG_HEAP_DELETE:
+			/* LO_UNLINK (delete) is handled in a later phase */
+			break;
+		default:
+			/* Ignore other operations on pg_largeobject for now */
+			break;
+	}
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f0a913892b9..68d1a4306e4 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -346,6 +346,45 @@ logicalrep_read_rollback_prepared(StringInfo in,
 	strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
 }
 
+/*
+ * Write LO WRITE to the output stream.
+ */
+void
+logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid,
+						  int64 offset, Size datalen, const char *data)
+{
+	pq_sendbyte(out, LOGICAL_REP_MSG_LOWRITE);
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	/* Write LO ID, offset, and data length */
+	pq_sendint32(out, loid);
+	pq_sendint64(out, offset);
+	pq_sendint32(out, datalen);
+
+	/* Write the data chunk */
+	pq_sendbytes(out, data, datalen);
+}
+
+/*
+ * Read LO WRITE from the input stream.
+ */
+void
+logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen,
+						 char **data)
+{
+	int32		len;
+
+	/* Read fields, validating as we go */
+	*loid = pq_getmsgint(s, 4);
+	if (!OidIsValid(*loid))
+		elog(ERROR, "large object ID is not set in LO write message");
+
+	*offset = pq_getmsgint64(s);
+	if (*offset < 0)
+		elog(ERROR, "invalid offset " INT64_FORMAT " in LO write message", *offset);
+
+	/* Size is unsigned, so validate the length before assigning to *datalen */
+	len = pq_getmsgint(s, 4);
+	if (len < 0)
+		elog(ERROR, "invalid data length %d in LO write message", len);
+	*datalen = (Size) len;
+
+	/* Allocate memory for the data payload */
+	*data = palloc(*datalen);
+
+	/* Read the data payload directly into the new buffer */
+	pq_copymsgbytes(s, *data, *datalen);
+}
+
 /*
  * Write STREAM PREPARE to the output stream.
  */
@@ -1235,6 +1274,8 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "TYPE";
 		case LOGICAL_REP_MSG_MESSAGE:
 			return "MESSAGE";
+		case LOGICAL_REP_MSG_LOWRITE:
+			return "LOWRITE";
 		case LOGICAL_REP_MSG_BEGIN_PREPARE:
 			return "BEGIN PREPARE";
 		case LOGICAL_REP_MSG_PREPARE:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index eb6a84554b7..aeb381b9d8e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -579,6 +579,13 @@ ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			if (change->data.lo_write.data != NULL)
+			{
+				pfree(change->data.lo_write.data);
+				change->data.lo_write.data = NULL;
+			}
+			break;
 	}
 
 	pfree(change);
@@ -930,6 +937,19 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * Allocate a raw buffer of the given size from the reorder buffer's tuple
+ * memory context.
+ */
+void *
+ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len)
+{
+	return MemoryContextAlloc(rb->tup_context, alloc_len);
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -2585,6 +2605,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+
+				case REORDER_BUFFER_CHANGE_LOWRITE:
+					ReorderBufferApplyChange(rb, txn, NULL, change, streaming);
+					break;
 			}
 
 			/*
@@ -4270,6 +4294,26 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				char	   *data;
+				Size		datalen = change->data.lo_write.datalen;
+
+				sz += datalen;
+
+				/* make sure we have enough space */
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				/*
+				 * The change struct itself was already copied into
+				 * ondisk->change; append just the data payload after it.
+				 */
+				memcpy(data, change->data.lo_write.data, datalen);
+
+				break;
+			}
 	}
 
 	ondisk->size = sz;
@@ -4534,6 +4578,11 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				sz += change->data.lo_write.datalen;
+				break;
+			}
 	}
 
 	return sz;
@@ -4833,6 +4882,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				Size        datalen = change->data.lo_write.datalen;
+
+				/* Allocate memory for the data payload */
+				change->data.lo_write.data = MemoryContextAlloc(rb->context, datalen);
+
+				/* Copy the data payload */
+				memcpy(change->data.lo_write.data, data, datalen);
+
+				break;
+			}
 	}
 
 	dlist_push_tail(&txn->changes, &change->node);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 942e1abdb58..0b126005ab4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1491,6 +1491,29 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
+	if (change->action == REORDER_BUFFER_CHANGE_LOWRITE)
+	{
+		if (txndata && !txndata->sent_begin_txn)
+			pgoutput_send_begin(ctx, txn);
+
+		/* Remember the xid for the change in streaming mode. */
+		if (data->in_streaming)
+			xid = change->txn->xid;
+
+		OutputPluginPrepareWrite(ctx, true);
+
+		/* Serialize the large object write into the output stream. */
+		logicalrep_write_lo_write(ctx->out, xid,
+								  change->data.lo_write.loid,
+								  change->data.lo_write.offset,
+								  change->data.lo_write.datalen,
+								  change->data.lo_write.data);
+
+		OutputPluginWrite(ctx, true);
+
+		return;
+	}
+
 	if (!is_publishable_relation(relation))
 		return;
 
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..dc4ad3898c8 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -66,6 +66,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
+	LOGICAL_REP_MSG_LOWRITE = 'W',
 	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
 	LOGICAL_REP_MSG_PREPARE = 'P',
 	LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -214,6 +215,10 @@ extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN
 											   TimestampTz prepare_time);
 extern void logicalrep_read_rollback_prepared(StringInfo in,
 											  LogicalRepRollbackPreparedTxnData *rollback_data);
+extern void logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid,
+									  int64 offset, Size datalen, const char *data);
+extern void logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen,
+									 char **data);
 extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn,
 											XLogRecPtr prepare_lsn);
 extern void logicalrep_read_stream_prepare(StringInfo in,
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3cbe106a3c7..031a83fa6d5 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -61,6 +61,7 @@ typedef enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
 	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_LOWRITE,
 } ReorderBufferChangeType;
 
 /* forward declaration */
@@ -154,6 +155,16 @@ typedef struct ReorderBufferChange
 			uint32		ninvalidations; /* Number of messages */
 			SharedInvalidationMessage *invalidations;	/* invalidation message */
 		}			inval;
+
+		/* Large object write */
+		struct
+		{
+			Oid			loid;
+			int64		offset;
+			Size		datalen;
+			char	   *data;
+		}			lo_write;
 	}			data;
 
 	/*
@@ -722,6 +733,7 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Snapshot snap, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+extern void *ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len);
 extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 80286076a11..d8303612d14 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -19,6 +19,7 @@
 #include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
+#include "catalog/pg_largeobject.h"
 #include "catalog/pg_publication.h"
 #include "nodes/bitmapset.h"
 #include "partitioning/partdefs.h"
@@ -707,12 +708,15 @@ RelationCloseSmgr(Relation relation)
  * it would complicate decoding slightly for little gain). Note that we *do*
  * log information for user defined catalog tables since they presumably are
  * interesting to the user...
+ *
+ * TODO: Make logical logging of pg_largeobject rows conditional on a
+ * configuration parameter instead of doing it unconditionally.
  */
 #define RelationIsLogicallyLogged(relation) \
 	(XLogLogicalInfoActive() && \
 	 RelationNeedsWAL(relation) && \
 	 (relation)->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&	\
-	 !IsCatalogRelation(relation))
+	 (!IsCatalogRelation(relation) || \
+	  RelationGetRelid(relation) == LargeObjectRelationId))
 
 /* routines in utils/cache/relcache.c */
 extern void RelationIncrementReferenceCount(Relation rel);
-- 
2.49.0

