Agreed. I wonder if I should simulate local Xactions by using local dblink calls? What do you think, Joe?
It is an interesting thought. Withing a single plpgsql function, open one local and one remote persistent, named dblink connection. Start a transaction in each. Go into your loop. Here's the problem -- I don't know how you can programmatically detect an error. Try playing with dblink_exec for this. If you can detect an error condition, you can then ABORT both transactions.
So, is it actually possible to use BEGIN; .. COMMIT; statement with dblink?
Sure. Use a named persistent connection. Then issue a BEGIN just like any other remote SQL statement (might be best to use dblink_exec with this also).
Even if I start the remote Xaction before the local one starts, there is no way for me to catch an exception thrown by the local Xaction. I don't think Pl/PgSQL supports exceptions. So, if the local Xaction throws an exception then the whole process terminates.
Ideas?
[runs off to try a few things...]
I played with this a bit, and found that with some minor changes to dblink_exec(), I can get the behavior we want, I think.
=============================================================== Here's the SQL: ===============================================================
\c remote drop table foo; create table foo(f1 int primary key, f2 text); insert into foo values (1,'a'); insert into foo values (2,'b'); insert into foo values (3,'b');
\c local drop table foo; create table foo(f1 int primary key, f2 text); --note this is missing on remote side create unique index uindx1 on foo(f2);
create or replace function test() returns text as '
declare
res text;
tup record;
sql text;
begin
-- leaving out result checking for clarity
select into res dblink_connect(''localconn'',''dbname=local'');
select into res dblink_connect(''remoteconn'',''dbname=remote'');
select into res dblink_exec(''localconn'',''BEGIN'');
select into res dblink_exec(''remoteconn'',''BEGIN'');for tup in select * from dblink(''remoteconn'',''select * from foo'')
as t(f1 int, f2 text) loop
sql := ''insert into foo values ('' || tup.f1::text || '','''''' || tup.f2 || '''''')'';
select into res dblink_exec(''localconn'',sql);
if res = ''ERROR'' then
select into res dblink_exec(''localconn'',''ABORT'');
select into res dblink_exec(''remoteconn'',''ABORT'');
select into res dblink_disconnect(''localconn'');
select into res dblink_disconnect(''remoteconn'');
return ''ERROR'';
else
sql := ''delete from foo where f1 = '' || tup.f1::text;
select into res dblink_exec(''remoteconn'',sql);
end if;
end loop;
select into res dblink_exec(''localconn'',''COMMIT'');
select into res dblink_exec(''remoteconn'',''COMMIT'');
select into res dblink_disconnect(''localconn'');
select into res dblink_disconnect(''remoteconn'');
return ''OK'';
end;
' language plpgsql;
=============================================================== Here's the test: =============================================================== local=# select test(); NOTICE: sql error DETAIL: ERROR: duplicate key violates unique constraint "uindx1"
CONTEXT: PL/pgSQL function "test" line 15 at select into variables test ------- ERROR (1 row)
local=# select * from foo; f1 | f2 ----+---- (0 rows)
local=# select * from dblink('dbname=remote','select * from foo') as t(f1 int, f2 text);
f1 | f2
----+----
1 | a
2 | b
3 | b
(3 rows)
local=# drop index uindx1; DROP INDEX local=# select test(); test ------ OK (1 row)
local=# select * from foo; f1 | f2 ----+---- 1 | a 2 | b 3 | b (3 rows)
local=# select * from dblink('dbname=remote','select * from foo') as t(f1 int, f2 text);
f1 | f2
----+----
(0 rows)
===============================================================
Patch attached. Thoughts?
Joe
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/dblink.c,v
retrieving revision 1.29
diff -c -r1.29 dblink.c
*** contrib/dblink/dblink.c 28 Nov 2003 05:03:01 -0000 1.29
--- contrib/dblink/dblink.c 5 Feb 2004 19:49:00 -0000
***************
*** 135,140 ****
--- 135,150 ----
errmsg("%s", p2), \
errdetail("%s", msg))); \
} while (0)
+ #define DBLINK_RES_ERROR_AS_NOTICE(p2) \
+ do { \
+ msg = pstrdup(PQerrorMessage(conn)); \
+ if (res) \
+ PQclear(res); \
+ ereport(NOTICE, \
+ (errcode(ERRCODE_SYNTAX_ERROR), \
+ errmsg("%s", p2), \
+ errdetail("%s", msg))); \
+ } while (0)
#define DBLINK_CONN_NOT_AVAIL \
do { \
if(conname) \
***************
*** 731,739 ****
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
! DBLINK_RES_ERROR("sql error");
! if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/* need a tuple descriptor representing one TEXT column */
tupdesc = CreateTemplateTupleDesc(1, false);
--- 741,762 ----
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
! {
! DBLINK_RES_ERROR_AS_NOTICE("sql error");
!
! /* need a tuple descriptor representing one TEXT column */
! tupdesc = CreateTemplateTupleDesc(1, false);
! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
! TEXTOID, -1, 0, false);
! /*
! * and save a copy of the command status string to return as our
! * result tuple
! */
! sql_cmd_status = GET_TEXT("ERROR");
!
! }
! else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/* need a tuple descriptor representing one TEXT column */
tupdesc = CreateTemplateTupleDesc(1, false);
---------------------------(end of broadcast)--------------------------- TIP 5: Have you checked our extensive FAQ?
http://www.postgresql.org/docs/faqs/FAQ.html
