The changes to the C module.

-- 
Patrick TJ McPhee <[email protected]>

Index: trunk/module/pgmodule.c
===================================================================
--- trunk/module/pgmodule.c     (revision 520)
+++ trunk/module/pgmodule.c     (working copy)
@@ -74,6 +74,7 @@
 #define RESULT_DML                     2
 #define RESULT_DDL                     3
 #define RESULT_DQL                     4
+#define RESULT_ASYNC           5
 
 /* flags for move methods */
 #define QUERY_MOVEFIRST                1
@@ -172,6 +173,7 @@
 typedef struct
 {
        PyObject_HEAD
+       pgobject        *pgcnx;                 /* for async results */
        PGresult        *result;                /* result content */
        int                     result_type;    /* type of previous result */
        long            current_pos;    /* current position in last result */
@@ -1685,7 +1687,7 @@
 
 /* connects to a database */
 static char connect__doc__[] =
-"connect(dbname, host, port, opt, tty) -- connect to a PostgreSQL database "
+"connect(dbname, host, port, opt, tty, user, password, nowait) -- connect to a 
PostgreSQL database "
 "using specified parameters (optionals, keywords aware).";
 
 static PyObject *
@@ -1692,7 +1694,7 @@
 pgconnect(pgobject *self, PyObject *args, PyObject *dict)
 {
        static const char *kwlist[] = {"dbname", "host", "port", "opt",
-       "tty", "user", "passwd", NULL};
+       "tty", "user", "passwd", "nowait", NULL};
 
        char       *pghost,
                           *pgopt,
@@ -1700,8 +1702,10 @@
                           *pgdbname,
                           *pguser,
                           *pgpasswd;
-       int                     pgport;
+       int                     pgport, nowait = 0, kwcount = 0;
        char            port_buffer[20];
+       const char *keywords[sizeof(kwlist)/sizeof(*kwlist)+1],
+                          *values[sizeof(kwlist)/sizeof(*kwlist)+1];
        pgobject   *npgobj;
 
        pghost = pgopt = pgtty = pgdbname = pguser = pgpasswd = NULL;
@@ -1713,8 +1717,8 @@
         * don't declare kwlist as const char *kwlist[] then it complains when
         * I try to assign all those constant strings to it.
         */
-       if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzz", (char **) 
kwlist,
-               &pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser, 
&pgpasswd))
+       if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzzi", (char **) 
kwlist,
+               &pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser, 
&pgpasswd, &nowait))
                return NULL;
 
 #ifdef DEFAULT_VARS
@@ -1753,8 +1757,59 @@
 #ifdef PQsetdbLoginIsThreadSafe
        Py_BEGIN_ALLOW_THREADS
 #endif
-       npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
-               pgopt, pgtty, pgdbname, pguser, pgpasswd);
+       if (!nowait)
+       {
+               npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : 
port_buffer,
+                       pgopt, pgtty, pgdbname, pguser, pgpasswd);
+       }
+       else
+       {
+               if (pghost)
+               {
+                       keywords[kwcount] = "host";
+                       values[kwcount] = pghost;
+                       kwcount++;
+               }
+               if (pgopt)
+               {
+                       keywords[kwcount] = "options";
+                       values[kwcount] = pgopt;
+                       kwcount++;
+               }
+               if (pgtty)
+               {
+                       /* postgres ignores this, so we do, too */
+               }
+               if (pgdbname)
+               {
+                       keywords[kwcount] = "dbname";
+                       values[kwcount] = pgdbname;
+                       kwcount++;
+               }
+               if (pguser)
+               {
+                       keywords[kwcount] = "user";
+                       values[kwcount] = pguser;
+                       kwcount++;
+               }
+               if (pgpasswd)
+               {
+                       keywords[kwcount] = "password";
+                       values[kwcount] = pgpasswd;
+                       kwcount++;
+               }
+               if (pgport != -1)
+               {
+                       keywords[kwcount] = "port";
+                       values[kwcount] = port_buffer;
+                       kwcount++;
+               }
+               keywords[kwcount] = values[kwcount] = NULL;
+
+               /* The final argument means dbname can be a full connection 
string,
+                * which is always the case for PQsetdbLogin() */
+               npgobj->cnx = PQconnectStartParams(keywords, values, 1);
+       }
 #ifdef PQsetdbLoginIsThreadSafe
        Py_END_ALLOW_THREADS
 #endif
@@ -1854,6 +1909,9 @@
        if (self->result)
                PQclear(self->result);
 
+       if (self->pgcnx)
+               Py_DECREF(self->pgcnx);
+
        PyObject_Del(self);
 }
 
@@ -1939,6 +1997,44 @@
 #endif
 }
 
+/* get asynchronous connection state */
+static char pg_connectpoll__doc__[] =
+"Initiates phases of the asynchronous connection process"
+" and returns the connection state.";
+
+static PyObject *
+pg_connectpoll(pgobject *self, PyObject *args)
+{
+       int rc;
+
+       if (!self->cnx)
+       {
+               PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+               return NULL;
+       }
+
+       /* checks args */
+       if (!PyArg_ParseTuple(args, ""))
+       {
+               PyErr_SetString(PyExc_TypeError,
+                       "method connectPoll() takes no parameters.");
+               return NULL;
+       }
+
+       Py_BEGIN_ALLOW_THREADS
+       rc =PQconnectPoll(self->cnx);
+       Py_END_ALLOW_THREADS
+
+       if (rc == PGRES_POLLING_FAILED)
+       {
+               set_dberror(InternalError, PQerrorMessage(self->cnx), NULL);
+               Py_XDECREF(self);
+               return NULL;
+       }
+
+       return PyInt_FromLong(rc);
+}
+
 /* set notice receiver callback function */
 static char pg_set_notice_receiver__doc__[] =
 "set_notice_receiver() -- set the current notice receiver.";
@@ -2098,6 +2194,56 @@
        return PyInt_FromLong(num);
 }
 
+
+/* for non-query results, sets the appropriate error status, returns
+ * the appropriate value, and frees the result set */
+static PyObject * _check_result_status(int status, PGconn * cnx, PGresult * 
result)
+{
+       switch (status)
+       {
+               case PGRES_EMPTY_QUERY:
+                       PyErr_SetString(PyExc_ValueError, "empty query.");
+                       break;
+               case PGRES_BAD_RESPONSE:
+               case PGRES_FATAL_ERROR:
+               case PGRES_NONFATAL_ERROR:
+                       set_dberror(ProgrammingError,
+                               PQerrorMessage(cnx), result);
+                       break;
+               case PGRES_COMMAND_OK:
+                       {                                               /* 
INSERT, UPDATE, DELETE */
+                               Oid             oid = PQoidValue(result);
+                               if (oid == InvalidOid)  /* not a single insert 
*/
+                               {
+                                       char    *ret = PQcmdTuples(result);
+                                               PQclear(result);
+                                       if (ret[0])             /* return 
number of rows affected */
+                                       {
+                                               return PyString_FromString(ret);
+                                       }
+                                       Py_INCREF(Py_None);
+                                       return Py_None;
+                               }
+                               /* for a single insert, return the oid */
+                               PQclear(result);
+                               return PyInt_FromLong(oid);
+                       }
+               case PGRES_COPY_OUT:            /* no data will be received */
+               case PGRES_COPY_IN:
+                       PQclear(result);
+                       Py_INCREF(Py_None);
+                       return Py_None;
+               default:
+                       set_dberror(InternalError,
+                               "internal error: unknown result status.", 
result);
+                       break;
+       }
+
+       PQclear(result);
+       return NULL;                    /* error detected on query */
+}
+
+
 /* retrieves last result */
 static char pgquery_getresult__doc__[] =
 "getresult() -- Gets the result of a query.  The result is returned "
@@ -2124,13 +2270,81 @@
                return NULL;
        }
 
+       /* this is following an async call, so need to get the result first */
+       if (self->result_type == RESULT_ASYNC)
+       {
+               int status;
+
+               if (!self->pgcnx)
+               {
+                       PyErr_SetString(PyExc_TypeError, "Connection is not 
valid.");
+                       return NULL;
+               }
+
+
+               Py_BEGIN_ALLOW_THREADS
+               if (self->result)
+               {
+                       PQclear(self->result);
+               }
+               self->result = PQgetResult(self->pgcnx->cnx);
+               Py_END_ALLOW_THREADS
+               /* end of result set, return None */
+               if (!self->result)
+               {
+                       Py_DECREF(self->pgcnx);
+                       self->pgcnx = NULL;
+                       Py_INCREF(Py_None);
+                       return Py_None;
+               }
+
+               if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+               {
+                       val = _check_result_status(status, self->pgcnx->cnx, 
self->result);
+                       /* _check_result_status calls PQclear() so we need to 
clear this
+                        * attribute */
+                       self->result = NULL;
+                       /* throwing an exception. Need to call PQgetResult() to 
clear the
+                        * connection state. This should return NULL the first 
time. */
+                       if (!val)
+                       {
+                               self->result = PQgetResult(self->pgcnx->cnx);
+                               while (self->result)
+                               {
+                                       PQclear(self->result);
+                                       self->result = 
PQgetResult(self->pgcnx->cnx);
+                                       Py_DECREF(self->pgcnx);
+                                       self->pgcnx = NULL;
+                               }
+                       }
+                       /* it's confusing to return None here because the 
caller has to
+                        * call again until we return None. We can't just 
consume that
+                        * final None because we don't know if there are 
additional
+                        * statements following this one, so we return an empty 
string where
+                        * query() would return None */
+                       else if (val == Py_None)
+                       {
+                               Py_DECREF(val);
+                               val = PyString_FromString("");
+                       }
+                       return val;
+               }
+       }
+
+
        /* stores result in tuple */
        m = PQntuples(self->result);
        n = PQnfields(self->result);
-       reslist = PyList_New(m);
 
        typ = get_type_array(self->result, n);
 
+       if (!typ)
+       {
+               return NULL;
+       }
+
+       reslist = PyList_New(m);
+
        for (i = 0; i < m; i++)
        {
                if (!(rowtuple = PyTuple_New(n)))
@@ -2251,6 +2465,59 @@
                return NULL;
        }
 
+       /* this is following an async call, so need to get the result first */
+       if (self->result_type == RESULT_ASYNC)
+       {
+               int status;
+
+               Py_BEGIN_ALLOW_THREADS
+               if (self->result)
+               {
+                       PQclear(self->result);
+               }
+               self->result = PQgetResult(self->pgcnx->cnx);
+               Py_END_ALLOW_THREADS
+               /* end of result set, return None */
+               if (!self->result)
+               {
+                       Py_INCREF(Py_None);
+                       return Py_None;
+               }
+
+               if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+               {
+                       val = _check_result_status(status, self->pgcnx->cnx, 
self->result);
+                       /* _check_result_status calls PQclear() so we need to 
clear this
+                        * attribute */
+                       self->result = NULL;
+                       /* throwing an exception. Need to call PQgetResult() to 
clear the
+                        * connection state. This should return NULL the first 
time. */
+                       if (!val)
+                       {
+                               self->result = PQgetResult(self->pgcnx->cnx);
+                               while (self->result)
+                               {
+                                       PQclear(self->result);
+                                       self->result = 
PQgetResult(self->pgcnx->cnx);
+                                       Py_DECREF(self->pgcnx);
+                                       self->pgcnx = NULL;
+                               }
+                       }
+                       /* it's confusing to return None here because the 
caller has to
+                        * call again until we return None. We can't just 
consume that
+                        * final None because we don't know if there are 
additional
+                        * statements following this one, so we return an empty 
string where
+                        * query() would return None */
+                       else if (val == Py_None)
+                       {
+                               Py_DECREF(val);
+                               val = PyString_FromString("");
+                       }
+                       return val;
+               }
+       }
+
+
        /* stores result in list */
        m = PQntuples(self->result);
        n = PQnfields(self->result);
@@ -2484,8 +2751,13 @@
 "query(sql, [args]) -- creates a new query object for this connection, using"
 " sql (string) request and optionally a tuple with positional parameters.";
 
+static char pg_sendquery__doc__[] =
+"sendquery(sql, [args]) -- creates a new query object for this connection, 
using"
+" sql (string) request and optionally a tuple with positional parameters. 
Returns"
+" without waiting for the query to complete.";
+
 static PyObject *
-pg_query(pgobject *self, PyObject *args)
+_pg_query(pgobject *self, PyObject *args, int async)
 {
        char            *query;
        PyObject        *oargs = NULL;
@@ -2608,8 +2880,17 @@
                }
 
                Py_BEGIN_ALLOW_THREADS
-               result = PQexecParams(self->cnx, query, nparms,
-                       NULL, (const char * const *)parms, lparms, NULL, 0);
+               if (async)
+               {
+                       status = PQsendQueryParams(self->cnx, query, nparms,
+                               NULL, (const char * const *)parms, lparms, 
NULL, 0);
+                       result = NULL;
+               }
+               else
+               {
+                       result = PQexecParams(self->cnx, query, nparms,
+                               NULL, (const char * const *)parms, lparms, 
NULL, 0);
+               }
                Py_END_ALLOW_THREADS
 
                free(lparms); free(parms);
@@ -2625,12 +2906,20 @@
        else
        {
                Py_BEGIN_ALLOW_THREADS
-               result = PQexec(self->cnx, query);
+               if (async)
+               {
+                       status = PQsendQuery(self->cnx, query);
+                       result = NULL;
+               }
+               else
+               {
+                       result = PQexec(self->cnx, query);
+               }
                Py_END_ALLOW_THREADS
        }
 
        /* checks result validity */
-       if (!result)
+       if ((!async && !result) || (async && !status))
        {
                PyErr_SetString(PyExc_ValueError, PQerrorMessage(self->cnx));
                return NULL;
@@ -2637,51 +2926,9 @@
        }
 
        /* checks result status */
-       if ((status = PQresultStatus(result)) != PGRES_TUPLES_OK)
+       if (!async && (status = PQresultStatus(result)) != PGRES_TUPLES_OK)
        {
-               switch (status)
-               {
-                       case PGRES_EMPTY_QUERY:
-                               PyErr_SetString(PyExc_ValueError, "empty 
query.");
-                               break;
-                       case PGRES_BAD_RESPONSE:
-                       case PGRES_FATAL_ERROR:
-                       case PGRES_NONFATAL_ERROR:
-                               set_dberror(ProgrammingError,
-                                       PQerrorMessage(self->cnx), result);
-                               break;
-                       case PGRES_COMMAND_OK:
-                               {                                               
/* INSERT, UPDATE, DELETE */
-                                       Oid             oid = 
PQoidValue(result);
-                                       if (oid == InvalidOid)  /* not a single 
insert */
-                                       {
-                                               char    *ret = 
PQcmdTuples(result);
-
-                                               PQclear(result);
-                                               if (ret[0])             /* 
return number of rows affected */
-                                               {
-                                                       return 
PyString_FromString(ret);
-                                               }
-                                               Py_INCREF(Py_None);
-                                               return Py_None;
-                                       }
-                                       /* for a single insert, return the oid 
*/
-                                       PQclear(result);
-                                       return PyInt_FromLong(oid);
-                               }
-                       case PGRES_COPY_OUT:            /* no data will be 
received */
-                       case PGRES_COPY_IN:
-                               PQclear(result);
-                               Py_INCREF(Py_None);
-                               return Py_None;
-                       default:
-                               set_dberror(InternalError,
-                                       "internal error: unknown result 
status.", result);
-                               break;
-               }
-
-               PQclear(result);
-               return NULL;                    /* error detected on query */
+               return _check_result_status(status, self->cnx, result);
        }
 
        if (!(npgobj = PyObject_NEW(pgqueryobject, &PgQueryType)))
@@ -2688,10 +2935,29 @@
                return NULL;
 
        /* stores result and returns object */
+       npgobj->pgcnx = NULL;
        npgobj->result = result;
+       if (async)
+       {
+               npgobj->result_type = RESULT_ASYNC;
+               Py_INCREF(self);
+               npgobj->pgcnx = self;
+       }
        return (PyObject *) npgobj;
 }
 
+static PyObject *
+pg_query(pgobject *self, PyObject *args, int nowait)
+{
+       return _pg_query(self, args, 0);
+}
+
+static PyObject *
+pg_sendquery(pgobject *self, PyObject *args, int nowait )
+{
+       return _pg_query(self, args, 1);
+}
+
 #ifdef DIRECT_ACCESS
 static char pg_putline__doc__[] =
 "putline() -- sends a line directly to the backend";
@@ -2798,6 +3064,74 @@
        Py_INCREF(Py_None);
        return Py_None;
 }
+
+
+/* direct access function : setnonblocking */
+static char pg_setnonblocking__doc__[] =
+"setnonblocking() -- puts direct copy functions in non-blocking mode";
+
+static PyObject *
+pg_setnonblocking(pgobject *self, PyObject *args)
+{
+       int nonblocking;
+
+       if (!self->cnx)
+       {
+               PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+               return NULL;
+       }
+
+       /* reads args */
+       if (!PyArg_ParseTuple(args, "i", &nonblocking))
+       {
+               PyErr_SetString(PyExc_TypeError, "setnonblocking(tf), with 
boolean.");
+               return NULL;
+       }
+
+       /* returns -1 on error, or 0 on success */
+       if (PQsetnonblocking(self->cnx, nonblocking) < 0)
+       {
+               PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+               return NULL;
+       }
+       Py_INCREF(Py_None);
+       return Py_None;
+}
+
+/* direct access function : isnonblocking */
+static char pg_isnonblocking__doc__[] =
+"isnonblocking() -- reports the non-blocking status of copy functions";
+
+static PyObject *
+pg_isnonblocking(pgobject *self, PyObject *args)
+{
+       int rc;
+
+       if (!self->cnx)
+       {
+               PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+               return NULL;
+       }
+
+       /* reads args */
+       if (!PyArg_ParseTuple(args, ""))
+       {
+               PyErr_SetString(PyExc_TypeError,
+                       "method isnonblocking() takes no parameters.");
+               return NULL;
+       }
+
+       /* returns 1 if blocking, 0 otherwise. not sure if it can return -1 */
+       rc = PQisnonblocking(self->cnx);
+       if (rc < 0)
+       {
+               PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+               return NULL;
+       }
+
+       return PyBool_FromLong(rc);
+}
+
 #endif /* DIRECT_ACCESS */
 
 static PyObject *
@@ -3301,9 +3635,12 @@
 static struct PyMethodDef pgobj_methods[] = {
        {"source", (PyCFunction) pg_source, METH_VARARGS, pg_source__doc__},
        {"query", (PyCFunction) pg_query, METH_VARARGS, pg_query__doc__},
+       {"sendquery", (PyCFunction) pg_sendquery, METH_VARARGS, 
pg_sendquery__doc__},
        {"reset", (PyCFunction) pg_reset, METH_VARARGS, pg_reset__doc__},
        {"cancel", (PyCFunction) pg_cancel, METH_VARARGS, pg_cancel__doc__},
        {"close", (PyCFunction) pg_close, METH_VARARGS, pg_close__doc__},
+       {"connectpoll", (PyCFunction) pg_connectpoll, METH_VARARGS,
+                       pg_connectpoll__doc__},
        {"fileno", (PyCFunction) pg_fileno, METH_VARARGS, pg_fileno__doc__},
        {"get_notice_receiver", (PyCFunction) pg_get_notice_receiver, 
METH_VARARGS,
                        pg_get_notice_receiver__doc__},
@@ -3333,6 +3670,8 @@
        {"putline", (PyCFunction) pg_putline, 1, pg_putline__doc__},
        {"getline", (PyCFunction) pg_getline, 1, pg_getline__doc__},
        {"endcopy", (PyCFunction) pg_endcopy, 1, pg_endcopy__doc__},
+       {"setnonblocking", (PyCFunction) pg_setnonblocking, 1, 
pg_setnonblocking__doc__},
+       {"isnonblocking", (PyCFunction) pg_isnonblocking, 1, 
pg_isnonblocking__doc__},
 #endif /* DIRECT_ACCESS */
 
 #ifdef LARGE_OBJECTS
@@ -4249,6 +4588,12 @@
        PyDict_SetItemString(dict, "SEEK_END", PyInt_FromLong(SEEK_END));
 #endif /* LARGE_OBJECTS */
 
+       /* PQconnectPoll() results */
+       
PyDict_SetItemString(dict,"PGRES_POLLING_OK",PyInt_FromLong(PGRES_POLLING_OK));
+       
PyDict_SetItemString(dict,"PGRES_POLLING_FAILED",PyInt_FromLong(PGRES_POLLING_FAILED));
+       
PyDict_SetItemString(dict,"PGRES_POLLING_READING",PyInt_FromLong(PGRES_POLLING_READING));
+       
PyDict_SetItemString(dict,"PGRES_POLLING_WRITING",PyInt_FromLong(PGRES_POLLING_WRITING));
+
 #ifdef DEFAULT_VARS
        /* prepares default values */
        Py_INCREF(Py_None);
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to