Hello!

Added a TAP test for this case.

On 30.08.2022 10:09, Anton A. Melnikov wrote:
Hello!

The patch was rebased on current master.
And here is a simplified crash reproduction:
1) On primary with 'wal_level = logical' execute:
  CREATE TABLE public.test (id int NOT NULL, val integer);
  CREATE PUBLICATION test_pub FOR TABLE test;

2) On replica replace XXXX in the repcmd.sql  attached with primary port and 
execute it:
psql -f repcmd.sql

3) On master execute command:
INSERT INTO test VALUES ('1');

With best regards,

--
Anton A. Melnikov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
commit d539e1e36ef7af33e1a89eaee00183739c716797
Author: Anton A. Melnikov <a.melni...@postgrespro.ru>
Date:   Sun Aug 21 18:27:44 2022 +0300

    Fix logical replica crash if there was an error in a function.

diff --git a/src/backend/catalog/pg_proc.c b/src/backend/catalog/pg_proc.c
index a9fe45e347..1381fae575 100644
--- a/src/backend/catalog/pg_proc.c
+++ b/src/backend/catalog/pg_proc.c
@@ -1007,8 +1007,9 @@ sql_function_parse_error_callback(void *arg)
  * anonymous-block handler, not only for SQL-language functions.
  * It is assumed that the syntax error position is initially relative to the
  * function body string (as passed in).  If possible, we adjust the position
- * to reference the original command text; if we can't manage that, we set
- * up an "internal query" syntax error instead.
+ * to reference the original command text; if we can't manage that or
+ * can't get the original command text when ActivePortal is not defined,
+ * we set up an "internal query" syntax error instead.
  *
  * Returns true if a syntax error was processed, false if not.
  */
@@ -1016,7 +1017,7 @@ bool
 function_parse_error_transpose(const char *prosrc)
 {
 	int			origerrposition;
-	int			newerrposition;
+	int			newerrposition = 0;
 	const char *queryText;
 
 	/*
@@ -1034,12 +1035,15 @@ function_parse_error_transpose(const char *prosrc)
 			return false;
 	}
 
-	/* We can get the original query text from the active portal (hack...) */
-	Assert(ActivePortal && ActivePortal->status == PORTAL_ACTIVE);
-	queryText = ActivePortal->sourceText;
+	if (ActivePortal)
+	{
+		/* We can get the original query text from the active portal (hack...) */
+		Assert(ActivePortal->status == PORTAL_ACTIVE);
+		queryText = ActivePortal->sourceText;
 
-	/* Try to locate the prosrc in the original text */
-	newerrposition = match_prosrc_to_query(prosrc, queryText, origerrposition);
+		/* Try to locate the prosrc in the original text */
+		newerrposition = match_prosrc_to_query(prosrc, queryText, origerrposition);
+	}
 
 	if (newerrposition > 0)
 	{
@@ -1052,7 +1056,8 @@ function_parse_error_transpose(const char *prosrc)
 	else
 	{
 		/*
-		 * If unsuccessful, convert the position to an internal position
+		 * If unsuccessful or ActivePortal not defined and original command
+		 * text is unreachable, convert the position to an internal position
 		 * marker and give the function text as the internal query.
 		 */
 		errposition(0);
diff --git a/src/test/recovery/t/034_logical_replica_on_error.pl b/src/test/recovery/t/034_logical_replica_on_error.pl
new file mode 100644
index 0000000000..380ad74590
--- /dev/null
+++ b/src/test/recovery/t/034_logical_replica_on_error.pl
@@ -0,0 +1,105 @@
+# Copyright (c) 2022, Postgres Professional
+
+# There was an assertion if function text contains an error. See PGPRO-7025
+# Тhis file has a prefix (120_) to prevent prefix collision with
+# upstream test files.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 2;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->append_conf(
+	'postgresql.conf', qq(
+wal_level = logical
+));
+$node_primary->start;
+
+$node_primary->safe_psql('postgres',
+	'CREATE TABLE public.test (id int NOT NULL, val integer);');
+
+$node_primary->safe_psql('postgres',
+	'CREATE PUBLICATION test_pub FOR TABLE test;');
+
+# Initialize logical replica node
+my $node_replica = PostgreSQL::Test::Cluster->new('replica');
+$node_replica->init(has_streaming => 1,
+	has_restoring => 1);
+$node_replica->start;
+
+$node_replica->safe_psql('postgres',
+	'CREATE TABLE test (id int NOT NULL, val integer);');
+
+$node_replica->safe_psql('postgres', q{
+	create or replace procedure rebuild_test(
+	) as
+	$body$
+	declare
+		l_code  text:= E'create or replace function public.test_selector(
+	) returns setof public.test as
+	\$body\$
+		select * from  error_name
+	\$body\$ language sql;';
+	begin
+		execute l_code;
+		perform public.test_selector();
+	end
+	$body$ language plpgsql;
+});
+
+$node_replica->safe_psql('postgres', '
+	create or replace function test_trg()
+	returns trigger as
+	$body$
+		declare
+		begin
+			call public.rebuild_test();
+			return NULL;
+		end
+	$body$ language plpgsql;
+');
+
+$node_replica->safe_psql('postgres',
+	'create trigger test_trigger after insert or update or delete on test for each row execute function test_trg();');
+
+$node_replica->safe_psql('postgres',
+	'alter table test enable replica trigger test_trigger;');
+
+my $primary_port   = $node_primary->port;
+my $conn_params = "port=" . $primary_port . " dbname=postgres";
+
+my ($stdin, $stdout, $stderr) = $node_replica->psql('postgres',
+	qq[CREATE SUBSCRIPTION test_sub CONNECTION \'$conn_params\' PUBLICATION test_pub;]
+);
+
+ok($stderr =~ /NOTICE:  created replication slot \"test_sub\" on publisher/,
+	"Logical decoding correctly fails on function error");
+
+my $worker_pid =
+	  $node_replica->safe_psql('postgres', q{
+		select pid from pg_stat_subscription where subname = 'test_sub' limit 1;
+	});
+
+$node_primary->safe_psql('postgres', q{
+	INSERT INTO test VALUES ('1');
+	});
+
+$node_replica->poll_query_until('postgres', qq[
+		select $worker_pid != any(array_agg(pid)) as ready
+			from pg_stat_subscription where subname = \'test_sub\';
+	])
+  or die "Timed out while waiting for new WAL with error from primary";
+
+$node_replica->stop;
+$node_primary->stop;
+
+my $log = $node_replica->logfile();
+
+my $log_contents = slurp_file($log);
+
+ok($log_contents =~ /ERROR:  relation \"error_name\" does not exist at character/,
+	"Logical decoding correctly fails on function error");

Attachment: repcmd.sql
Description: application/sql

Reply via email to