> I'm not a member of this list (yet), so please CC me on responses and
> discussion. The patch below seems to be completion of work already
> started, because the boolean remoteTrFlag was already defined, and all I
> had to add was its setting and two references. I hope someone will find
> it useful,
> 
> Jonathan

I have worked on this issue and have an extensive patch to dblink to fix
it.

The reported problem is that dblink_open() and dblink_close() (used for
cursor reads) issue BEGIN and COMMIT, respectively, regardless of the
transaction state of the remote connection.  There was code in dblink.c
to track the remote transaction state (the remoteTrFlag field of struct
remoteConn), but it was not being maintained or used.

This patch fixes that by routing all connections through an rconn
structure and using the transaction status properly.  I removed the
global persistent connection and the function-local 'conn' variables in
favor of using rconn consistently.  This cleans up a lot of error-prone
code that tried to track what type of connection was being used.

I don't know if people want this for 8.1 or 8.2.

-- 
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /cvsroot/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.43
diff -c -c -r1.43 dblink.c
*** contrib/dblink/dblink.c     30 May 2005 23:09:06 -0000      1.43
--- contrib/dblink/dblink.c     6 Oct 2005 04:18:34 -0000
***************
*** 60,67 ****
  
  typedef struct remoteConn
  {
!       PGconn     *con;                        /* Hold the remote connection */
!       bool            remoteTrFlag;   /* Indicates whether or not a 
transaction
                                                                 * on remote 
database is in progress */
  }     remoteConn;
  
--- 60,67 ----
  
  typedef struct remoteConn
  {
!       PGconn     *conn;                       /* Hold the remote connection */
!       bool            remoteXactOpen; /* Indicates whether or not a 
transaction
                                                                 * on remote 
database is in progress */
  }     remoteConn;
  
***************
*** 86,110 ****
  /* Global */
  List     *res_id = NIL;
  int                   res_id_index = 0;
- PGconn           *persistent_conn = NULL;
  static HTAB *remoteConnHash = NULL;
  
  /*
! Following is list that holds multiple remote connections.
! Calling convention of each dblink function changes to accept
! connection name as the first parameter. The connection list is
! much like ecpg e.g. a mapping between a name and a PGconn object.
  */
  
  typedef struct remoteConnHashEnt
  {
        char            name[NAMEDATALEN];
!       remoteConn *rcon;
  }     remoteConnHashEnt;
  
  /* initial number of connection hashes */
  #define NUMCONN 16
  
  /* general utility */
  #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, 
CStringGetDatum(cstrp)))
  #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, 
PointerGetDatum(textp)))
--- 86,115 ----
  /* Global */
  List     *res_id = NIL;
  int                   res_id_index = 0;
  static HTAB *remoteConnHash = NULL;
  
  /*
!  *    Following is list that holds multiple remote connections.
!  *    Calling convention of each dblink function changes to accept
!  *    connection name as the first parameter. The connection list is
!  *    much like ecpg e.g. a mapping between a name and a PGconn object.
  */
  
  typedef struct remoteConnHashEnt
  {
        char            name[NAMEDATALEN];
!       remoteConn *rconn;      /* EMPTY_CONNECTION_NAME also possible */
  }     remoteConnHashEnt;
  
  /* initial number of connection hashes */
  #define NUMCONN 16
  
+ /*
+  *    Because the argument protocol is V1, no connection name behaves
+  *    the same as a NULL-passed connection name
+  */
+ #define       EMPTY_CONNECTION_NAME   ""
+ 
  /* general utility */
  #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, 
CStringGetDatum(cstrp)))
  #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, 
PointerGetDatum(textp)))
***************
*** 116,131 ****
                        var_ = NULL; \
                } \
        } while (0)
  #define DBLINK_RES_INTERNALERROR(p2) \
        do { \
!                       msg = pstrdup(PQerrorMessage(conn)); \
                        if (res) \
                                PQclear(res); \
                        elog(ERROR, "%s: %s", p2, msg); \
        } while (0)
  #define DBLINK_RES_ERROR(p2) \
        do { \
!                       msg = pstrdup(PQerrorMessage(conn)); \
                        if (res) \
                                PQclear(res); \
                        ereport(ERROR, \
--- 121,138 ----
                        var_ = NULL; \
                } \
        } while (0)
+ 
  #define DBLINK_RES_INTERNALERROR(p2) \
        do { \
!                       msg = pstrdup(PQerrorMessage(rconn->conn)); \
                        if (res) \
                                PQclear(res); \
                        elog(ERROR, "%s: %s", p2, msg); \
        } while (0)
+ 
  #define DBLINK_RES_ERROR(p2) \
        do { \
!                       msg = pstrdup(PQerrorMessage(rconn->conn)); \
                        if (res) \
                                PQclear(res); \
                        ereport(ERROR, \
***************
*** 133,141 ****
                                         errmsg("%s", p2), \
                                         errdetail("%s", msg))); \
        } while (0)
  #define DBLINK_RES_ERROR_AS_NOTICE(p2) \
        do { \
!                       msg = pstrdup(PQerrorMessage(conn)); \
                        if (res) \
                                PQclear(res); \
                        ereport(NOTICE, \
--- 140,149 ----
                                         errmsg("%s", p2), \
                                         errdetail("%s", msg))); \
        } while (0)
+ 
  #define DBLINK_RES_ERROR_AS_NOTICE(p2) \
        do { \
!                       msg = pstrdup(PQerrorMessage(rconn->conn)); \
                        if (res) \
                                PQclear(res); \
                        ereport(NOTICE, \
***************
*** 143,151 ****
                                         errmsg("%s", p2), \
                                         errdetail("%s", msg))); \
        } while (0)
  #define DBLINK_CONN_NOT_AVAIL \
        do { \
!               if(conname) \
                        ereport(ERROR, \
                                        
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
                                         errmsg("connection \"%s\" not 
available", conname))); \
--- 151,160 ----
                                         errmsg("%s", p2), \
                                         errdetail("%s", msg))); \
        } while (0)
+ 
  #define DBLINK_CONN_NOT_AVAIL \
        do { \
!               if (conname && strlen(conname) != 0) \
                        ereport(ERROR, \
                                        
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
                                         errmsg("connection \"%s\" not 
available", conname))); \
***************
*** 154,181 ****
                                        
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
                                         errmsg("connection not available"))); \
        } while (0)
  #define DBLINK_GET_CONN \
        do { \
!                       char *conname_or_str = GET_STR(PG_GETARG_TEXT_P(0)); \
!                       rcon = getConnectionByName(conname_or_str); \
!                       if(rcon) \
!                       { \
!                               conn = rcon->con; \
!                       } \
!                       else \
                        { \
!                               connstr = conname_or_str; \
!                               conn = PQconnectdb(connstr); \
!                               if (PQstatus(conn) == CONNECTION_BAD) \
                                { \
!                                       msg = pstrdup(PQerrorMessage(conn)); \
!                                       PQfinish(conn); \
                                        ereport(ERROR, \
                                                        
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
                                                         errmsg("could not 
establish connection"), \
                                                         errdetail("%s", 
msg))); \
                                } \
!                               freeconn = true; \
                        } \
        } while (0)
  
--- 163,194 ----
                                        
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
                                         errmsg("connection not available"))); \
        } while (0)
+ 
  #define DBLINK_GET_CONN \
        do { \
!                       rconn = getConnectionByName(conname); \
!                       if(!rconn) \
                        { \
!                               /* \
!                                *      Does not match connection name, so must 
be conn string. \
!                                *      Create an rconn structure that we will 
free before the \
!                                *      function completes.  Don't bother 
storing it in the hash. \
!                                */ \
!                               rconn = (remoteConn *) 
palloc(sizeof(remoteConn)); \
!                               rconn->conn = PQconnectdb(conname); \
!                               rconn->remoteXactOpen = false; \
!                               conname = EMPTY_CONNECTION_NAME; \
!                               if (PQstatus(rconn->conn) == CONNECTION_BAD) \
                                { \
!                                       msg = 
pstrdup(PQerrorMessage(rconn->conn)); \
!                                       PQfinish(rconn->conn); \
!                                       pfree(rconn); \
                                        ereport(ERROR, \
                                                        
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
                                                         errmsg("could not 
establish connection"), \
                                                         errdetail("%s", 
msg))); \
                                } \
!                               rconn_is_local = true; \
                        } \
        } while (0)
  
***************
*** 187,221 ****
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
!       char       *connstr = NULL;
!       char       *connname = NULL;
        char       *msg;
        MemoryContext oldcontext;
!       PGconn     *conn = NULL;
!       remoteConn *rcon = NULL;
  
        if (PG_NARGS() == 2)
        {
                connstr = GET_STR(PG_GETARG_TEXT_P(1));
-               connname = GET_STR(PG_GETARG_TEXT_P(0));
        }
!       else if (PG_NARGS() == 1)
                connstr = GET_STR(PG_GETARG_TEXT_P(0));
! 
        oldcontext = MemoryContextSwitchTo(TopMemoryContext);
  
!       if (connname)
!               rcon = (remoteConn *) palloc(sizeof(remoteConn));
!       conn = PQconnectdb(connstr);
  
        MemoryContextSwitchTo(oldcontext);
  
!       if (PQstatus(conn) == CONNECTION_BAD)
        {
!               msg = pstrdup(PQerrorMessage(conn));
!               PQfinish(conn);
!               if (rcon)
!                       pfree(rcon);
  
                ereport(ERROR,
                   
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
--- 200,236 ----
  Datum
  dblink_connect(PG_FUNCTION_ARGS)
  {
!       char       *connstr;
!       char       *conname;
        char       *msg;
        MemoryContext oldcontext;
!       remoteConn *rconn;
  
        if (PG_NARGS() == 2)
        {
+               conname = GET_STR(PG_GETARG_TEXT_P(0));
                connstr = GET_STR(PG_GETARG_TEXT_P(1));
        }
!       else
!       {
!               Assert(PG_NARGS() == 1);
!               conname = EMPTY_CONNECTION_NAME;
                connstr = GET_STR(PG_GETARG_TEXT_P(0));
!       }
!               
        oldcontext = MemoryContextSwitchTo(TopMemoryContext);
  
!       rconn = (remoteConn *) palloc(sizeof(remoteConn));
! 
!       rconn->conn = PQconnectdb(connstr);
  
        MemoryContextSwitchTo(oldcontext);
  
!       if (PQstatus(rconn->conn) == CONNECTION_BAD)
        {
!               msg = pstrdup(PQerrorMessage(rconn->conn));
!               PQfinish(rconn->conn);
!               pfree(rconn);
  
                ereport(ERROR,
                   
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
***************
*** 223,235 ****
                        errdetail("%s", msg)));
        }
  
!       if (connname)
!       {
!               rcon->con = conn;
!               createNewConnection(connname, rcon);
!       }
!       else
!               persistent_conn = conn;
  
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 238,244 ----
                        errdetail("%s", msg)));
        }
  
!       createNewConnection(conname, rconn);
  
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 241,271 ****
  Datum
  dblink_disconnect(PG_FUNCTION_ARGS)
  {
!       char       *conname = NULL;
!       remoteConn *rcon = NULL;
!       PGconn     *conn = NULL;
  
        if (PG_NARGS() == 1)
-       {
                conname = GET_STR(PG_GETARG_TEXT_P(0));
-               rcon = getConnectionByName(conname);
-               if (rcon)
-                       conn = rcon->con;
-       }
        else
!               conn = persistent_conn;
  
!       if (!conn)
                DBLINK_CONN_NOT_AVAIL;
  
!       PQfinish(conn);
!       if (rcon)
!       {
!               deleteConnection(conname);
!               pfree(rcon);
!       }
!       else
!               persistent_conn = NULL;
  
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 250,275 ----
  Datum
  dblink_disconnect(PG_FUNCTION_ARGS)
  {
!       char       *conname;
!       remoteConn *rconn;
  
        if (PG_NARGS() == 1)
                conname = GET_STR(PG_GETARG_TEXT_P(0));
        else
!       {
!               Assert(PG_NARGS() == 0);
!               conname = EMPTY_CONNECTION_NAME;
!       }
!       
!       rconn = getConnectionByName(conname);
  
!       if (!rconn || !rconn->conn)
                DBLINK_CONN_NOT_AVAIL;
  
!       PQfinish(rconn->conn);
! 
!       deleteConnection(conname);
!       pfree(rconn);
  
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 277,342 ****
  Datum
  dblink_open(PG_FUNCTION_ARGS)
  {
        char       *msg;
!       PGresult   *res = NULL;
!       PGconn     *conn = NULL;
!       char       *curname = NULL;
!       char       *sql = NULL;
!       char       *conname = NULL;
        StringInfo      str = makeStringInfo();
-       remoteConn *rcon = NULL;
        bool            fail = true;    /* default to backward compatible 
behavior */
  
!       if (PG_NARGS() == 2)
        {
!               /* text,text */
!               curname = GET_STR(PG_GETARG_TEXT_P(0));
!               sql = GET_STR(PG_GETARG_TEXT_P(1));
!               conn = persistent_conn;
        }
        else if (PG_NARGS() == 3)
        {
                /* might be text,text,text or text,text,bool */
                if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
                {
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        sql = GET_STR(PG_GETARG_TEXT_P(1));
                        fail = PG_GETARG_BOOL(2);
-                       conn = persistent_conn;
                }
                else
                {
                        conname = GET_STR(PG_GETARG_TEXT_P(0));
                        curname = GET_STR(PG_GETARG_TEXT_P(1));
                        sql = GET_STR(PG_GETARG_TEXT_P(2));
-                       rcon = getConnectionByName(conname);
-                       if (rcon)
-                               conn = rcon->con;
                }
        }
!       else if (PG_NARGS() == 4)
        {
!               /* text,text,text,bool */
!               conname = GET_STR(PG_GETARG_TEXT_P(0));
!               curname = GET_STR(PG_GETARG_TEXT_P(1));
!               sql = GET_STR(PG_GETARG_TEXT_P(2));
!               fail = PG_GETARG_BOOL(3);
!               rcon = getConnectionByName(conname);
!               if (rcon)
!                       conn = rcon->con;
        }
  
!       if (!conn)
                DBLINK_CONN_NOT_AVAIL;
  
!       res = PQexec(conn, "BEGIN");
!       if (PQresultStatus(res) != PGRES_COMMAND_OK)
!               DBLINK_RES_INTERNALERROR("begin error");
  
!       PQclear(res);
  
        appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
!       res = PQexec(conn, str->data);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                if (fail)
--- 281,345 ----
  Datum
  dblink_open(PG_FUNCTION_ARGS)
  {
+       char       *curname;
+       char       *conname;
+       remoteConn *rconn;
        char       *msg;
!       PGresult   *res;
!       char       *sql;
        StringInfo      str = makeStringInfo();
        bool            fail = true;    /* default to backward compatible 
behavior */
  
!       if (PG_NARGS() == 4)
        {
!               /* text,text,text,bool */
!               conname = GET_STR(PG_GETARG_TEXT_P(0));
!               curname = GET_STR(PG_GETARG_TEXT_P(1));
!               sql = GET_STR(PG_GETARG_TEXT_P(2));
!               fail = PG_GETARG_BOOL(3);
        }
        else if (PG_NARGS() == 3)
        {
                /* might be text,text,text or text,text,bool */
                if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
                {
+                       conname = EMPTY_CONNECTION_NAME;
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        sql = GET_STR(PG_GETARG_TEXT_P(1));
                        fail = PG_GETARG_BOOL(2);
                }
                else
                {
                        conname = GET_STR(PG_GETARG_TEXT_P(0));
                        curname = GET_STR(PG_GETARG_TEXT_P(1));
                        sql = GET_STR(PG_GETARG_TEXT_P(2));
                }
        }
!       else
        {
!               /* text,text */
!               Assert(PG_NARGS() == 2);
!               conname = EMPTY_CONNECTION_NAME;
!               curname = GET_STR(PG_GETARG_TEXT_P(0));
!               sql = GET_STR(PG_GETARG_TEXT_P(1));
        }
  
!       rconn = getConnectionByName(conname);
!       if (!rconn || !rconn->conn)
                DBLINK_CONN_NOT_AVAIL;
  
!       rconn->remoteXactOpen = (PQtransactionStatus(rconn->conn) != 
PQTRANS_IDLE);
  
!       if (!rconn->remoteXactOpen)
!       {
!               res = PQexec(rconn->conn, "BEGIN");
!               if (PQresultStatus(res) != PGRES_COMMAND_OK)
!                       DBLINK_RES_INTERNALERROR("begin error");
!               PQclear(res);
!       }
  
        appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
!       res = PQexec(rconn->conn, str->data);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                if (fail)
***************
*** 359,415 ****
  Datum
  dblink_close(PG_FUNCTION_ARGS)
  {
!       PGconn     *conn = NULL;
!       PGresult   *res = NULL;
!       char       *curname = NULL;
!       char       *conname = NULL;
        StringInfo      str = makeStringInfo();
        char       *msg;
-       remoteConn *rcon = NULL;
        bool            fail = true;    /* default to backward compatible 
behavior */
  
!       if (PG_NARGS() == 1)
        {
!               /* text */
!               curname = GET_STR(PG_GETARG_TEXT_P(0));
!               conn = persistent_conn;
        }
        else if (PG_NARGS() == 2)
        {
                /* might be text,text or text,bool */
                if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                {
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        fail = PG_GETARG_BOOL(1);
-                       conn = persistent_conn;
                }
                else
                {
                        conname = GET_STR(PG_GETARG_TEXT_P(0));
                        curname = GET_STR(PG_GETARG_TEXT_P(1));
-                       rcon = getConnectionByName(conname);
-                       if (rcon)
-                               conn = rcon->con;
                }
        }
!       if (PG_NARGS() == 3)
        {
!               /* text,text,bool */
!               conname = GET_STR(PG_GETARG_TEXT_P(0));
!               curname = GET_STR(PG_GETARG_TEXT_P(1));
!               fail = PG_GETARG_BOOL(2);
!               rcon = getConnectionByName(conname);
!               if (rcon)
!                       conn = rcon->con;
        }
  
!       if (!conn)
                DBLINK_CONN_NOT_AVAIL;
  
        appendStringInfo(str, "CLOSE %s", curname);
  
        /* close the cursor */
!       res = PQexec(conn, str->data);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                if (fail)
--- 362,413 ----
  Datum
  dblink_close(PG_FUNCTION_ARGS)
  {
!       char       *curname;
!       char       *conname;
!       remoteConn *rconn;
        StringInfo      str = makeStringInfo();
+       PGresult   *res;
        char       *msg;
        bool            fail = true;    /* default to backward compatible 
behavior */
  
!       if (PG_NARGS() == 3)
        {
!               /* text,text,bool */
!               conname = GET_STR(PG_GETARG_TEXT_P(0));
!               curname = GET_STR(PG_GETARG_TEXT_P(1));
!               fail = PG_GETARG_BOOL(2);
        }
        else if (PG_NARGS() == 2)
        {
                /* might be text,text or text,bool */
                if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                {
+                       conname = EMPTY_CONNECTION_NAME;
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        fail = PG_GETARG_BOOL(1);
                }
                else
                {
                        conname = GET_STR(PG_GETARG_TEXT_P(0));
                        curname = GET_STR(PG_GETARG_TEXT_P(1));
                }
        }
!       else
        {
!               /* text */
!               Assert(PG_NARGS() == 1);
!               conname = EMPTY_CONNECTION_NAME;
!               curname = GET_STR(PG_GETARG_TEXT_P(0));
        }
  
!       rconn = getConnectionByName(conname);
!       if (!rconn || !rconn->conn)
                DBLINK_CONN_NOT_AVAIL;
  
        appendStringInfo(str, "CLOSE %s", curname);
  
        /* close the cursor */
!       res = PQexec(rconn->conn, str->data);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                if (fail)
***************
*** 423,434 ****
  
        PQclear(res);
  
!       /* commit the transaction */
!       res = PQexec(conn, "COMMIT");
!       if (PQresultStatus(res) != PGRES_COMMAND_OK)
!               DBLINK_RES_INTERNALERROR("commit error");
! 
!       PQclear(res);
  
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 421,435 ----
  
        PQclear(res);
  
!       /* We are using the Xact status we recorded during the open */
!       if (!rconn->remoteXactOpen)
!       {
!               /* commit the transaction */
!               res = PQexec(rconn->conn, "COMMIT");
!               if (PQresultStatus(res) != PGRES_COMMAND_OK)
!                       DBLINK_RES_INTERNALERROR("commit error");
!               PQclear(res);
!       }
  
        PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 440,445 ****
--- 441,448 ----
  Datum
  dblink_fetch(PG_FUNCTION_ARGS)
  {
+       char       *conname = NULL;
+       remoteConn *rconn = NULL;
        FuncCallContext *funcctx;
        TupleDesc       tupdesc = NULL;
        int                     call_cntr;
***************
*** 448,462 ****
        char       *msg;
        PGresult   *res = NULL;
        MemoryContext oldcontext;
-       char       *conname = NULL;
-       remoteConn *rcon = NULL;
  
        /* stuff done only on the first call of the function */
        if (SRF_IS_FIRSTCALL())
        {
!               PGconn     *conn = NULL;
                StringInfo      str = makeStringInfo();
-               char       *curname = NULL;
                int                     howmany = 0;
                bool            fail = true;    /* default to backward 
compatible */
  
--- 451,462 ----
        char       *msg;
        PGresult   *res = NULL;
        MemoryContext oldcontext;
  
        /* stuff done only on the first call of the function */
        if (SRF_IS_FIRSTCALL())
        {
!               char       *curname;
                StringInfo      str = makeStringInfo();
                int                     howmany = 0;
                bool            fail = true;    /* default to backward 
compatible */
  
***************
*** 467,507 ****
                        curname = GET_STR(PG_GETARG_TEXT_P(1));
                        howmany = PG_GETARG_INT32(2);
                        fail = PG_GETARG_BOOL(3);
- 
-                       rcon = getConnectionByName(conname);
-                       if (rcon)
-                               conn = rcon->con;
                }
                else if (PG_NARGS() == 3)
                {
                        /* text,text,int or text,int,bool */
                        if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
                        {
                                curname = GET_STR(PG_GETARG_TEXT_P(0));
                                howmany = PG_GETARG_INT32(1);
                                fail = PG_GETARG_BOOL(2);
-                               conn = persistent_conn;
                        }
                        else
                        {
                                conname = GET_STR(PG_GETARG_TEXT_P(0));
                                curname = GET_STR(PG_GETARG_TEXT_P(1));
                                howmany = PG_GETARG_INT32(2);
- 
-                               rcon = getConnectionByName(conname);
-                               if (rcon)
-                                       conn = rcon->con;
                        }
                }
!               else if (PG_NARGS() == 2)
                {
                        /* text,int */
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        howmany = PG_GETARG_INT32(1);
-                       conn = persistent_conn;
                }
  
!               if (!conn)
                        DBLINK_CONN_NOT_AVAIL;
  
                /* create a function context for cross-call persistence */
--- 467,501 ----
                        curname = GET_STR(PG_GETARG_TEXT_P(1));
                        howmany = PG_GETARG_INT32(2);
                        fail = PG_GETARG_BOOL(3);
                }
                else if (PG_NARGS() == 3)
                {
                        /* text,text,int or text,int,bool */
                        if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
                        {
+                               conname = EMPTY_CONNECTION_NAME;
                                curname = GET_STR(PG_GETARG_TEXT_P(0));
                                howmany = PG_GETARG_INT32(1);
                                fail = PG_GETARG_BOOL(2);
                        }
                        else
                        {
                                conname = GET_STR(PG_GETARG_TEXT_P(0));
                                curname = GET_STR(PG_GETARG_TEXT_P(1));
                                howmany = PG_GETARG_INT32(2);
                        }
                }
!               else
                {
                        /* text,int */
+                       Assert(PG_NARGS() == 2);
+                       conname = EMPTY_CONNECTION_NAME;
                        curname = GET_STR(PG_GETARG_TEXT_P(0));
                        howmany = PG_GETARG_INT32(1);
                }
  
!               rconn = getConnectionByName(conname);
!               if (!rconn || !rconn->conn)
                        DBLINK_CONN_NOT_AVAIL;
  
                /* create a function context for cross-call persistence */
***************
*** 515,521 ****
  
                appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);
  
!               res = PQexec(conn, str->data);
                if (!res ||
                        (PQresultStatus(res) != PGRES_COMMAND_OK &&
                         PQresultStatus(res) != PGRES_TUPLES_OK))
--- 509,515 ----
  
                appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);
  
!               res = PQexec(rconn->conn, str->data);
                if (!res ||
                        (PQresultStatus(res) != PGRES_COMMAND_OK &&
                         PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 638,657 ****
        int                     max_calls;
        AttInMetadata *attinmeta;
        char       *msg;
!       PGresult   *res = NULL;
        bool            is_sql_cmd = false;
        char       *sql_cmd_status = NULL;
        MemoryContext oldcontext;
!       bool            freeconn = false;
  
        /* stuff done only on the first call of the function */
        if (SRF_IS_FIRSTCALL())
        {
!               PGconn     *conn = NULL;
!               char       *connstr = NULL;
                char       *sql = NULL;
-               char       *conname = NULL;
-               remoteConn *rcon = NULL;
                bool            fail = true;    /* default to backward 
compatible */
  
                /* create a function context for cross-call persistence */
--- 632,649 ----
        int                     max_calls;
        AttInMetadata *attinmeta;
        char       *msg;
!       PGresult   *res;
        bool            is_sql_cmd = false;
        char       *sql_cmd_status = NULL;
        MemoryContext oldcontext;
!       bool            rconn_is_local = false;
  
        /* stuff done only on the first call of the function */
        if (SRF_IS_FIRSTCALL())
        {
!               char       *conname;
!               remoteConn *rconn;
                char       *sql = NULL;
                bool            fail = true;    /* default to backward 
compatible */
  
                /* create a function context for cross-call persistence */
***************
*** 666,704 ****
                if (PG_NARGS() == 3)
                {
                        /* text,text,bool */
!                       DBLINK_GET_CONN;
                        sql = GET_STR(PG_GETARG_TEXT_P(1));
                        fail = PG_GETARG_BOOL(2);
                }
                else if (PG_NARGS() == 2)
                {
                        /* text,text or text,bool */
                        if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                        {
!                               conn = persistent_conn;
                                sql = GET_STR(PG_GETARG_TEXT_P(0));
                                fail = PG_GETARG_BOOL(1);
                        }
                        else
                        {
!                               DBLINK_GET_CONN;
                                sql = GET_STR(PG_GETARG_TEXT_P(1));
                        }
                }
!               else if (PG_NARGS() == 1)
                {
                        /* text */
!                       conn = persistent_conn;
                        sql = GET_STR(PG_GETARG_TEXT_P(0));
                }
-               else
-                       /* shouldn't happen */
-                       elog(ERROR, "wrong number of arguments");
  
!               if (!conn)
                        DBLINK_CONN_NOT_AVAIL;
  
!               res = PQexec(conn, sql);
                if (!res ||
                        (PQresultStatus(res) != PGRES_COMMAND_OK &&
                         PQresultStatus(res) != PGRES_TUPLES_OK))
--- 658,698 ----
                if (PG_NARGS() == 3)
                {
                        /* text,text,bool */
!                       conname = GET_STR(PG_GETARG_TEXT_P(0));
                        sql = GET_STR(PG_GETARG_TEXT_P(1));
                        fail = PG_GETARG_BOOL(2);
+                       DBLINK_GET_CONN;
                }
                else if (PG_NARGS() == 2)
                {
                        /* text,text or text,bool */
                        if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                        {
!                               conname = EMPTY_CONNECTION_NAME;
!                               rconn = getConnectionByName(conname);
                                sql = GET_STR(PG_GETARG_TEXT_P(0));
                                fail = PG_GETARG_BOOL(1);
                        }
                        else
                        {
!                               conname = GET_STR(PG_GETARG_TEXT_P(0));
                                sql = GET_STR(PG_GETARG_TEXT_P(1));
+                               DBLINK_GET_CONN;
                        }
                }
!               else
                {
                        /* text */
!                       Assert(PG_NARGS() == 1);
!                       conname = EMPTY_CONNECTION_NAME;
!                       rconn = getConnectionByName(conname);
                        sql = GET_STR(PG_GETARG_TEXT_P(0));
                }
  
!               if (!rconn || !rconn->conn)
                        DBLINK_CONN_NOT_AVAIL;
  
!               res = PQexec(rconn->conn, sql);
                if (!res ||
                        (PQresultStatus(res) != PGRES_COMMAND_OK &&
                         PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 708,715 ****
                        else
                        {
                                DBLINK_RES_ERROR_AS_NOTICE("sql error");
!                               if (freeconn)
!                                       PQfinish(conn);
                                SRF_RETURN_DONE(funcctx);
                        }
                }
--- 702,712 ----
                        else
                        {
                                DBLINK_RES_ERROR_AS_NOTICE("sql error");
!                               if (rconn_is_local)
!                               {
!                                       PQfinish(rconn->conn);
!                                       pfree(rconn);
!                               }
                                SRF_RETURN_DONE(funcctx);
                        }
                }
***************
*** 736,744 ****
                /* got results, keep track of them */
                funcctx->user_fctx = res;
  
!               /* if needed, close the connection to the database and cleanup */
!               if (freeconn)
!                       PQfinish(conn);
  
                /* fast track when no results */
                if (funcctx->max_calls < 1)
--- 733,743 ----
                /* got results, keep track of them */
                funcctx->user_fctx = res;
  
!               if (rconn_is_local)
!               {
!                       PQfinish(rconn->conn);
!                       pfree(rconn);
!               }
  
                /* fast track when no results */
                if (funcctx->max_calls < 1)
***************
*** 846,895 ****
        PGresult   *res = NULL;
        text       *sql_cmd_status = NULL;
        TupleDesc       tupdesc = NULL;
-       PGconn     *conn = NULL;
-       char       *connstr = NULL;
        char       *sql = NULL;
        char       *conname = NULL;
!       remoteConn *rcon = NULL;
!       bool            freeconn = false;
        bool            fail = true;    /* default to backward compatible behavior */
  
        if (PG_NARGS() == 3)
        {
                /* must be text,text,bool */
!               DBLINK_GET_CONN;
                sql = GET_STR(PG_GETARG_TEXT_P(1));
                fail = PG_GETARG_BOOL(2);
        }
        else if (PG_NARGS() == 2)
        {
                /* might be text,text or text,bool */
                if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                {
!                       conn = persistent_conn;
                        sql = GET_STR(PG_GETARG_TEXT_P(0));
                        fail = PG_GETARG_BOOL(1);
                }
                else
                {
!                       DBLINK_GET_CONN;
                        sql = GET_STR(PG_GETARG_TEXT_P(1));
                }
        }
!       else if (PG_NARGS() == 1)
        {
                /* must be single text argument */
!               conn = persistent_conn;
                sql = GET_STR(PG_GETARG_TEXT_P(0));
        }
-       else
-               /* shouldn't happen */
-               elog(ERROR, "wrong number of arguments");
  
!       if (!conn)
                DBLINK_CONN_NOT_AVAIL;
  
!       res = PQexec(conn, sql);
        if (!res ||
                (PQresultStatus(res) != PGRES_COMMAND_OK &&
                 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 845,894 ----
        PGresult   *res = NULL;
        text       *sql_cmd_status = NULL;
        TupleDesc       tupdesc = NULL;
        char       *sql = NULL;
        char       *conname = NULL;
!       remoteConn *rconn = NULL;
!       bool            rconn_is_local = false;
        bool            fail = true;    /* default to backward compatible behavior */
  
        if (PG_NARGS() == 3)
        {
                /* must be text,text,bool */
!               conname = GET_STR(PG_GETARG_TEXT_P(0));
                sql = GET_STR(PG_GETARG_TEXT_P(1));
                fail = PG_GETARG_BOOL(2);
+               DBLINK_GET_CONN;
        }
        else if (PG_NARGS() == 2)
        {
                /* might be text,text or text,bool */
                if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                {
!                       conname = EMPTY_CONNECTION_NAME;
!                       rconn = getConnectionByName(conname);
                        sql = GET_STR(PG_GETARG_TEXT_P(0));
                        fail = PG_GETARG_BOOL(1);
                }
                else
                {
!                       conname = GET_STR(PG_GETARG_TEXT_P(0));
                        sql = GET_STR(PG_GETARG_TEXT_P(1));
+                       DBLINK_GET_CONN;
                }
        }
!       else
        {
                /* must be single text argument */
!               Assert(PG_NARGS() == 1);
!               conname = EMPTY_CONNECTION_NAME;
!               rconn = getConnectionByName(conname);
                sql = GET_STR(PG_GETARG_TEXT_P(0));
        }
  
!       if (!rconn || !rconn->conn)
                DBLINK_CONN_NOT_AVAIL;
  
!       res = PQexec(rconn->conn, sql);
        if (!res ||
                (PQresultStatus(res) != PGRES_COMMAND_OK &&
                 PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 933,941 ****
                           errmsg("statement returning results not allowed")));
        }
  
!       /* if needed, close the connection to the database and cleanup */
!       if (freeconn)
!               PQfinish(conn);
  
        PG_RETURN_TEXT_P(sql_cmd_status);
  }
--- 932,942 ----
                           errmsg("statement returning results not allowed")));
        }
  
!       if (rconn_is_local)
!       {
!               PQfinish(rconn->conn);
!               pfree(rconn);
!       }
  
        PG_RETURN_TEXT_P(sql_cmd_status);
  }
***************
*** 1864,1870 ****
        char       *relname;
        TupleDesc       tupdesc;
        StringInfo      str = makeStringInfo();
!       char       *sql = NULL;
        int                     ret;
        HeapTuple       tuple;
        int                     i;
--- 1865,1871 ----
        char       *relname;
        TupleDesc       tupdesc;
        StringInfo      str = makeStringInfo();
!       char       *sql;
        int                     ret;
        HeapTuple       tuple;
        int                     i;
***************
*** 2022,2028 ****
                                                                                           key, HASH_FIND, NULL);
  
        if (hentry)
!               return (hentry->rcon);
  
        return (NULL);
  }
--- 2023,2029 ----
                                                                                           key, HASH_FIND, NULL);
  
        if (hentry)
!               return (hentry->rconn);
  
        return (NULL);
  }
***************
*** 2039,2045 ****
  }
  
  static void
! createNewConnection(const char *name, remoteConn * con)
  {
        remoteConnHashEnt *hentry;
        bool            found;
--- 2040,2046 ----
  }
  
  static void
! createNewConnection(const char *name, remoteConn *rconn)
  {
        remoteConnHashEnt *hentry;
        bool            found;
***************
*** 2058,2064 ****
                                (errcode(ERRCODE_DUPLICATE_OBJECT),
                                 errmsg("duplicate connection name")));
  
!       hentry->rcon = con;
        strncpy(hentry->name, name, NAMEDATALEN - 1);
  }
  
--- 2059,2065 ----
                                (errcode(ERRCODE_DUPLICATE_OBJECT),
                                 errmsg("duplicate connection name")));
  
!       hentry->rconn = rconn;
        strncpy(hentry->name, name, NAMEDATALEN - 1);
  }
  
---------------------------(end of broadcast)---------------------------
TIP 5: don't forget to increase your free space map settings

Reply via email to