Tom Lane wrote:
I think it would be shorter and clearer to write

        remoteConn  *remconn = NULL;
        ...
        remconn = rconn;
        ...
        remconn->newXactForCursor = TRUE;

Also, you might be able to combine this variable with the existing
rconn local variable and thus simplify the code even more.

Thanks for the review Tom -- as usual, great suggestions. The attached (simpler) patch makes use of your advice. If there are no objections, I'll apply this tomorrow evening.

Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.47
diff -c -r1.47 dblink.c
*** dblink.c	15 Oct 2005 02:49:04 -0000	1.47
--- dblink.c	17 Oct 2005 02:11:59 -0000
***************
*** 60,68 ****
  
  typedef struct remoteConn
  {
! 	PGconn	   *conn;			/* Hold the remote connection */
! 	int			autoXactCursors;/* Indicates the number of open cursors,
! 								 * non-zero means we opened the xact ourselves */
  }	remoteConn;
  
  /*
--- 60,68 ----
  
  typedef struct remoteConn
  {
! 	PGconn	   *conn;				/* Hold the remote connection */
! 	int			openCursorCount;	/* The number of open cursors */
! 	bool		newXactForCursor;	/* Opened a transaction for a cursor */
  }	remoteConn;
  
  /*
***************
*** 84,93 ****
  static char *generate_relation_name(Oid relid);
  
  /* Global */
! List	   *res_id = NIL;
! int			res_id_index = 0;
! PGconn	   *persistent_conn = NULL;
! static HTAB *remoteConnHash = NULL;
  
  /*
   *	Following is list that holds multiple remote connections.
--- 84,91 ----
  static char *generate_relation_name(Oid relid);
  
  /* Global */
! static remoteConn	   *pconn = NULL;
! static HTAB			   *remoteConnHash = NULL;
  
  /*
   *	Following is list that holds multiple remote connections.
***************
*** 184,189 ****
--- 182,197 ----
  			} \
  	} while (0)
  
+ #define DBLINK_INIT \
+ 	do { \
+ 			if (!pconn) \
+ 			{ \
+ 				pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+ 				pconn->conn = NULL; \
+ 				pconn->openCursorCount = 0; \
+ 				pconn->newXactForCursor = FALSE; \
+ 			} \
+ 	} while (0)
  
  /*
   * Create a persistent connection to another database
***************
*** 199,204 ****
--- 207,214 ----
  	PGconn	   *conn = NULL;
  	remoteConn *rconn = NULL;
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 2)
  	{
  		connstr = GET_STR(PG_GETARG_TEXT_P(1));
***************
*** 234,240 ****
  		createNewConnection(connname, rconn);
  	}
  	else
! 		persistent_conn = conn;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 244,250 ----
  		createNewConnection(connname, rconn);
  	}
  	else
! 		pconn->conn = conn;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 250,255 ****
--- 260,267 ----
  	remoteConn *rconn = NULL;
  	PGconn	   *conn = NULL;
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 1)
  	{
  		conname = GET_STR(PG_GETARG_TEXT_P(0));
***************
*** 258,264 ****
  			conn = rconn->conn;
  	}
  	else
! 		conn = persistent_conn;
  
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
--- 270,276 ----
  			conn = rconn->conn;
  	}
  	else
! 		conn = pconn->conn;
  
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
***************
*** 270,276 ****
  		pfree(rconn);
  	}
  	else
! 		persistent_conn = NULL;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 282,288 ----
  		pfree(rconn);
  	}
  	else
! 		pconn->conn = NULL;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 292,303 ****
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
  	if (PG_NARGS() == 2)
  	{
  		/* text,text */
  		curname = GET_STR(PG_GETARG_TEXT_P(0));
  		sql = GET_STR(PG_GETARG_TEXT_P(1));
! 		conn = persistent_conn;
  	}
  	else if (PG_NARGS() == 3)
  	{
--- 304,317 ----
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 2)
  	{
  		/* text,text */
  		curname = GET_STR(PG_GETARG_TEXT_P(0));
  		sql = GET_STR(PG_GETARG_TEXT_P(1));
! 		rconn = pconn;
  	}
  	else if (PG_NARGS() == 3)
  	{
***************
*** 307,313 ****
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  			fail = PG_GETARG_BOOL(2);
! 			conn = persistent_conn;
  		}
  		else
  		{
--- 321,327 ----
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  			fail = PG_GETARG_BOOL(2);
! 			rconn = pconn;
  		}
  		else
  		{
***************
*** 315,322 ****
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  			sql = GET_STR(PG_GETARG_TEXT_P(2));
  			rconn = getConnectionByName(conname);
- 			if (rconn)
- 				conn = rconn->conn;
  		}
  	}
  	else if (PG_NARGS() == 4)
--- 329,334 ----
***************
*** 327,344 ****
  		sql = GET_STR(PG_GETARG_TEXT_P(2));
  		fail = PG_GETARG_BOOL(3);
  		rconn = getConnectionByName(conname);
- 		if (rconn)
- 			conn = rconn->conn;
  	}
  
! 	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = PQexec(conn, "BEGIN");
! 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		DBLINK_RES_INTERNALERROR("begin error");
  
! 	PQclear(res);
  
  	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
  	res = PQexec(conn, str->data);
--- 339,364 ----
  		sql = GET_STR(PG_GETARG_TEXT_P(2));
  		fail = PG_GETARG_BOOL(3);
  		rconn = getConnectionByName(conname);
  	}
  
! 	if (!rconn || !rconn->conn)
  		DBLINK_CONN_NOT_AVAIL;
+ 	else
+ 		conn = rconn->conn;
  
! 	/*	If we are not in a transaction, start one */
! 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
! 	{
! 		res = PQexec(conn, "BEGIN");
! 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 			DBLINK_RES_INTERNALERROR("begin error");
! 		PQclear(res);
! 		rconn->newXactForCursor = TRUE;
! 	}
  
! 	/* if we started a transaction, increment cursor count */
! 	if (rconn->newXactForCursor)
! 		(rconn->openCursorCount)++;
  
  	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
  	res = PQexec(conn, str->data);
***************
*** 373,383 ****
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
  	if (PG_NARGS() == 1)
  	{
  		/* text */
  		curname = GET_STR(PG_GETARG_TEXT_P(0));
! 		conn = persistent_conn;
  	}
  	else if (PG_NARGS() == 2)
  	{
--- 393,405 ----
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 1)
  	{
  		/* text */
  		curname = GET_STR(PG_GETARG_TEXT_P(0));
! 		rconn = pconn;
  	}
  	else if (PG_NARGS() == 2)
  	{
***************
*** 386,400 ****
  		{
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
! 			conn = persistent_conn;
  		}
  		else
  		{
  			conname = GET_STR(PG_GETARG_TEXT_P(0));
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  			rconn = getConnectionByName(conname);
- 			if (rconn)
- 				conn = rconn->conn;
  		}
  	}
  	if (PG_NARGS() == 3)
--- 408,420 ----
  		{
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
! 			rconn = pconn;
  		}
  		else
  		{
  			conname = GET_STR(PG_GETARG_TEXT_P(0));
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  			rconn = getConnectionByName(conname);
  		}
  	}
  	if (PG_NARGS() == 3)
***************
*** 404,415 ****
  		curname = GET_STR(PG_GETARG_TEXT_P(1));
  		fail = PG_GETARG_BOOL(2);
  		rconn = getConnectionByName(conname);
- 		if (rconn)
- 			conn = rconn->conn;
  	}
  
! 	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
  	appendStringInfo(str, "CLOSE %s", curname);
  
--- 424,435 ----
  		curname = GET_STR(PG_GETARG_TEXT_P(1));
  		fail = PG_GETARG_BOOL(2);
  		rconn = getConnectionByName(conname);
  	}
  
! 	if (!rconn || !rconn->conn)
  		DBLINK_CONN_NOT_AVAIL;
+ 	else
+ 		conn = rconn->conn;
  
  	appendStringInfo(str, "CLOSE %s", curname);
  
***************
*** 428,439 ****
  
  	PQclear(res);
  
! 	/* commit the transaction */
! 	res = PQexec(conn, "COMMIT");
! 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		DBLINK_RES_INTERNALERROR("commit error");
  
! 	PQclear(res);
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 448,469 ----
  
  	PQclear(res);
  
! 	/* if we started a transaction, decrement cursor count */
! 	if (rconn->newXactForCursor)
! 	{
! 		(rconn->openCursorCount)--;
  
! 		/* if count is zero, commit the transaction */
! 		if (rconn->openCursorCount == 0)
! 		{
! 			rconn->newXactForCursor = FALSE;
! 
! 			res = PQexec(conn, "COMMIT");
! 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 				DBLINK_RES_INTERNALERROR("commit error");
! 			PQclear(res);
! 		}
! 	}
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 456,461 ****
--- 486,493 ----
  	char	   *conname = NULL;
  	remoteConn *rconn = NULL;
  
+ 	DBLINK_INIT;
+ 
  	/* stuff done only on the first call of the function */
  	if (SRF_IS_FIRSTCALL())
  	{
***************
*** 485,491 ****
  				curname = GET_STR(PG_GETARG_TEXT_P(0));
  				howmany = PG_GETARG_INT32(1);
  				fail = PG_GETARG_BOOL(2);
! 				conn = persistent_conn;
  			}
  			else
  			{
--- 517,523 ----
  				curname = GET_STR(PG_GETARG_TEXT_P(0));
  				howmany = PG_GETARG_INT32(1);
  				fail = PG_GETARG_BOOL(2);
! 				conn = pconn->conn;
  			}
  			else
  			{
***************
*** 503,509 ****
  			/* text,int */
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			howmany = PG_GETARG_INT32(1);
! 			conn = persistent_conn;
  		}
  
  		if (!conn)
--- 535,541 ----
  			/* text,int */
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			howmany = PG_GETARG_INT32(1);
! 			conn = pconn->conn;
  		}
  
  		if (!conn)
***************
*** 648,653 ****
--- 680,687 ----
  	MemoryContext oldcontext;
  	bool		freeconn = false;
  
+ 	DBLINK_INIT;
+ 
  	/* stuff done only on the first call of the function */
  	if (SRF_IS_FIRSTCALL())
  	{
***************
*** 678,684 ****
  			/* text,text or text,bool */
  			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  			{
! 				conn = persistent_conn;
  				sql = GET_STR(PG_GETARG_TEXT_P(0));
  				fail = PG_GETARG_BOOL(1);
  			}
--- 712,718 ----
  			/* text,text or text,bool */
  			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  			{
! 				conn = pconn->conn;
  				sql = GET_STR(PG_GETARG_TEXT_P(0));
  				fail = PG_GETARG_BOOL(1);
  			}
***************
*** 691,697 ****
  		else if (PG_NARGS() == 1)
  		{
  			/* text */
! 			conn = persistent_conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  		}
  		else
--- 725,731 ----
  		else if (PG_NARGS() == 1)
  		{
  			/* text */
! 			conn = pconn->conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  		}
  		else
***************
*** 857,862 ****
--- 891,898 ----
  	bool		freeconn = false;
  	bool		fail = true;	/* default to backward compatible behavior */
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 3)
  	{
  		/* must be text,text,bool */
***************
*** 869,875 ****
  		/* might be text,text or text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  		{
! 			conn = persistent_conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
  		}
--- 905,911 ----
  		/* might be text,text or text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  		{
! 			conn = pconn->conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
  		}
***************
*** 882,888 ****
  	else if (PG_NARGS() == 1)
  	{
  		/* must be single text argument */
! 		conn = persistent_conn;
  		sql = GET_STR(PG_GETARG_TEXT_P(0));
  	}
  	else
--- 918,924 ----
  	else if (PG_NARGS() == 1)
  	{
  		/* must be single text argument */
! 		conn = pconn->conn;
  		sql = GET_STR(PG_GETARG_TEXT_P(0));
  	}
  	else
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.15
diff -c -r1.15 dblink.out
*** expected/dblink.out	8 Oct 2005 16:10:38 -0000	1.15
--- expected/dblink.out	17 Oct 2005 02:04:09 -0000
***************
*** 436,441 ****
--- 436,523 ----
   ROLLBACK
  (1 row)
  
+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+  dblink_exec 
+ -------------
+  BEGIN
+ (1 row)
+ 
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+  dblink_open 
+ -------------
+  OK
+ (1 row)
+ 
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+  dblink_close 
+ --------------
+  OK
+ (1 row)
+ 
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+   dblink_exec   
+ ----------------
+  DECLARE CURSOR
+ (1 row)
+ 
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+  dblink_exec 
+ -------------
+  COMMIT
+ (1 row)
+ 
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+  dblink_open 
+ -------------
+  OK
+ (1 row)
+ 
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+  dblink_open 
+ -------------
+  OK
+ (1 row)
+ 
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+  dblink_close 
+ --------------
+  OK
+ (1 row)
+ 
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+   dblink_exec   
+ ----------------
+  DECLARE CURSOR
+ (1 row)
+ 
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+  dblink_close 
+ --------------
+  OK
+ (1 row)
+ 
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ ERROR:  sql error
+ DETAIL:  ERROR:  cursor "xact_test" already exists
+ 
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+  dblink_exec 
+ -------------
+  ROLLBACK
+ (1 row)
+ 
  -- open a cursor
  SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
   dblink_open 
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.14
diff -c -r1.14 dblink.sql
*** sql/dblink.sql	8 Oct 2005 16:10:38 -0000	1.14
--- sql/dblink.sql	17 Oct 2005 02:04:09 -0000
***************
*** 217,222 ****
--- 217,258 ----
  -- reset remote transaction state
  SELECT dblink_exec('myconn','ABORT');
  
+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+ 
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ 
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+ 
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ 
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+ 
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ 
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+ 
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+ 
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ 
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+ 
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ 
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+ 
  -- open a cursor
  SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
  
---------------------------(end of broadcast)---------------------------
TIP 1: if posting/reading through Usenet, please send an appropriate
       subscribe-nomail command to [EMAIL PROTECTED] so that your
       message can get through to the mailing list cleanly

Reply via email to