From 0e33687fdea32f26a0b809935e5155a0925dbdec Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Fri, 17 Jan 2025 15:20:52 +0100
Subject: Accept recovery conflict interrupt on blocked writing

Previously, all interrupts except process dying were ignored while a
process was blocked writing to a socket. If the connection to the client
was broken (no clean FIN nor RST), a process sending results to the
client could be stuck for 924s until the TCP retransmission timeout is
reached. During this time, it was possible for the process to conflict
with recovery: For example, the process returning results can have a
conflicting buffer pin.

To avoid blocking recovery for an extended period of time, this patch
changes client write interrupts by handling recovery conflict interrupts
instead of ignoring them. Since the interrupt happens while we're likely
to have partially written results on the socket, there's no easy way to
keep protocol sync so the session needs to be terminated.
---
 src/backend/tcop/postgres.c                  |  17 +++
 src/test/recovery/t/031_recovery_conflict.pl | 110 +++++++++++++++----
 2 files changed, 103 insertions(+), 24 deletions(-)

diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 5655348a2e2..b9373f9dd08 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -184,6 +184,7 @@ static void drop_unnamed_stmt(void);
 static void log_disconnections(int code, Datum arg);
 static void enable_statement_timeout(void);
 static void disable_statement_timeout(void);
+static void ProcessRecoveryConflictInterrupts(void);
 
 
 /* ----------------------------------------------------------------
@@ -581,6 +582,22 @@ ProcessClientWriteInterrupt(bool blocked)
 		else
 			SetLatch(MyLatch);
 	}
+	else if (blocked && RecoveryConflictPending)
+	{
+		/*
+		 * We're conflicting with recovery while being blocked writing. This
+		 * can happen when the process is returning results and no ACK is
+		 * received (broken connection, client overloaded...), eventually
+		 * saturating the socket buffer while the process holds a page pin
+		 * that eventually conflict with recovery.
+		 */
+		if (InterruptHoldoffCount == 0 && CritSectionCount == 0)
+		{
+			if (whereToSendOutput == DestRemote)
+				whereToSendOutput = DestNone;
+			ProcessRecoveryConflictInterrupts();
+		}
+	}
 
 	errno = save_errno;
 }
diff --git a/src/test/recovery/t/031_recovery_conflict.pl b/src/test/recovery/t/031_recovery_conflict.pl
index 028b0b5f0e1..a30c942d35d 100644
--- a/src/test/recovery/t/031_recovery_conflict.pl
+++ b/src/test/recovery/t/031_recovery_conflict.pl
@@ -7,6 +7,7 @@
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::PgProto;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
@@ -74,25 +75,10 @@ my $expected_conflicts = 0;
 
 ## RECOVERY CONFLICT 1: Buffer pin conflict
 my $sect = "buffer pin conflict";
-$expected_conflicts++;
-
-# Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old
-# enough so that there's not a snapshot conflict before the buffer pin
-# conflict.
-
-$node_primary->safe_psql(
-	$test_db,
-	qq[
-	BEGIN;
-	INSERT INTO $table1 VALUES (1,0);
-	ROLLBACK;
-	-- ensure flush, rollback doesn't do so
-	BEGIN; LOCK $table1; COMMIT;
-	]);
-
-$node_primary->wait_for_replay_catchup($node_standby);
-
 my $cursor1 = "test_recovery_conflict_cursor";
+$expected_conflicts++;
+
+setup_bufferpin_conflict();
 
 # DECLARE and use a cursor on standby, causing buffer with the only block of
 # the relation to be pinned on the standby
@@ -120,8 +106,56 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User was holding shared buffer pin for too long");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("bufferpin");
+check_conflict_stat("bufferpin", 1);
 
+SKIP:
+{
+	skip "this test requires working raw_connect()"
+		unless $node_standby->raw_connect_works();
+
+	## RECOVERY CONFLICT 1 bis: Buffer pin conflict with conflicting query in ClientWrite
+	$sect = "buffer pin conflict (ClientWrite)";
+	$expected_conflicts++;
+
+	setup_bufferpin_conflict();
+
+	# The simplest way to get the user to use for the startup packet
+	# is to grab it from an existing psql session
+	my $user = $psql_standby->query_safe("SELECT current_user");
+
+	# Start the conflicting session
+	my $sock = $node_standby->raw_connect();
+	my $pgproto = PostgreSQL::Test::PgProto->new($sock);
+	my %parameters = ( user => $user, database => $test_db, application_name => $ENV{PGAPPNAME} );
+
+	$pgproto->send_startup_message(\%parameters);
+	# Read until Ready For Query message
+	$pgproto->read_until_message('Z');
+	my $pid = $pgproto->read_session_pid();
+
+	# We want the session to pin table1's block while staying in a ClientWrite
+	# state. To achieve that, we ask the server enough rows to saturate the
+	# buffer with the client not read those results.
+	$pgproto->send_simple_query(qq[
+			BEGIN;
+			DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
+			FETCH FORWARD FROM $cursor1;
+			SELECT generate_series(1, 100000);
+		]);
+
+	# Check that our session is in ClientWrite
+	check_session_wait_event($pid, 'ClientWrite');
+
+	# VACUUM FREEZE on the primary
+	$node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]);
+
+	check_conflict_log("User was holding shared buffer pin for too long");
+
+	# The conflicting session should be terminated, consume everything until the socket is closed.
+	$pgproto->wait_until_closed();
+
+	check_conflict_stat("bufferpin", 2);
+}
 
 ## RECOVERY CONFLICT 2: Snapshot conflict
 $sect = "snapshot conflict";
@@ -153,7 +187,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_conflict_log(
 	"User query might have needed to see row versions that must be removed");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("snapshot");
+check_conflict_stat("snapshot", 1);
 
 
 ## RECOVERY CONFLICT 3: Lock conflict
@@ -176,7 +210,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User was holding a relation lock for too long");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("lock");
+check_conflict_stat("lock", 1);
 
 
 ## RECOVERY CONFLICT 4: Tablespace conflict
@@ -206,7 +240,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_conflict_log(
 	"User was or might have been using tablespace that must be dropped");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("tablespace");
+check_conflict_stat("tablespace", 1);
 
 
 ## RECOVERY CONFLICT 5: Deadlock
@@ -271,7 +305,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User transaction caused buffer deadlock with recovery.");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("deadlock");
+check_conflict_stat("deadlock", 1);
 
 # clean up for next tests
 $node_primary->safe_psql($test_db, qq[ROLLBACK PREPARED 'lock';]);
@@ -310,6 +344,22 @@ $node_primary->stop();
 
 done_testing();
 
+sub setup_bufferpin_conflict
+{
+	# Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old
+	# enough so that there's not a snapshot conflict before the buffer pin
+	# conflict.
+	$node_primary->safe_psql($test_db,
+		qq[BEGIN;
+			INSERT INTO $table1 VALUES (1,0);
+			ROLLBACK;
+			-- ensure flush, rollback doesn't do so
+			BEGIN; LOCK $table1; COMMIT;]
+	);
+
+	$node_primary->wait_for_replay_catchup($node_standby);
+}
+
 sub check_conflict_log
 {
 	my $message = shift;
@@ -325,9 +375,21 @@ sub check_conflict_log
 sub check_conflict_stat
 {
 	my $conflict_type = shift;
+	my $expected_count = shift;
 	my $count = $node_standby->safe_psql($test_db,
 		qq[SELECT confl_$conflict_type FROM pg_stat_database_conflicts WHERE datname='$test_db';]
 	);
 
-	is($count, 1, "$sect: stats show conflict on standby");
+	is($count, $expected_count, "$sect: stats show $count conflicts on standby (expected $expected_count)");
+}
+
+sub check_session_wait_event
+{
+	my $pid = shift;
+	my $expected_wait_event = shift;
+	my $wait_event = $node_standby->safe_psql($test_db,
+		qq[SELECT wait_event FROM pg_stat_activity WHERE pid=$pid;]
+	);
+
+	is($wait_event, $expected_wait_event, "$sect: session with pid $pid has $wait_event wait_event");
 }
-- 
2.39.5 (Apple Git-154)

