Bruce Momjian wrote:
No problem -- thanks.  I have slimmed down the patch by applying the
cosmetic parts to CVS.  Use the URL above to get the newest versions of
the dblink.c and regression changes.


Here is my counter-proposal to Bruce's dblink patch. Any comments?

Is it too late to apply this for 8.1? I tend to agree with calling this a bugfix.

Thanks,

Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.47
diff -c -r1.47 dblink.c
*** dblink.c	15 Oct 2005 02:49:04 -0000	1.47
--- dblink.c	16 Oct 2005 02:04:13 -0000
***************
*** 60,68 ****
  
  typedef struct remoteConn
  {
! 	PGconn	   *conn;			/* Hold the remote connection */
! 	int			autoXactCursors;/* Indicates the number of open cursors,
! 								 * non-zero means we opened the xact ourselves */
  }	remoteConn;
  
  /*
--- 60,68 ----
  
  typedef struct remoteConn
  {
! 	PGconn	   *conn;				/* Hold the remote connection */
! 	int			openCursorCount;	/* The number of open cursors */
! 	bool		newXactForCursor;	/* Opened a transaction for a cursor */
  }	remoteConn;
  
  /*
***************
*** 84,93 ****
  static char *generate_relation_name(Oid relid);
  
  /* Global */
! List	   *res_id = NIL;
! int			res_id_index = 0;
! PGconn	   *persistent_conn = NULL;
! static HTAB *remoteConnHash = NULL;
  
  /*
   *	Following is list that holds multiple remote connections.
--- 84,91 ----
  static char *generate_relation_name(Oid relid);
  
  /* Global */
! static remoteConn	   *pconn = NULL;
! static HTAB			   *remoteConnHash = NULL;
  
  /*
   *	Following is list that holds multiple remote connections.
***************
*** 184,189 ****
--- 182,197 ----
  			} \
  	} while (0)
  
+ #define DBLINK_INIT \
+ 	do { \
+ 			if (!pconn) \
+ 			{ \
+ 				pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+ 				pconn->conn = NULL; \
+ 				pconn->openCursorCount = 0; \
+ 				pconn->newXactForCursor = FALSE; \
+ 			} \
+ 	} while (0)
  
  /*
   * Create a persistent connection to another database
***************
*** 199,204 ****
--- 207,214 ----
  	PGconn	   *conn = NULL;
  	remoteConn *rconn = NULL;
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 2)
  	{
  		connstr = GET_STR(PG_GETARG_TEXT_P(1));
***************
*** 234,240 ****
  		createNewConnection(connname, rconn);
  	}
  	else
! 		persistent_conn = conn;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 244,250 ----
  		createNewConnection(connname, rconn);
  	}
  	else
! 		pconn->conn = conn;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 250,255 ****
--- 260,267 ----
  	remoteConn *rconn = NULL;
  	PGconn	   *conn = NULL;
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 1)
  	{
  		conname = GET_STR(PG_GETARG_TEXT_P(0));
***************
*** 258,264 ****
  			conn = rconn->conn;
  	}
  	else
! 		conn = persistent_conn;
  
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
--- 270,276 ----
  			conn = rconn->conn;
  	}
  	else
! 		conn = pconn->conn;
  
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
***************
*** 270,276 ****
  		pfree(rconn);
  	}
  	else
! 		persistent_conn = NULL;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 282,288 ----
  		pfree(rconn);
  	}
  	else
! 		pconn->conn = NULL;
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 285,290 ****
--- 297,304 ----
  	char	   *msg;
  	PGresult   *res = NULL;
  	PGconn	   *conn = NULL;
+ 	int		   *openCursorCount = NULL;
+ 	bool	   *newXactForCursor = NULL;
  	char	   *curname = NULL;
  	char	   *sql = NULL;
  	char	   *conname = NULL;
***************
*** 292,303 ****
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
  	if (PG_NARGS() == 2)
  	{
  		/* text,text */
  		curname = GET_STR(PG_GETARG_TEXT_P(0));
  		sql = GET_STR(PG_GETARG_TEXT_P(1));
! 		conn = persistent_conn;
  	}
  	else if (PG_NARGS() == 3)
  	{
--- 306,321 ----
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 2)
  	{
  		/* text,text */
  		curname = GET_STR(PG_GETARG_TEXT_P(0));
  		sql = GET_STR(PG_GETARG_TEXT_P(1));
! 		conn = pconn->conn;
! 		openCursorCount = &pconn->openCursorCount;
! 		newXactForCursor = &pconn->newXactForCursor;
  	}
  	else if (PG_NARGS() == 3)
  	{
***************
*** 307,313 ****
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  			fail = PG_GETARG_BOOL(2);
! 			conn = persistent_conn;
  		}
  		else
  		{
--- 325,333 ----
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			sql = GET_STR(PG_GETARG_TEXT_P(1));
  			fail = PG_GETARG_BOOL(2);
! 			conn = pconn->conn;
! 			openCursorCount = &pconn->openCursorCount;
! 			newXactForCursor = &pconn->newXactForCursor;
  		}
  		else
  		{
***************
*** 316,322 ****
--- 336,346 ----
  			sql = GET_STR(PG_GETARG_TEXT_P(2));
  			rconn = getConnectionByName(conname);
  			if (rconn)
+ 			{
  				conn = rconn->conn;
+ 				openCursorCount = &rconn->openCursorCount;
+ 				newXactForCursor = &rconn->newXactForCursor;
+ 			}
  		}
  	}
  	else if (PG_NARGS() == 4)
***************
*** 328,344 ****
  		fail = PG_GETARG_BOOL(3);
  		rconn = getConnectionByName(conname);
  		if (rconn)
  			conn = rconn->conn;
  	}
  
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = PQexec(conn, "BEGIN");
! 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		DBLINK_RES_INTERNALERROR("begin error");
  
! 	PQclear(res);
  
  	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
  	res = PQexec(conn, str->data);
--- 352,380 ----
  		fail = PG_GETARG_BOOL(3);
  		rconn = getConnectionByName(conname);
  		if (rconn)
+ 		{
  			conn = rconn->conn;
+ 			openCursorCount = &rconn->openCursorCount;
+ 			newXactForCursor = &rconn->newXactForCursor;
+ 		}
  	}
  
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	/*	If we are not in a transaction, start one */
! 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
! 	{
! 		res = PQexec(conn, "BEGIN");
! 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 			DBLINK_RES_INTERNALERROR("begin error");
! 		PQclear(res);
! 		*newXactForCursor = TRUE;
! 	}
  
! 	/* if we started a transaction, increment cursor count */
! 	if (*newXactForCursor)
! 		(*openCursorCount)++;
  
  	appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
  	res = PQexec(conn, str->data);
***************
*** 365,370 ****
--- 401,408 ----
  dblink_close(PG_FUNCTION_ARGS)
  {
  	PGconn	   *conn = NULL;
+ 	int		   *openCursorCount = NULL;
+ 	bool	   *newXactForCursor = NULL;
  	PGresult   *res = NULL;
  	char	   *curname = NULL;
  	char	   *conname = NULL;
***************
*** 373,383 ****
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
  	if (PG_NARGS() == 1)
  	{
  		/* text */
  		curname = GET_STR(PG_GETARG_TEXT_P(0));
! 		conn = persistent_conn;
  	}
  	else if (PG_NARGS() == 2)
  	{
--- 411,425 ----
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 1)
  	{
  		/* text */
  		curname = GET_STR(PG_GETARG_TEXT_P(0));
! 		conn = pconn->conn;
! 		openCursorCount = &pconn->openCursorCount;
! 		newXactForCursor = &pconn->newXactForCursor;
  	}
  	else if (PG_NARGS() == 2)
  	{
***************
*** 386,392 ****
  		{
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
! 			conn = persistent_conn;
  		}
  		else
  		{
--- 428,436 ----
  		{
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
! 			conn = pconn->conn;
! 			openCursorCount = &pconn->openCursorCount;
! 			newXactForCursor = &pconn->newXactForCursor;
  		}
  		else
  		{
***************
*** 394,400 ****
--- 438,448 ----
  			curname = GET_STR(PG_GETARG_TEXT_P(1));
  			rconn = getConnectionByName(conname);
  			if (rconn)
+ 			{
  				conn = rconn->conn;
+ 				openCursorCount = &rconn->openCursorCount;
+ 				newXactForCursor = &rconn->newXactForCursor;
+ 			}
  		}
  	}
  	if (PG_NARGS() == 3)
***************
*** 405,411 ****
--- 453,463 ----
  		fail = PG_GETARG_BOOL(2);
  		rconn = getConnectionByName(conname);
  		if (rconn)
+ 		{
  			conn = rconn->conn;
+ 			openCursorCount = &rconn->openCursorCount;
+ 			newXactForCursor = &rconn->newXactForCursor;
+ 		}
  	}
  
  	if (!conn)
***************
*** 428,439 ****
  
  	PQclear(res);
  
! 	/* commit the transaction */
! 	res = PQexec(conn, "COMMIT");
! 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		DBLINK_RES_INTERNALERROR("commit error");
  
! 	PQclear(res);
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 480,501 ----
  
  	PQclear(res);
  
! 	/* if we started a transaction, decrement cursor count */
! 	if (*newXactForCursor)
! 	{
! 		(*openCursorCount)--;
  
! 		/* if count is zero, commit the transaction */
! 		if (*openCursorCount == 0)
! 		{
! 			*newXactForCursor = FALSE;
! 
! 			res = PQexec(conn, "COMMIT");
! 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 				DBLINK_RES_INTERNALERROR("commit error");
! 			PQclear(res);
! 		}
! 	}
  
  	PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 456,461 ****
--- 518,525 ----
  	char	   *conname = NULL;
  	remoteConn *rconn = NULL;
  
+ 	DBLINK_INIT;
+ 
  	/* stuff done only on the first call of the function */
  	if (SRF_IS_FIRSTCALL())
  	{
***************
*** 485,491 ****
  				curname = GET_STR(PG_GETARG_TEXT_P(0));
  				howmany = PG_GETARG_INT32(1);
  				fail = PG_GETARG_BOOL(2);
! 				conn = persistent_conn;
  			}
  			else
  			{
--- 549,555 ----
  				curname = GET_STR(PG_GETARG_TEXT_P(0));
  				howmany = PG_GETARG_INT32(1);
  				fail = PG_GETARG_BOOL(2);
! 				conn = pconn->conn;
  			}
  			else
  			{
***************
*** 503,509 ****
  			/* text,int */
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			howmany = PG_GETARG_INT32(1);
! 			conn = persistent_conn;
  		}
  
  		if (!conn)
--- 567,573 ----
  			/* text,int */
  			curname = GET_STR(PG_GETARG_TEXT_P(0));
  			howmany = PG_GETARG_INT32(1);
! 			conn = pconn->conn;
  		}
  
  		if (!conn)
***************
*** 648,653 ****
--- 712,719 ----
  	MemoryContext oldcontext;
  	bool		freeconn = false;
  
+ 	DBLINK_INIT;
+ 
  	/* stuff done only on the first call of the function */
  	if (SRF_IS_FIRSTCALL())
  	{
***************
*** 678,684 ****
  			/* text,text or text,bool */
  			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  			{
! 				conn = persistent_conn;
  				sql = GET_STR(PG_GETARG_TEXT_P(0));
  				fail = PG_GETARG_BOOL(1);
  			}
--- 744,750 ----
  			/* text,text or text,bool */
  			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  			{
! 				conn = pconn->conn;
  				sql = GET_STR(PG_GETARG_TEXT_P(0));
  				fail = PG_GETARG_BOOL(1);
  			}
***************
*** 691,697 ****
  		else if (PG_NARGS() == 1)
  		{
  			/* text */
! 			conn = persistent_conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  		}
  		else
--- 757,763 ----
  		else if (PG_NARGS() == 1)
  		{
  			/* text */
! 			conn = pconn->conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  		}
  		else
***************
*** 857,862 ****
--- 923,930 ----
  	bool		freeconn = false;
  	bool		fail = true;	/* default to backward compatible behavior */
  
+ 	DBLINK_INIT;
+ 
  	if (PG_NARGS() == 3)
  	{
  		/* must be text,text,bool */
***************
*** 869,875 ****
  		/* might be text,text or text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  		{
! 			conn = persistent_conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
  		}
--- 937,943 ----
  		/* might be text,text or text,bool */
  		if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
  		{
! 			conn = pconn->conn;
  			sql = GET_STR(PG_GETARG_TEXT_P(0));
  			fail = PG_GETARG_BOOL(1);
  		}
***************
*** 882,888 ****
  	else if (PG_NARGS() == 1)
  	{
  		/* must be single text argument */
! 		conn = persistent_conn;
  		sql = GET_STR(PG_GETARG_TEXT_P(0));
  	}
  	else
--- 950,956 ----
  	else if (PG_NARGS() == 1)
  	{
  		/* must be single text argument */
! 		conn = pconn->conn;
  		sql = GET_STR(PG_GETARG_TEXT_P(0));
  	}
  	else
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.15
diff -c -r1.15 dblink.out
*** expected/dblink.out	8 Oct 2005 16:10:38 -0000	1.15
--- expected/dblink.out	16 Oct 2005 02:05:28 -0000
***************
*** 436,441 ****
--- 436,523 ----
   ROLLBACK
  (1 row)
  
+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+  dblink_exec 
+ -------------
+  BEGIN
+ (1 row)
+ 
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+  dblink_open 
+ -------------
+  OK
+ (1 row)
+ 
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+  dblink_close 
+ --------------
+  OK
+ (1 row)
+ 
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+   dblink_exec   
+ ----------------
+  DECLARE CURSOR
+ (1 row)
+ 
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+  dblink_exec 
+ -------------
+  COMMIT
+ (1 row)
+ 
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+  dblink_open 
+ -------------
+  OK
+ (1 row)
+ 
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+  dblink_open 
+ -------------
+  OK
+ (1 row)
+ 
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+  dblink_close 
+ --------------
+  OK
+ (1 row)
+ 
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+   dblink_exec   
+ ----------------
+  DECLARE CURSOR
+ (1 row)
+ 
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+  dblink_close 
+ --------------
+  OK
+ (1 row)
+ 
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ ERROR:  sql error
+ DETAIL:  ERROR:  cursor "xact_test" already exists
+ 
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+  dblink_exec 
+ -------------
+  ROLLBACK
+ (1 row)
+ 
  -- open a cursor
  SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
   dblink_open 
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.14
diff -c -r1.14 dblink.sql
*** sql/dblink.sql	8 Oct 2005 16:10:38 -0000	1.14
--- sql/dblink.sql	16 Oct 2005 01:59:11 -0000
***************
*** 217,222 ****
--- 217,258 ----
  -- reset remote transaction state
  SELECT dblink_exec('myconn','ABORT');
  
+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+ 
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ 
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+ 
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ 
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+ 
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+ 
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+ 
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+ 
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ 
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+ 
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ 
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+ 
  -- open a cursor
  SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
  
---------------------------(end of broadcast)---------------------------
TIP 5: don't forget to increase your free space map settings

Reply via email to