From 0ce7edb1271e5f90b4c858515560e0f1e7dcea5a Mon Sep 17 00:00:00 2001
From: Dmitry Lebedev <idemonlebedev@gmail.com>
Date: Fri, 28 Nov 2025 15:59:51 +0500
Subject: [PATCH] Add log_object_drops GUC for DROP TABLE/DATABASE logging

New GUC logs DROP TABLE and DROP DATABASE with commit LSN.
Extends XactCallback to pass LSN. Handles subtransactions correctly.
Includes TAP tests.
---
 contrib/postgres_fdw/connection.c             |   4 +-
 contrib/sepgsql/label.c                       |   2 +-
 doc/src/sgml/config.sgml                      |  45 ++
 src/backend/access/transam/xact.c             |  31 +-
 src/backend/catalog/dependency.c              | 167 ++++++
 src/backend/commands/dbcommands.c             |  10 +
 src/backend/utils/misc/guc_parameters.dat     |   6 +
 src/backend/utils/misc/guc_tables.c           |   1 +
 src/include/access/xact.h                     |   2 +-
 src/include/utils/guc.h                       |   1 +
 src/pl/plpgsql/src/pl_exec.c                  |   2 +-
 src/pl/plpgsql/src/plpgsql.h                  |   2 +-
 src/test/recovery/meson.build                 |   3 +-
 src/test/recovery/t/049_drop_table_logging.pl | 543 ++++++++++++++++++
 14 files changed, 800 insertions(+), 19 deletions(-)
 create mode 100644 src/test/recovery/t/049_drop_table_logging.pl

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 953c2e0ab82..8673f5d19a7 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -145,7 +145,7 @@ static void do_sql_command_end(PGconn *conn, const char *sql,
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
 								  const char *sql);
-static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void pgfdw_xact_callback(XactEvent event, void *arg, XLogRecPtr lsn);
 static void pgfdw_subxact_callback(SubXactEvent event,
 								   SubTransactionId mySubid,
 								   SubTransactionId parentSubid,
@@ -1046,7 +1046,7 @@ pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
  * COMMIT TRANSACTION may run deferred triggers.)
  */
 static void
-pgfdw_xact_callback(XactEvent event, void *arg)
+pgfdw_xact_callback(XactEvent event, void *arg, XLogRecPtr lsn)
 {
 	HASH_SEQ_STATUS scan;
 	ConnCacheEntry *entry;
diff --git a/contrib/sepgsql/label.c b/contrib/sepgsql/label.c
index a37d89a3f1c..868ae7b8692 100644
--- a/contrib/sepgsql/label.c
+++ b/contrib/sepgsql/label.c
@@ -161,7 +161,7 @@ sepgsql_set_client_label(const char *new_label)
  * changes in the client_label_pending list.
  */
 static void
-sepgsql_xact_callback(XactEvent event, void *arg)
+sepgsql_xact_callback(XactEvent event, void *arg, XLogRecPtr lsn)
 {
 	if (event == XACT_EVENT_COMMIT)
 	{
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 0a2a8b49fdb..a6aeda770a1 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -7671,6 +7671,51 @@ local0.*    /var/log/postgresql
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-log_object_drops" xreflabel="log_object_drops">
+      <term><varname>log_object_drops</varname> (<type>boolean</type>)
+      <indexterm>
+        <primary><varname>log_object_drops</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+      <para>
+        Causes <command>DROP TABLE</command> and <command>DROP DATABASE</command>
+        operations to be logged with their commit LSN (Log Sequence Number).
+        The commit LSN represents the point in the write-ahead log where the
+        drop operation becomes durable and visible to other transactions.
+      </para>
+
+      <para>
+        Each logged entry includes the schema name, table name, OID, and the
+        commit LSN. For example:
+        <programlisting>
+        LOG:  DROP TABLE: relation "public.employees" (OID 16384), LSN: 0/15D4A48
+        LOG:  DROP DATABASE: database "testdb" (OID 16385), LSN: 0/15D4B20
+        </programlisting>
+      </para>
+
+      <para>
+        Logged operations include: direct <command>DROP TABLE</command> statements,
+        tables dropped via <command>DROP SCHEMA CASCADE</command>,
+        partitioned tables and their partitions, tables in inheritance hierarchies,
+        and <command>DROP DATABASE</command> commands.
+        Operations that are rolled back (via <command>ROLLBACK</command> or
+        <command>ROLLBACK TO SAVEPOINT</command>) are not logged.
+      </para>
+
+      <para>
+        This parameter is useful for tracking and auditing destructive operations,
+        and for coordinating with external systems that monitor the write-ahead log.
+        Note that enabling this option may generate substantial log volume
+        when dropping schemas or databases containing many tables.
+      </para>
+
+      <para>
+        The default is <literal>off</literal>. Only superusers and users with
+        the appropriate <literal>SET</literal> privilege can change this setting.
+      </para>
+      </listitem>
+     </varlistentry>
 
      <varlistentry id="guc-log-duration" xreflabel="log_duration">
       <term><varname>log_duration</varname> (<type>boolean</type>)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 2cf3d4e92b7..b8088be3593 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -338,7 +338,7 @@ static void AtCommit_Memory(void);
 static void AtStart_Cache(void);
 static void AtStart_Memory(void);
 static void AtStart_ResourceOwner(void);
-static void CallXactCallbacks(XactEvent event);
+static void CallXactCallbacks(XactEvent event, XLogRecPtr lsn);
 static void CallSubXactCallbacks(SubXactEvent event,
 								 SubTransactionId mySubid,
 								 SubTransactionId parentSubid);
@@ -1312,9 +1312,10 @@ AtSubStart_ResourceOwner(void)
  * If you change this function, see RecordTransactionCommitPrepared also.
  */
 static TransactionId
-RecordTransactionCommit(void)
+RecordTransactionCommit(XLogRecPtr *commit_lsn_out)
 {
 	TransactionId xid = GetTopTransactionIdIfAny();
+	XLogRecPtr commit_lsn = InvalidXLogRecPtr;
 	bool		markXidCommitted = TransactionIdIsValid(xid);
 	TransactionId latestXid = InvalidTransactionId;
 	int			nrels;
@@ -1451,7 +1452,7 @@ RecordTransactionCommit(void)
 		/*
 		 * Insert the commit XLOG record.
 		 */
-		XactLogCommitRecord(GetCurrentTransactionStopTimestamp(),
+		commit_lsn = XactLogCommitRecord(GetCurrentTransactionStopTimestamp(),
 							nchildren, children, nrels, rels,
 							ndroppedstats, droppedstats,
 							nmsgs, invalMessages,
@@ -1580,6 +1581,9 @@ cleanup:
 	if (ndroppedstats)
 		pfree(droppedstats);
 
+	if (commit_lsn_out)
+		*commit_lsn_out = commit_lsn;
+
 	return latestXid;
 }
 
@@ -2242,6 +2246,7 @@ CommitTransaction(void)
 	TransactionState s = CurrentTransactionState;
 	TransactionId latestXid;
 	bool		is_parallel_worker;
+    XLogRecPtr commit_lsn = InvalidXLogRecPtr;
 
 	is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
 
@@ -2290,7 +2295,8 @@ CommitTransaction(void)
 	 */
 
 	CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
-					  : XACT_EVENT_PRE_COMMIT);
+					  : XACT_EVENT_PRE_COMMIT,
+					  InvalidXLogRecPtr);
 
 	/*
 	 * If this xact has started any unfinished parallel operation, clean up
@@ -2374,7 +2380,7 @@ CommitTransaction(void)
 		 * We need to mark our XIDs as committed in pg_xact.  This is where we
 		 * durably commit.
 		 */
-		latestXid = RecordTransactionCommit();
+		latestXid = RecordTransactionCommit(&commit_lsn);
 	}
 	else
 	{
@@ -2417,7 +2423,8 @@ CommitTransaction(void)
 	 */
 
 	CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT
-					  : XACT_EVENT_COMMIT);
+					  : XACT_EVENT_COMMIT,
+					  commit_lsn);
 
 	CurrentResourceOwner = NULL;
 	ResourceOwnerRelease(TopTransactionResourceOwner,
@@ -2565,7 +2572,7 @@ PrepareTransaction(void)
 			break;
 	}
 
-	CallXactCallbacks(XACT_EVENT_PRE_PREPARE);
+	CallXactCallbacks(XACT_EVENT_PRE_PREPARE, InvalidXLogRecPtr);
 
 	/*
 	 * The remaining actions cannot call any user-defined code, so it's safe
@@ -2725,7 +2732,7 @@ PrepareTransaction(void)
 	 * that cure could be worse than the disease.
 	 */
 
-	CallXactCallbacks(XACT_EVENT_PREPARE);
+	CallXactCallbacks(XACT_EVENT_PREPARE, InvalidXLogRecPtr);
 
 	ResourceOwnerRelease(TopTransactionResourceOwner,
 						 RESOURCE_RELEASE_BEFORE_LOCKS,
@@ -2972,9 +2979,9 @@ AbortTransaction(void)
 	if (TopTransactionResourceOwner != NULL)
 	{
 		if (is_parallel_worker)
-			CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT);
+			CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT, InvalidXLogRecPtr);
 		else
-			CallXactCallbacks(XACT_EVENT_ABORT);
+			CallXactCallbacks(XACT_EVENT_ABORT, InvalidXLogRecPtr);
 
 		ResourceOwnerRelease(TopTransactionResourceOwner,
 							 RESOURCE_RELEASE_BEFORE_LOCKS,
@@ -3847,7 +3854,7 @@ UnregisterXactCallback(XactCallback callback, void *arg)
 }
 
 static void
-CallXactCallbacks(XactEvent event)
+CallXactCallbacks(XactEvent event, XLogRecPtr lsn)
 {
 	XactCallbackItem *item;
 	XactCallbackItem *next;
@@ -3856,7 +3863,7 @@ CallXactCallbacks(XactEvent event)
 	{
 		/* allow callbacks to unregister themselves when called */
 		next = item->next;
-		item->callback(event, item->arg);
+		item->callback(event, item->arg, lsn);
 	}
 }
 
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 7dded634eb8..f98aa4f799b 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -18,6 +18,7 @@
 #include "access/htup_details.h"
 #include "access/table.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
 #include "catalog/heap.h"
@@ -65,6 +66,7 @@
 #include "catalog/pg_ts_template.h"
 #include "catalog/pg_type.h"
 #include "catalog/pg_user_mapping.h"
+#include "utils/guc.h"
 #include "commands/comment.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
@@ -176,6 +178,145 @@ static bool stack_address_present_add_flags(const ObjectAddress *object,
 											ObjectAddressStack *stack);
 static void DeleteInitPrivs(const ObjectAddress *object);
 
+/* Structure to hold DROP TABLE information */
+typedef struct DropTableInfo
+{
+	Oid			reloid;
+	char		relname[NAMEDATALEN];
+	char		schemaname[NAMEDATALEN];
+	SubTransactionId subxid;
+	bool valid;
+} DropTableInfo;
+
+/* Per-transaction list of dropped tables */
+static List *pending_drop_tables = NIL;
+static bool drop_table_callback_registered = false;
+
+static void DropTableXactCallback(XactEvent event, void *arg, XLogRecPtr lsn);
+static void DropTableSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
+									  SubTransactionId parentSubid, void *arg);
+
+/*
+ * Register a table drop for logging lsn.
+ */
+static void
+RegisterDropTable(Oid reloid, const char *relname, const char *schemaname)
+{
+	DropTableInfo *info;
+	MemoryContext oldcontext;
+
+	if (!drop_table_callback_registered)
+	{
+		RegisterXactCallback(DropTableXactCallback, NULL);
+		RegisterSubXactCallback(DropTableSubXactCallback, NULL);
+		drop_table_callback_registered = true;
+	}
+
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	info = (DropTableInfo *) palloc(sizeof(DropTableInfo));
+	info->reloid = reloid;
+	strlcpy(info->relname, relname, NAMEDATALEN);
+	strlcpy(info->schemaname, schemaname, NAMEDATALEN);
+	info->subxid = GetCurrentSubTransactionId();
+	info->valid = true;
+
+	pending_drop_tables = lappend(pending_drop_tables, info);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * SubXactCallback - handle ROLLBACK TO SAVEPOINT
+ */
+static void
+DropTableSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
+						 SubTransactionId parentSubid, void *arg)
+{
+	ListCell *lc;
+	MemoryContext oldcontext;
+
+	if (pending_drop_tables == NIL)
+        return;
+
+	/*
+	 * On subtransaction abort, remove all entries belonging to
+	 * the aborted subtransaction and its children.
+	 */
+	if (event == SUBXACT_EVENT_ABORT_SUB)
+	{
+		List *new_list = NIL;
+
+		/* Switch to TopTransactionContext for the new list */
+		oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+		foreach(lc, pending_drop_tables)
+		{
+			DropTableInfo *info = (DropTableInfo *) lfirst(lc);
+
+			/*
+			 * Mark entries that belong to our subtransactions.
+			 * SubTransactionIds are assigned incrementally, so we can
+			 * compare them.
+			 */
+			if (info->subxid >= mySubid)
+			{
+				info->valid = false;
+			}
+		}
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+}
+
+/*
+ * DropTableXactCallback
+ * Transaction callback to log commit LSN for DROP TABLE operations.
+ */
+static void
+DropTableXactCallback(XactEvent event, void *arg, XLogRecPtr commit_lsn)
+{
+	ListCell *lc;
+
+	if (pending_drop_tables == NIL)
+        return;
+
+	if (event == XACT_EVENT_COMMIT)
+	{
+		foreach(lc, pending_drop_tables)
+		{
+			DropTableInfo *info = (DropTableInfo *) lfirst(lc);
+			if (info->valid)
+			{
+				Assert(!XLogRecPtrIsInvalid(commit_lsn));
+
+				ereport(LOG,
+					(errmsg("DROP TABLE: relation \"%s.%s\" (OID %u), "
+							"drop LSN: %X/%X, commit LSN: %X/%X",
+							info->schemaname,
+							info->relname,
+							info->reloid,
+							LSN_FORMAT_ARGS(commit_lsn))));
+			}
+		}
+	}
+
+	/* Clean up after commit or abort */
+	if (event == XACT_EVENT_COMMIT ||
+		event == XACT_EVENT_ABORT ||
+		event == XACT_EVENT_PARALLEL_ABORT)
+	{
+		/* Free the DropTableInfo structures */
+		foreach(lc, pending_drop_tables)
+		{
+			DropTableInfo *info = (DropTableInfo *) lfirst(lc);
+			pfree(info);
+		}
+
+		list_free(pending_drop_tables);
+		pending_drop_tables = NIL;
+	}
+}
 
 /*
  * Go through the objects given running the final actions on them, and execute
@@ -1357,6 +1498,32 @@ doDeletion(const ObjectAddress *object, int flags)
 			{
 				char		relKind = get_rel_relkind(object->objectId);
 
+				/*
+				* Log all table drops that go through this function.
+				*/
+				if ((relKind == RELKIND_RELATION ||
+					relKind == RELKIND_PARTITIONED_TABLE)
+					&& log_object_drops)
+				{
+					char *relname = get_rel_name(object->objectId);
+
+					if (relname != NULL)
+					{
+						char *schemaname = NULL;
+						Oid schemaoid = get_rel_namespace(object->objectId);
+						if (OidIsValid(schemaoid))
+						{
+							schemaname = get_namespace_name(schemaoid);
+						}
+
+						RegisterDropTable(object->objectId, relname, schemaname ? schemaname : "unknown");
+
+						pfree(relname);
+						if (schemaname)
+							pfree(schemaname);
+					}
+				}
+
 				if (relKind == RELKIND_INDEX ||
 					relKind == RELKIND_PARTITIONED_INDEX)
 				{
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 4d65e8c46c2..e1c4b8d156e 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -29,6 +29,7 @@
 #include "access/multixact.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "access/xloginsert.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
@@ -1847,6 +1848,15 @@ dropdb(const char *dbname, bool missing_ok, bool force)
 	CatalogTupleDelete(pgdbrel, &tup->t_self);
 	heap_freetuple(tup);
 
+	/* Log LSN after database drop operation completes */
+	if (log_object_drops)
+	{
+		XLogRecPtr current_lsn = GetXLogInsertRecPtr();
+		ereport(LOG,
+				(errmsg("DROP DATABASE: database \"%s\" (OID %u), LSN: %X/%X",
+						dbname, db_id, LSN_FORMAT_ARGS(current_lsn))));
+	}
+
 	/*
 	 * Drop db-specific replication slots.
 	 */
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index d6fc8333850..9317196bb18 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3500,5 +3500,11 @@
   options => 'io_method_options',
   assign_hook => 'assign_io_method',
 },
+{ name => 'log_object_drops', type => 'bool', context => 'PGC_SUSET', group => 'LOGGING_WHAT',
+  short_desc => 'Logs LSN for DROP TABLE and DROP DATABASE operations.',
+  long_desc => 'When enabled, the system will log the LSN (Log Sequence Number) whenever a DROP TABLE or DROP DATABASE command is executed.',
+  variable => 'log_object_drops',
+  boot_val => 'false',
+},
 
 ]
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 00c8376cf4d..c64f772eb06 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -517,6 +517,7 @@ bool		Debug_write_read_parse_plan_trees;
 bool		Debug_raw_expression_coverage_test;
 #endif
 
+bool		log_object_drops = false;
 bool		log_parser_stats = false;
 bool		log_planner_stats = false;
 bool		log_executor_stats = false;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 4528e51829e..5951aa7c7da 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -136,7 +136,7 @@ typedef enum
 	XACT_EVENT_PRE_PREPARE,
 } XactEvent;
 
-typedef void (*XactCallback) (XactEvent event, void *arg);
+typedef void (*XactCallback) (XactEvent event, void *arg, XLogRecPtr lsn);
 
 typedef enum
 {
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index f21ec37da89..bf722074d89 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -303,6 +303,7 @@ extern PGDLLIMPORT int log_temp_files;
 extern PGDLLIMPORT double log_statement_sample_rate;
 extern PGDLLIMPORT double log_xact_sample_rate;
 extern PGDLLIMPORT char *backtrace_functions;
+extern PGDLLIMPORT bool log_object_drops;
 
 extern PGDLLIMPORT int temp_file_limit;
 
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index d19425b7a71..14a94645438 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -8721,7 +8721,7 @@ plpgsql_destroy_econtext(PLpgSQL_execstate *estate)
  * it has to be cleaned up.  The same for the simple-expression resowner.
  */
 void
-plpgsql_xact_cb(XactEvent event, void *arg)
+plpgsql_xact_cb(XactEvent event, void *arg, XLogRecPtr lsn)
 {
 	/*
 	 * If we are doing a clean transaction shutdown, free the EState and tell
diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h
index 5f193a37183..2e493408f3a 100644
--- a/src/pl/plpgsql/src/plpgsql.h
+++ b/src/pl/plpgsql/src/plpgsql.h
@@ -1266,7 +1266,7 @@ extern HeapTuple plpgsql_exec_trigger(PLpgSQL_function *func,
 									  TriggerData *trigdata);
 extern void plpgsql_exec_event_trigger(PLpgSQL_function *func,
 									   EventTriggerData *trigdata);
-extern void plpgsql_xact_cb(XactEvent event, void *arg);
+extern void plpgsql_xact_cb(XactEvent event, void *arg, XLogRecPtr lsn);
 extern void plpgsql_subxact_cb(SubXactEvent event, SubTransactionId mySubid,
 							   SubTransactionId parentSubid, void *arg);
 extern PGDLLEXPORT Oid plpgsql_exec_get_datum_type(PLpgSQL_execstate *estate,
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 52993c32dbb..027e956a9bf 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -56,7 +56,8 @@ tests += {
       't/045_archive_restartpoint.pl',
       't/046_checkpoint_logical_slot.pl',
       't/047_checkpoint_physical_slot.pl',
-      't/048_vacuum_horizon_floor.pl'
+      't/048_vacuum_horizon_floor.pl',
+      't/049_drop_table_logging.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/049_drop_table_logging.pl b/src/test/recovery/t/049_drop_table_logging.pl
new file mode 100644
index 00000000000..5cb3d3307a8
--- /dev/null
+++ b/src/test/recovery/t/049_drop_table_logging.pl
@@ -0,0 +1,543 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test DROP TABLE logging functionality
+# This test verifies that DROP TABLE operations are logged with correct LSN values
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize node with logging to stderr (not logging collector)
+my $node = PostgreSQL::Test::Cluster->new('primary');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+log_min_messages = log
+logging_collector = off
+log_destination = 'stderr'
+log_object_drops = on
+});
+$node->start;
+
+# Track log file position for incremental reading
+my $log_offset = 0;
+
+# Cache for current test - stores log content read once per test
+my $current_test_log_cache = undef;
+
+# Reset to start a new test - clears cache and updates offset
+sub start_new_test
+{
+	my ($test_name) = @_;
+	note($test_name) if defined $test_name;
+
+	$current_test_log_cache = undef;
+
+	# Update offset to current position
+	my $logfile = $node->logfile;
+	$log_offset = -s $logfile;
+}
+
+# Get log content for current test (cached within test)
+sub get_test_log_content
+{
+	return $current_test_log_cache if defined $current_test_log_cache;
+
+	# Read new content since last test started
+	my $logfile = $node->logfile;
+	my $current_size = -s $logfile;
+
+	# If offset is beyond file size, reset to 0
+	$log_offset = 0 if $log_offset > $current_size;
+
+	# Read only new content
+	$current_test_log_cache = slurp_file($logfile, $log_offset);
+
+	return $current_test_log_cache;
+}
+
+# Count matching log entries in current test
+sub count_drop_logs
+{
+	my ($pattern) = @_;
+	my $log = get_test_log_content();
+	my @matches = $log =~ /$pattern/g;
+	return scalar @matches;
+}
+
+# Get matching log lines in current test
+sub get_log_lines
+{
+	my ($pattern) = @_;
+	my $log = get_test_log_content();
+	my @lines = split /\n/, $log;
+	my @matching_lines = grep { /$pattern/ } @lines;
+	return @matching_lines;
+}
+
+# Helper function to extract LSN from log line
+sub extract_lsn
+{
+	my ($line, $lsn_type) = @_;
+	$lsn_type //= 'single';  # Default to single LSN format
+
+	if ($lsn_type eq 'single')
+	{
+		# For single LSN: "LSN: 0/12345"
+		if ($line =~ /LSN: ([0-9A-F]+\/[0-9A-F]+)/)
+		{
+			return $1;
+		}
+	}
+	elsif ($lsn_type eq 'commit')
+	{
+		# For commit LSN in future extended format
+		if ($line =~ /commit LSN: ([0-9A-F]+\/[0-9A-F]+)/)
+		{
+			return $1;
+		}
+	}
+	return undef;
+}
+
+# Helper function to compare LSNs
+sub lsn_less_than
+{
+	my ($lsn1, $lsn2) = @_;
+	my ($seg1, $off1) = split /\//, $lsn1;
+	my ($seg2, $off2) = split /\//, $lsn2;
+	return (hex($seg1) < hex($seg2)) ||
+	       (hex($seg1) == hex($seg2) && hex($off1) < hex($off2));
+}
+
+# Build pattern for DROP TABLE log entry
+sub drop_table_pattern
+{
+	my ($schema, $table) = @_;
+	return qr/DROP TABLE: relation "$schema\.$table"/;
+}
+
+# Build pattern for DROP DATABASE log entry
+sub drop_database_pattern
+{
+	my ($dbname) = @_;
+	return qr/DROP DATABASE: database "$dbname"/;
+}
+
+# Test helper: Execute SQL and verify drop count
+sub test_drop_count
+{
+	my ($test_name, $sql, $pattern, $expected_count) = @_;
+
+	start_new_test($test_name);
+	$node->safe_psql('postgres', $sql);
+
+	my $count = count_drop_logs($pattern);
+	is($count, $expected_count, "$test_name: count check");
+}
+
+# Test helper: Execute SQL and verify log entry exists with valid LSN
+sub test_drop_logged
+{
+	my ($test_name, $sql, $pattern, $extra_checks) = @_;
+
+	start_new_test($test_name);
+	$node->safe_psql('postgres', $sql);
+
+	my @log_lines = get_log_lines($pattern);
+	is(scalar @log_lines, 1, "$test_name: entry logged");
+
+	if (@log_lines)
+	{
+		like($log_lines[0], qr/LSN: [0-9A-F]+\/[0-9A-F]+/,
+		     "$test_name: LSN present");
+		unlike($log_lines[0], qr/LSN: 0\/0/,
+		       "$test_name: LSN not invalid");
+
+		# Execute additional checks if provided
+		$extra_checks->($log_lines[0]) if defined $extra_checks;
+	}
+}
+
+# Test helper: Execute SQL and verify nothing was logged
+sub test_drop_not_logged
+{
+	my ($test_name, $sql, $pattern) = @_;
+
+	start_new_test($test_name);
+	$node->safe_psql('postgres', $sql);
+
+	my $count = count_drop_logs($pattern);
+	is($count, 0, "$test_name: not logged");
+}
+
+# Test helper: Verify multiple drops in one transaction
+sub test_multiple_drops
+{
+	my ($test_name, $sql, @table_specs) = @_;
+
+	start_new_test($test_name);
+	$node->safe_psql('postgres', $sql);
+
+	foreach my $spec (@table_specs)
+	{
+		my ($schema, $table, $expected) = @$spec;
+		my $count = count_drop_logs(drop_table_pattern($schema, $table));
+		is($count, $expected, "$test_name: $schema.$table");
+	}
+}
+
+# ==============================================================================
+# TESTS START HERE
+# ==============================================================================
+
+# Test 1: Simple DROP TABLE (autocommit)
+test_drop_logged(
+	'Test 1: Simple DROP TABLE in autocommit mode',
+	q{
+		CREATE TABLE test_simple (id int);
+		INSERT INTO test_simple VALUES (1);
+		DROP TABLE test_simple;
+	},
+	drop_table_pattern('public', 'test_simple')
+);
+
+# Test 2: DROP TABLE inside transaction
+test_drop_logged(
+	'Test 2: DROP TABLE inside explicit transaction',
+	q{
+		BEGIN;
+		CREATE TABLE test_in_xact (id int);
+		INSERT INTO test_in_xact VALUES (1);
+		DROP TABLE test_in_xact;
+		COMMIT;
+	},
+	drop_table_pattern('public', 'test_in_xact')
+);
+
+# Test 3: DROP TABLE with ROLLBACK
+test_drop_not_logged(
+	'Test 3: DROP TABLE with ROLLBACK - should not be logged',
+	q{
+		CREATE TABLE test_rollback (id int);
+		INSERT INTO test_rollback VALUES (1);
+		BEGIN;
+		DROP TABLE test_rollback;
+		ROLLBACK;
+	},
+	drop_table_pattern('public', 'test_rollback')
+);
+
+# Now actually drop the table
+test_drop_count(
+	'Test 3b: Committed DROP logged',
+	q{
+		SELECT * FROM test_rollback;
+		DROP TABLE test_rollback;
+	},
+	drop_table_pattern('public', 'test_rollback'),
+	1
+);
+
+# Test 4: DROP SCHEMA CASCADE
+test_multiple_drops(
+	'Test 4: DROP SCHEMA CASCADE - all tables logged',
+	q{
+		CREATE SCHEMA test_schema;
+		CREATE TABLE test_schema.table1 (id int);
+		CREATE TABLE test_schema.table2 (name text);
+		INSERT INTO test_schema.table1 VALUES (1);
+		INSERT INTO test_schema.table2 VALUES ('test');
+		BEGIN;
+		DROP SCHEMA test_schema CASCADE;
+		COMMIT;
+	},
+	['test_schema', 'table1', 1],
+	['test_schema', 'table2', 1]
+);
+
+# Test 5: DROP TABLE with FK CASCADE
+test_drop_count(
+	'Test 5: DROP TABLE CASCADE with foreign keys',
+	q{
+		CREATE TABLE test_parent (id int PRIMARY KEY);
+		CREATE TABLE test_child (id int, parent_id int REFERENCES test_parent(id));
+		INSERT INTO test_parent VALUES (1);
+		INSERT INTO test_child VALUES (1, 1);
+		BEGIN;
+		DROP TABLE test_parent CASCADE;
+		COMMIT;
+	},
+	drop_table_pattern('public', 'test_parent'),
+	1
+);
+
+# Test 6: Multiple DROP TABLE in single statement
+test_multiple_drops(
+	'Test 6: Multiple tables in single DROP statement',
+	q{
+		CREATE TABLE test_multi1 (id int);
+		CREATE TABLE test_multi2 (id int);
+		CREATE TABLE test_multi3 (id int);
+		BEGIN;
+		DROP TABLE test_multi1, test_multi2, test_multi3;
+		COMMIT;
+	},
+	['public', 'test_multi1', 1],
+	['public', 'test_multi2', 1],
+	['public', 'test_multi3', 1]
+);
+
+# Test 7: DROP PARTITIONED TABLE
+test_multiple_drops(
+	'Test 7: Partitioned table and partitions',
+	q{
+		CREATE TABLE test_partitioned (id int, created_at date) PARTITION BY RANGE (created_at);
+		CREATE TABLE test_part_2024 PARTITION OF test_partitioned
+		    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
+		CREATE TABLE test_part_2025 PARTITION OF test_partitioned
+		    FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
+		BEGIN;
+		DROP TABLE test_partitioned CASCADE;
+		COMMIT;
+	},
+	['public', 'test_partitioned', 1],
+	['public', 'test_part_2024', 1],
+	['public', 'test_part_2025', 1]
+);
+
+# Test 8: Mixed operations in one transaction
+test_multiple_drops(
+	'Test 8: Mixed CREATE and DROP operations in transaction',
+	q{
+		CREATE SCHEMA mixed_schema;
+		BEGIN;
+		CREATE TABLE mixed_schema.table1 (id int);
+		INSERT INTO mixed_schema.table1 VALUES (1);
+		CREATE TABLE mixed_schema.table2 (id int);
+		DROP TABLE mixed_schema.table2;
+		CREATE TABLE outside_table (id int);
+		INSERT INTO outside_table VALUES (1);
+		DROP SCHEMA mixed_schema CASCADE;
+		DROP TABLE outside_table;
+		COMMIT;
+	},
+	['mixed_schema', 'table1', 1],
+	['mixed_schema', 'table2', 1],
+	['public', 'outside_table', 1]
+);
+
+# Test 9: DROP temporary table
+test_drop_count(
+	'Test 9: Temporary table',
+	q{
+		CREATE TEMP TABLE test_temp (id int);
+		INSERT INTO test_temp VALUES (1);
+		BEGIN;
+		DROP TABLE test_temp;
+		COMMIT;
+	},
+	qr/DROP TABLE: relation "pg_temp_\d+\.test_temp"/,
+	1
+);
+
+# Test 10: DROP VIEW (should not be logged)
+start_new_test('Test 10: Views should not be logged');
+$node->safe_psql('postgres', q{
+	CREATE TABLE test_view_base (id int);
+	CREATE VIEW test_view AS SELECT * FROM test_view_base;
+	DROP VIEW test_view;
+	DROP TABLE test_view_base;
+});
+
+my $view_count = count_drop_logs(qr/DROP TABLE: relation "public\.test_view"[^\w]/);
+my $view_base_count = count_drop_logs(drop_table_pattern('public', 'test_view_base'));
+is($view_count, 0, 'Test 10: VIEW drop not logged');
+is($view_base_count, 1, 'Test 10: Base table drop logged');
+
+# Test 11: DROP INDEX (should not be logged)
+start_new_test('Test 11: Indexes should not be logged');
+$node->safe_psql('postgres', q{
+	CREATE TABLE test_index_table (id int);
+	CREATE INDEX test_idx ON test_index_table(id);
+	DROP INDEX test_idx;
+	DROP TABLE test_index_table;
+});
+
+my $index_count = count_drop_logs(qr/DROP TABLE: relation "public\.test_idx"/);
+my $index_table_count = count_drop_logs(drop_table_pattern('public', 'test_index_table'));
+is($index_count, 0, 'Test 11: INDEX drop not logged');
+is($index_table_count, 1, 'Test 11: Table drop logged');
+
+# Test 12: Table inheritance hierarchy
+test_multiple_drops(
+	'Test 12: Inheritance hierarchy',
+	q{
+		CREATE TABLE parent_inherit (id int);
+		CREATE TABLE child_inherit1 () INHERITS (parent_inherit);
+		CREATE TABLE child_inherit2 () INHERITS (parent_inherit);
+		INSERT INTO parent_inherit VALUES (1);
+		INSERT INTO child_inherit1 VALUES (2);
+		INSERT INTO child_inherit2 VALUES (3);
+		BEGIN;
+		DROP TABLE parent_inherit CASCADE;
+		COMMIT;
+	},
+	['public', 'parent_inherit', 1],
+	['public', 'child_inherit1', 1],
+	['public', 'child_inherit2', 1]
+);
+
+# Test 13: Nested transaction with SAVEPOINT
+start_new_test('Test 13: SAVEPOINT and ROLLBACK TO');
+$node->safe_psql('postgres', q{
+	CREATE TABLE test_savepoint1 (id int);
+	CREATE TABLE test_savepoint2 (id int);
+	CREATE TABLE test_savepoint3 (id int);
+	BEGIN;
+	DROP TABLE test_savepoint1;
+	SAVEPOINT sp1;
+	DROP TABLE test_savepoint2;
+	ROLLBACK TO sp1;
+	DROP TABLE test_savepoint3;
+	COMMIT;
+});
+
+my $sp1_count = count_drop_logs(drop_table_pattern('public', 'test_savepoint1'));
+my $sp2_count = count_drop_logs(drop_table_pattern('public', 'test_savepoint2'));
+my $sp3_count = count_drop_logs(drop_table_pattern('public', 'test_savepoint3'));
+is($sp1_count, 1, 'Test 13: savepoint1 logged');
+is($sp2_count, 0, 'Test 13: savepoint2 NOT logged (rolled back)');
+is($sp3_count, 1, 'Test 13: savepoint3 logged');
+
+# Test 14: COMMIT AND CHAIN
+start_new_test('Test 14: COMMIT AND CHAIN with multiple cycles');
+$node->safe_psql('postgres', q{
+	CREATE TABLE chain_test1 (id int);
+	CREATE TABLE chain_test2 (id int);
+	CREATE TABLE chain_test3 (id int);
+	CREATE TABLE chain_test4 (id int);
+	BEGIN;
+	DROP TABLE chain_test1;
+	INSERT INTO chain_test2 VALUES (1);
+	DROP TABLE chain_test2;
+	COMMIT AND CHAIN;
+	DROP TABLE chain_test3;
+	COMMIT AND CHAIN;
+	DROP TABLE chain_test4;
+	COMMIT;
+});
+
+my @chain_logs = get_log_lines(qr/DROP TABLE: relation "public\.chain_test[1-4]/);
+is(scalar @chain_logs, 4, 'Test 14: Four COMMIT AND CHAIN drops logged');
+
+# Test 15: ROLLBACK AND CHAIN
+start_new_test('Test 15: ROLLBACK AND CHAIN');
+$node->safe_psql('postgres', q{
+	CREATE TABLE rollback_chain1 (id int);
+	CREATE TABLE rollback_chain2 (id int);
+	CREATE TABLE rollback_chain3 (id int);
+	BEGIN;
+	DROP TABLE rollback_chain1;
+	ROLLBACK AND CHAIN;
+	DROP TABLE rollback_chain2;
+	COMMIT AND CHAIN;
+	DROP TABLE rollback_chain3;
+	COMMIT;
+});
+
+my $rb_chain1 = count_drop_logs(drop_table_pattern('public', 'rollback_chain1'));
+my $rb_chain2 = count_drop_logs(drop_table_pattern('public', 'rollback_chain2'));
+my $rb_chain3 = count_drop_logs(drop_table_pattern('public', 'rollback_chain3'));
+is($rb_chain1, 0, 'Test 15: rollback_chain1 NOT logged (rolled back)');
+is($rb_chain2, 1, 'Test 15: rollback_chain2 logged');
+is($rb_chain3, 1, 'Test 15: rollback_chain3 logged');
+
+# Test 16: COMMIT AND CHAIN with SAVEPOINTs
+start_new_test('Test 16: COMMIT AND CHAIN combined with SAVEPOINTs');
+$node->safe_psql('postgres', q{
+	CREATE TABLE chain_sp1 (id int);
+	CREATE TABLE chain_sp2 (id int);
+	CREATE TABLE chain_sp3 (id int);
+	CREATE TABLE chain_sp4 (id int);
+	BEGIN;
+	DROP TABLE chain_sp1;
+	SAVEPOINT sp1;
+	DROP TABLE chain_sp2;
+	ROLLBACK TO sp1;
+	DROP TABLE chain_sp3;
+	COMMIT AND CHAIN;
+	DROP TABLE chain_sp4;
+	COMMIT;
+});
+
+my $csp1 = count_drop_logs(drop_table_pattern('public', 'chain_sp1'));
+my $csp2 = count_drop_logs(drop_table_pattern('public', 'chain_sp2'));
+my $csp3 = count_drop_logs(drop_table_pattern('public', 'chain_sp3'));
+my $csp4 = count_drop_logs(drop_table_pattern('public', 'chain_sp4'));
+is($csp1, 1, 'Test 16: chain_sp1 logged');
+is($csp2, 0, 'Test 16: chain_sp2 NOT logged (rolled back)');
+is($csp3, 1, 'Test 16: chain_sp3 logged');
+is($csp4, 1, 'Test 16: chain_sp4 logged');
+
+# Test 17: Multiple COMMIT AND CHAIN cycles
+start_new_test('Test 17: Five consecutive COMMIT AND CHAIN operations');
+$node->safe_psql('postgres', q{
+	CREATE TABLE cycle1 (id int);
+	CREATE TABLE cycle2 (id int);
+	CREATE TABLE cycle3 (id int);
+	CREATE TABLE cycle4 (id int);
+	CREATE TABLE cycle5 (id int);
+	BEGIN;
+	DROP TABLE cycle1;
+	COMMIT AND CHAIN;
+	DROP TABLE cycle2;
+	COMMIT AND CHAIN;
+	DROP TABLE cycle3;
+	COMMIT AND CHAIN;
+	DROP TABLE cycle4;
+	COMMIT AND CHAIN;
+	DROP TABLE cycle5;
+	COMMIT;
+});
+
+my @cycle_logs = get_log_lines(qr/DROP TABLE: relation "public\.cycle\d"/);
+is(scalar @cycle_logs, 5, 'Test 17: Five cycle drops logged');
+
+# Test 18: DROP DATABASE
+test_drop_logged(
+	'Test 18: DROP DATABASE',
+	q{
+		CREATE DATABASE test_drop_db;
+		DROP DATABASE test_drop_db;
+	},
+	drop_database_pattern('test_drop_db'),
+	sub {
+		my ($line) = @_;
+		like($line, qr/LSN: [0-9A-F]+\/[0-9A-F]+/,
+		     'Test 18: DROP DATABASE has single LSN');
+	}
+);
+
+# Test 19: PL/pgSQL function with EXCEPTION block (subtransaction)
+test_drop_logged(
+	'Test 19: DROP in PL/pgSQL with EXCEPTION handler',
+	q{
+		CREATE TABLE plpgsql_test (id int);
+
+		CREATE FUNCTION test_drop_with_exception() RETURNS void AS $$
+		BEGIN
+		    DROP TABLE plpgsql_test;
+		EXCEPTION
+		    WHEN OTHERS THEN
+		        RAISE NOTICE 'Exception caught';
+		END;
+		$$ LANGUAGE plpgsql;
+
+		SELECT test_drop_with_exception();
+	},
+	drop_table_pattern('public', 'plpgsql_test')
+);
+
+done_testing();
-- 
2.51.1

