Bruce Momjian wrote:
> No problem -- thanks. I have slimmed down the patch by applying the
> cosmetic parts to CVS. Use the URL above to get the newest versions of
> the dblink.c and regression changes.
Here is my counter-proposal to Bruce's dblink patch. Any comments?
Is it too late to apply this for 8.1? I tend to agree with calling this
a bugfix.
Thanks,
Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.47
diff -c -r1.47 dblink.c
*** dblink.c 15 Oct 2005 02:49:04 -0000 1.47
--- dblink.c 16 Oct 2005 02:04:13 -0000
***************
*** 60,68 ****
typedef struct remoteConn
{
! PGconn *conn; /* Hold the remote connection */
! int autoXactCursors;/* Indicates the number of open cursors,
! * non-zero means we opened the xact ourselves */
} remoteConn;
/*
--- 60,68 ----
typedef struct remoteConn
{
! PGconn *conn; /* Hold the remote connection */
! int openCursorCount; /* The number of open cursors */
! bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
/*
***************
*** 84,93 ****
static char *generate_relation_name(Oid relid);
/* Global */
! List *res_id = NIL;
! int res_id_index = 0;
! PGconn *persistent_conn = NULL;
! static HTAB *remoteConnHash = NULL;
/*
* Following is list that holds multiple remote connections.
--- 84,91 ----
static char *generate_relation_name(Oid relid);
/* Global */
! static remoteConn *pconn = NULL;
! static HTAB *remoteConnHash = NULL;
/*
* Following is list that holds multiple remote connections.
***************
*** 184,189 ****
--- 182,197 ----
} \
} while (0)
+ #define DBLINK_INIT \
+ do { \
+ if (!pconn) \
+ { \
+ pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+ pconn->conn = NULL; \
+ pconn->openCursorCount = 0; \
+ pconn->newXactForCursor = FALSE; \
+ } \
+ } while (0)
/*
* Create a persistent connection to another database
***************
*** 199,204 ****
--- 207,214 ----
PGconn *conn = NULL;
remoteConn *rconn = NULL;
+ DBLINK_INIT;
+
if (PG_NARGS() == 2)
{
connstr = GET_STR(PG_GETARG_TEXT_P(1));
***************
*** 234,240 ****
createNewConnection(connname, rconn);
}
else
! persistent_conn = conn;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
--- 244,250 ----
createNewConnection(connname, rconn);
}
else
! pconn->conn = conn;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
***************
*** 250,255 ****
--- 260,267 ----
remoteConn *rconn = NULL;
PGconn *conn = NULL;
+ DBLINK_INIT;
+
if (PG_NARGS() == 1)
{
conname = GET_STR(PG_GETARG_TEXT_P(0));
***************
*** 258,264 ****
conn = rconn->conn;
}
else
! conn = persistent_conn;
if (!conn)
DBLINK_CONN_NOT_AVAIL;
--- 270,276 ----
conn = rconn->conn;
}
else
! conn = pconn->conn;
if (!conn)
DBLINK_CONN_NOT_AVAIL;
***************
*** 270,276 ****
pfree(rconn);
}
else
! persistent_conn = NULL;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
--- 282,288 ----
pfree(rconn);
}
else
! pconn->conn = NULL;
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
***************
*** 285,290 ****
--- 297,304 ----
char *msg;
PGresult *res = NULL;
PGconn *conn = NULL;
+ int *openCursorCount = NULL;
+ bool *newXactForCursor = NULL;
char *curname = NULL;
char *sql = NULL;
char *conname = NULL;
***************
*** 292,303 ****
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
if (PG_NARGS() == 2)
{
/* text,text */
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
! conn = persistent_conn;
}
else if (PG_NARGS() == 3)
{
--- 306,321 ----
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 2)
{
/* text,text */
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
! conn = pconn->conn;
! openCursorCount = &pconn->openCursorCount;
! newXactForCursor = &pconn->newXactForCursor;
}
else if (PG_NARGS() == 3)
{
***************
*** 307,313 ****
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2);
! conn = persistent_conn;
}
else
{
--- 325,333 ----
curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2);
! conn = pconn->conn;
! openCursorCount = &pconn->openCursorCount;
! newXactForCursor = &pconn->newXactForCursor;
}
else
{
***************
*** 316,322 ****
--- 336,346 ----
sql = GET_STR(PG_GETARG_TEXT_P(2));
rconn = getConnectionByName(conname);
if (rconn)
+ {
conn = rconn->conn;
+ openCursorCount = &rconn->openCursorCount;
+ newXactForCursor = &rconn->newXactForCursor;
+ }
}
}
else if (PG_NARGS() == 4)
***************
*** 328,344 ****
fail = PG_GETARG_BOOL(3);
rconn = getConnectionByName(conname);
if (rconn)
conn = rconn->conn;
}
if (!conn)
DBLINK_CONN_NOT_AVAIL;
! res = PQexec(conn, "BEGIN");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! DBLINK_RES_INTERNALERROR("begin error");
! PQclear(res);
appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
res = PQexec(conn, str->data);
--- 352,380 ----
fail = PG_GETARG_BOOL(3);
rconn = getConnectionByName(conname);
if (rconn)
+ {
conn = rconn->conn;
+ openCursorCount = &rconn->openCursorCount;
+ newXactForCursor = &rconn->newXactForCursor;
+ }
}
if (!conn)
DBLINK_CONN_NOT_AVAIL;
! /* If we are not in a transaction, start one */
! if (PQtransactionStatus(conn) == PQTRANS_IDLE)
! {
! res = PQexec(conn, "BEGIN");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! DBLINK_RES_INTERNALERROR("begin error");
! PQclear(res);
! *newXactForCursor = TRUE;
! }
! /* if we started a transaction, increment cursor count */
! if (*newXactForCursor)
! (*openCursorCount)++;
appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
res = PQexec(conn, str->data);
***************
*** 365,370 ****
--- 401,408 ----
dblink_close(PG_FUNCTION_ARGS)
{
PGconn *conn = NULL;
+ int *openCursorCount = NULL;
+ bool *newXactForCursor = NULL;
PGresult *res = NULL;
char *curname = NULL;
char *conname = NULL;
***************
*** 373,383 ****
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
if (PG_NARGS() == 1)
{
/* text */
curname = GET_STR(PG_GETARG_TEXT_P(0));
! conn = persistent_conn;
}
else if (PG_NARGS() == 2)
{
--- 411,425 ----
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 1)
{
/* text */
curname = GET_STR(PG_GETARG_TEXT_P(0));
! conn = pconn->conn;
! openCursorCount = &pconn->openCursorCount;
! newXactForCursor = &pconn->newXactForCursor;
}
else if (PG_NARGS() == 2)
{
***************
*** 386,392 ****
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
! conn = persistent_conn;
}
else
{
--- 428,436 ----
{
curname = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
! conn = pconn->conn;
! openCursorCount = &pconn->openCursorCount;
! newXactForCursor = &pconn->newXactForCursor;
}
else
{
***************
*** 394,400 ****
--- 438,448 ----
curname = GET_STR(PG_GETARG_TEXT_P(1));
rconn = getConnectionByName(conname);
if (rconn)
+ {
conn = rconn->conn;
+ openCursorCount = &rconn->openCursorCount;
+ newXactForCursor = &rconn->newXactForCursor;
+ }
}
}
if (PG_NARGS() == 3)
***************
*** 405,411 ****
--- 453,463 ----
fail = PG_GETARG_BOOL(2);
rconn = getConnectionByName(conname);
if (rconn)
+ {
conn = rconn->conn;
+ openCursorCount = &rconn->openCursorCount;
+ newXactForCursor = &rconn->newXactForCursor;
+ }
}
if (!conn)
***************
*** 428,439 ****
PQclear(res);
! /* commit the transaction */
! res = PQexec(conn, "COMMIT");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! DBLINK_RES_INTERNALERROR("commit error");
! PQclear(res);
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
--- 480,501 ----
PQclear(res);
! /* if we started a transaction, decrement cursor count */
! if (*newXactForCursor)
! {
! (*openCursorCount)--;
! /* if count is zero, commit the transaction */
! if (*openCursorCount == 0)
! {
! *newXactForCursor = FALSE;
!
! res = PQexec(conn, "COMMIT");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! DBLINK_RES_INTERNALERROR("commit error");
! PQclear(res);
! }
! }
PG_RETURN_TEXT_P(GET_TEXT("OK"));
}
***************
*** 456,461 ****
--- 518,525 ----
char *conname = NULL;
remoteConn *rconn = NULL;
+ DBLINK_INIT;
+
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
***************
*** 485,491 ****
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
fail = PG_GETARG_BOOL(2);
! conn = persistent_conn;
}
else
{
--- 549,555 ----
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
fail = PG_GETARG_BOOL(2);
! conn = pconn->conn;
}
else
{
***************
*** 503,509 ****
/* text,int */
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
! conn = persistent_conn;
}
if (!conn)
--- 567,573 ----
/* text,int */
curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1);
! conn = pconn->conn;
}
if (!conn)
***************
*** 648,653 ****
--- 712,719 ----
MemoryContext oldcontext;
bool freeconn = false;
+ DBLINK_INIT;
+
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
***************
*** 678,684 ****
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
! conn = persistent_conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
--- 744,750 ----
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
! conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
***************
*** 691,697 ****
else if (PG_NARGS() == 1)
{
/* text */
! conn = persistent_conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
--- 757,763 ----
else if (PG_NARGS() == 1)
{
/* text */
! conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
***************
*** 857,862 ****
--- 923,930 ----
bool freeconn = false;
bool fail = true; /* default to backward compatible behavior */
+ DBLINK_INIT;
+
if (PG_NARGS() == 3)
{
/* must be text,text,bool */
***************
*** 869,875 ****
/* might be text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
! conn = persistent_conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
--- 937,943 ----
/* might be text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
! conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
***************
*** 882,888 ****
else if (PG_NARGS() == 1)
{
/* must be single text argument */
! conn = persistent_conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
--- 950,956 ----
else if (PG_NARGS() == 1)
{
/* must be single text argument */
! conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.15
diff -c -r1.15 dblink.out
*** expected/dblink.out 8 Oct 2005 16:10:38 -0000 1.15
--- expected/dblink.out 16 Oct 2005 02:05:28 -0000
***************
*** 436,441 ****
--- 436,523 ----
ROLLBACK
(1 row)
+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+ dblink_exec
+ -------------
+ BEGIN
+ (1 row)
+
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open
+ -------------
+ OK
+ (1 row)
+
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close
+ --------------
+ OK
+ (1 row)
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ dblink_exec
+ ----------------
+ DECLARE CURSOR
+ (1 row)
+
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+ dblink_exec
+ -------------
+ COMMIT
+ (1 row)
+
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ dblink_open
+ -------------
+ OK
+ (1 row)
+
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+ dblink_open
+ -------------
+ OK
+ (1 row)
+
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+ dblink_close
+ --------------
+ OK
+ (1 row)
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ dblink_exec
+ ----------------
+ DECLARE CURSOR
+ (1 row)
+
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+ dblink_close
+ --------------
+ OK
+ (1 row)
+
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ ERROR: sql error
+ DETAIL: ERROR: cursor "xact_test" already exists
+
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+ dblink_exec
+ -------------
+ ROLLBACK
+ (1 row)
+
-- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
dblink_open
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.14
diff -c -r1.14 dblink.sql
*** sql/dblink.sql 8 Oct 2005 16:10:38 -0000 1.14
--- sql/dblink.sql 16 Oct 2005 01:59:11 -0000
***************
*** 217,222 ****
--- 217,258 ----
-- reset remote transaction state
SELECT dblink_exec('myconn','ABORT');
+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+
-- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
---------------------------(end of broadcast)---------------------------
TIP 5: don't forget to increase your free space map settings