Oleg Lebedev wrote:
Agreed. I wonder if I should simulate local Xactions by using local
dblink calls?
What do you think, Joe?

It is an interesting thought. Withing a single plpgsql function, open one local and one remote persistent, named dblink connection. Start a transaction in each. Go into your loop. Here's the problem -- I don't know how you can programmatically detect an error. Try playing with dblink_exec for this. If you can detect an error condition, you can then ABORT both transactions.


So, is it actually possible to use BEGIN; .. COMMIT; statement with
dblink?

Sure. Use a named persistent connection. Then issue a BEGIN just like any other remote SQL statement (might be best to use dblink_exec with this also).


Even if I start the remote Xaction before the local one starts, there is
no way for me to catch an exception thrown by the local Xaction. I don't
think Pl/PgSQL supports exceptions. So, if the local Xaction throws an
exception then the whole process terminates.

Ideas?

[runs off to try a few things...]


I played with this a bit, and found that with some minor changes to dblink_exec(), I can get the behavior we want, I think.

===============================================================
Here's the SQL:
===============================================================

\c remote
drop table foo;
create table foo(f1 int primary key, f2 text);
insert into foo values (1,'a');
insert into foo values (2,'b');
insert into foo values (3,'b');

\c local
drop table foo;
create table foo(f1 int primary key, f2 text);
--note this is missing on remote side
create unique index uindx1 on foo(f2);

create or replace function test() returns text as '
declare
 res text;
 tup record;
 sql text;
begin
 -- leaving out result checking for clarity
 select into res dblink_connect(''localconn'',''dbname=local'');
 select into res dblink_connect(''remoteconn'',''dbname=remote'');
 select into res dblink_exec(''localconn'',''BEGIN'');
 select into res dblink_exec(''remoteconn'',''BEGIN'');

for tup in select * from dblink(''remoteconn'',''select * from foo'')
as t(f1 int, f2 text) loop
sql := ''insert into foo values ('' || tup.f1::text || '','''''' || tup.f2 || '''''')'';
select into res dblink_exec(''localconn'',sql);
if res = ''ERROR'' then
select into res dblink_exec(''localconn'',''ABORT'');
select into res dblink_exec(''remoteconn'',''ABORT'');
select into res dblink_disconnect(''localconn'');
select into res dblink_disconnect(''remoteconn'');
return ''ERROR'';
else
sql := ''delete from foo where f1 = '' || tup.f1::text;
select into res dblink_exec(''remoteconn'',sql);
end if;
end loop;
select into res dblink_exec(''localconn'',''COMMIT'');
select into res dblink_exec(''remoteconn'',''COMMIT'');
select into res dblink_disconnect(''localconn'');
select into res dblink_disconnect(''remoteconn'');
return ''OK'';
end;
' language plpgsql;



=============================================================== Here's the test: =============================================================== local=# select test(); NOTICE: sql error DETAIL: ERROR: duplicate key violates unique constraint "uindx1"

CONTEXT:  PL/pgSQL function "test" line 15 at select into variables
 test
-------
 ERROR
(1 row)

local=# select * from foo;
 f1 | f2
----+----
(0 rows)

local=# select * from dblink('dbname=remote','select * from foo') as t(f1 int, f2 text);
f1 | f2
----+----
1 | a
2 | b
3 | b
(3 rows)


local=# drop index uindx1;
DROP INDEX
local=# select test();
 test
------
 OK
(1 row)

local=# select * from foo;
 f1 | f2
----+----
  1 | a
  2 | b
  3 | b
(3 rows)

local=# select * from dblink('dbname=remote','select * from foo') as t(f1 int, f2 text);
f1 | f2
----+----
(0 rows)


===============================================================

Patch attached. Thoughts?

Joe




Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/dblink.c,v
retrieving revision 1.29
diff -c -r1.29 dblink.c
*** contrib/dblink/dblink.c     28 Nov 2003 05:03:01 -0000      1.29
--- contrib/dblink/dblink.c     5 Feb 2004 19:49:00 -0000
***************
*** 135,140 ****
--- 135,150 ----
                                         errmsg("%s", p2), \
                                         errdetail("%s", msg))); \
        } while (0)
+ #define DBLINK_RES_ERROR_AS_NOTICE(p2) \
+       do { \
+                       msg = pstrdup(PQerrorMessage(conn)); \
+                       if (res) \
+                               PQclear(res); \
+                       ereport(NOTICE, \
+                                       (errcode(ERRCODE_SYNTAX_ERROR), \
+                                        errmsg("%s", p2), \
+                                        errdetail("%s", msg))); \
+       } while (0)
  #define DBLINK_CONN_NOT_AVAIL \
        do { \
                if(conname) \
***************
*** 731,739 ****
        if (!res ||
                (PQresultStatus(res) != PGRES_COMMAND_OK &&
                 PQresultStatus(res) != PGRES_TUPLES_OK))
!               DBLINK_RES_ERROR("sql error");
  
!       if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
                /* need a tuple descriptor representing one TEXT column */
                tupdesc = CreateTemplateTupleDesc(1, false);
--- 741,762 ----
        if (!res ||
                (PQresultStatus(res) != PGRES_COMMAND_OK &&
                 PQresultStatus(res) != PGRES_TUPLES_OK))
!       {
!               DBLINK_RES_ERROR_AS_NOTICE("sql error");
! 
!               /* need a tuple descriptor representing one TEXT column */
!               tupdesc = CreateTemplateTupleDesc(1, false);
!               TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                                  TEXTOID, -1, 0, false);
  
!               /*
!                * and save a copy of the command status string to return as our
!                * result tuple
!                */
!               sql_cmd_status = GET_TEXT("ERROR");
! 
!       }
!       else if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
                /* need a tuple descriptor representing one TEXT column */
                tupdesc = CreateTemplateTupleDesc(1, false);
---------------------------(end of broadcast)---------------------------
TIP 5: Have you checked our extensive FAQ?

               http://www.postgresql.org/docs/faqs/FAQ.html

Reply via email to