The attached patch provides the asynchronous operations described in
section 31.4 of the PostgreSQL manual. I believe everything described in
that section is available, with these exceptions:
- There's no prepared statement support.
- I didn't implement PQsetSingleRowMode(). This would require a
possibly small change to the way that query results are retrieved, which
I thought would go better as a separate change set.
- I didn't implement PQconsumeInput() or PQisBusy(). I don't really
understand the point of these functions: they seem to have marginal
utility outside notification reception, and I wasn't sure exactly how to
document them. It might make sense to have an isbusy() call which calls
both, but I don't really know if that fits anybody's use case.
- I seem to have left out PQflush(). This is an oversight. In general,
the non-blocking operations are not well tested.
The changes allow the database to be used in an event-driven
application. For other applications, there are some parallelism
benefits:
- Connections can be completed in the background, which can speed up use
cases where, for instance, the application needs to connect to several
databases at once.
- When multiple semicolon-delimited queries are run in a single call,
the results of all the queries are returned.
- The application can do other work while waiting for queries to
complete.
- Copy and large object operations can use non-blocking I/O.
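
As a rough illustration of the first point, a non-blocking connect might
be driven along these lines. This is only a sketch in Python 3, not code
from the patch: finish_connect() and its pgmod parameter (the imported pg
module, passed in so the constants are not hard-coded) are hypothetical
names. connectpoll(), fileno() and the PGRES_POLLING_* constants are the
ones described in the patch.

```python
import select


def finish_connect(con, pgmod, timeout=10.0):
    """Drive con.connectpoll() until the connection is established.

    `con` is assumed to come from pg.connect(..., nowait=True) and
    `pgmod` is assumed to be the pg module (hypothetical parameter).
    """
    # The patch documents PGRES_POLLING_WRITING as the initial state.
    state = pgmod.PGRES_POLLING_WRITING
    while state != pgmod.PGRES_POLLING_OK:
        fd = con.fileno()  # re-read each time in case the socket changes
        if state == pgmod.PGRES_POLLING_READING:
            ready = select.select([fd], [], [], timeout)
        else:
            ready = select.select([], [fd], [], timeout)
        if not any(ready):
            raise TimeoutError("connection attempt timed out")
        # In the patch, connectpoll() raises on PGRES_POLLING_FAILED,
        # so a failed attempt leaves this loop via an exception.
        state = con.connectpoll()
    return con
```

An event-driven application would register the descriptor with its own
loop instead of calling select() directly; the state handling stays the
same.
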
Query operations work essentially the same way as they do now, with
these differences:
- All the result codes are now returned by getresult(), dictresult() or
namedresult().
- In cases where query() returns None, getresult() et al return ''.
- You have to call getresult() et al until they return None.
- Exceptions raised by bad queries are raised by getresult() et al, not
by the query function.
The result member of pgqueryobject is changed by each call to
getresult() et al, so you can't get the same query result twice when
using asynchronous calls, and functions which depend on the result
member don't work until after a call to getresult() et al.
Because of this last point, I had to reorganize _namedresult(). That's
the only Python change other than the unit test.
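
The "call until None" rule can be wrapped in a small helper. The
following is a sketch under assumptions rather than part of the patch:
drain_results() is a hypothetical name, and it relies only on the
behaviour described above (each call returns the next result, '' stands
in for a None result code, and None marks the end).

```python
def drain_results(pgq, method="getresult"):
    """Collect every result from an asynchronous query object.

    `pgq` is assumed to be the object returned by sendquery();
    `method` names the result-returning method to call.
    """
    fetch = getattr(pgq, method)
    results = []
    while True:
        res = fetch()
        if res is None:  # end of results: the connection is usable again
            break
        results.append(res)  # may be a list, a row count, an oid or ''
    return results
```

For a call such as con.sendquery("select 1+1; select 'pg'") this would
yield one list entry per statement. An exception raised for a bad query
simply propagates to the caller.
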
C code changes are:
- some new functions
- changed connect() to take a new argument and call
PQconnectStartParams() when appropriate
- added code to getresult() and dictresult() to call PQgetResult() when
appropriate. Looking at it now, this block of code has become quite
big and maybe should move to its own function
- renamed pg_query to _pg_query and added a new argument. This is
called from wrapper functions pg_query() and pg_sendquery()
- moved scalar result processing from pg_query to a new function,
_check_result_status()
- changed pg_query to call PQsendQuery() or PQsendQueryParams() when
appropriate
--
Patrick TJ McPhee <[email protected]>
Index: trunk/docs/pg.txt
===================================================================
--- trunk/docs/pg.txt (revision 520)
+++ trunk/docs/pg.txt (working copy)
@@ -59,7 +59,7 @@
-------------------------------
Syntax::
- connect([dbname], [host], [port], [opt], [tty], [user], [passwd])
+ connect([dbname], [host], [port], [opt], [tty], [user], [passwd], [nowait])
Parameters:
:dbname: name of connected database (string/None)
@@ -69,6 +69,7 @@
:tty: debug terminal (string/None)
:user: PostgreSQL user (string/None)
:passwd: password for user (string/None)
+ :nowait: True to perform the connection asynchronously
Return type:
:pgobject: If successful, the `pgobject` handling the connection
@@ -86,6 +87,7 @@
Python tutorial. The names of the keywords are the name of the
parameters given in the syntax line. For a precise description
of the parameters, please refer to the PostgreSQL user manual.
+ See connectpoll() for a description of the nowait parameter.
Examples::
@@ -480,6 +482,95 @@
phone = con.query("select phone from employees"
" where name=$1", (name, )).getresult()
+sendquery - executes a SQL command string asynchronously
+--------------------------------------------------------
+Syntax::
+
+ sendquery(command, [args])
+
+Parameters:
+ :command: SQL command (string)
+ :args: optional positional arguments
+
+Return type:
+ :pgqueryobject: query object used to retrieve the results
+
+Exceptions raised:
+ :TypeError: bad argument type, or too many arguments
+ :TypeError: invalid connection
+
+Description:
+ `sendquery()` is much the same as `query()`, except that it returns without
+ waiting for the query to complete. The database connection cannot be used
+ for other operations until the query completes, but the application can
+ do other things, including executing queries using other database connections.
+ The application can call `select()` using the connection's `fileno()` to
+ determine when the query has results to return.
+
+ `sendquery()` always returns a `pgqueryobject`. This object differs from
+ the `pgqueryobject` returned by `query()` in a few ways. Most importantly,
+ when `sendquery()` is used, the application must call one of the
+ result-returning methods (`getresult()`, `dictresult()`, or `namedresult()`)
+ on the `pgqueryobject` until it either raises an exception or returns `None`.
+ Otherwise, the database connection will be left in an unusable state.
+
+ In cases when `query()` would return something other than a `pgqueryobject`,
+ that result will be returned by calling one of the result-returning methods
+ on the `pgqueryobject` returned by `sendquery()`. There's one important
+ difference in these result codes: if `query()` returns `None`, the
+ result-returning methods will return an empty string (`''`). It's still
+ necessary to call a result-returning method until it returns `None`.
+
+ `listfields()`, `fieldname()`, `fieldnum()`, and `ntuples()` only work
+ after a call to a result-returning method with a non-`None` return value.
+ `ntuples()` returns only the number of rows returned by the previous
+ result-returning method.
+
+ If multiple semicolon-delimited statements are passed to `query()`, only
+ the results of the last statement are returned in the `pgqueryobject`. With
+ `sendquery()`, all results are returned. Each result set will be
+ returned by a separate call to `getresult()`.
+
+Example::
+
+ name = raw_input("Name? ")
+ pgq = con.sendquery("select phone from employees"
+ " where name=$1", (name, ))
+ phone = pgq.getresult()
+ pgq.getresult() # to close the query
+
+ # initiate two queries in one round trip
+ # note this differs from a union since the result sets have different
+ # structures
+ pgq = con.sendquery("select a,b,c from x where d=e; select e,f from y where g")
+ qrabc = pgq.dictresult() # results from x
+ qref = pgq.dictresult() # results from y
+ pgq.dictresult() # to close the query
+
+ # using select to wait for the query to be ready
+ pgq = con.sendquery("select pg_sleep(20)")
+ r,w,e = select([con.fileno(),other,sockets],[],[])
+ if con.fileno() in r:
+     results = pgq.getresult()
+ pgq.getresult() # to close the query
+
+ # concurrent queries on separate connections
+ con1 = connect()
+ con2 = connect()
+ ss = con1.query("begin; set transaction isolation level repeatable read;"
+ "select pg_export_snapshot();").getresult()[0][0]
+ con2.query("begin; set transaction isolation level repeatable read;"
+ "set transaction snapshot '%s'" % (ss,))
+ pgq1 = con1.sendquery("select a,b,c from x where d=e")
+ pgq2 = con2.sendquery("select e,f from y where g")
+ qr1 = pgq1.getresult()
+ pgq1.getresult()
+ qr2 = pgq2.getresult()
+ pgq2.getresult()
+ con1.query("commit")
+ con2.query("commit")
+
+
reset - resets the connection
-----------------------------
Syntax::
@@ -540,6 +631,59 @@
allows you to explicitly close it. It is mainly here to allow
the DB-SIG API wrapper to implement a close function.
+connectpoll - completes an asynchronous connection
+--------------------------------------------------
+Syntax::
+
+ connectpoll()
+
+Parameters:
+ None
+
+Return type:
+ :int: PGRES_POLLING_OK, PGRES_POLLING_FAILED, PGRES_POLLING_READING or
+ PGRES_POLLING_WRITING
+
+Exceptions raised:
+ :TypeError: too many (any) arguments
+ :TypeError: invalid connection
+ :pg.InternalError: some error occurred during pg connection
+
+Description:
+ The database connection can be performed without any blocking
+ calls. This allows the application mainline to perform other
+ operations or perhaps connect to multiple databases concurrently.
+ Once the connection is established, it's no different from a connection
+ made using blocking calls.
+
+ The required steps are to pass the parameter "nowait=True" to
+ the `connect()` call, then call `connectpoll()` until it either returns
+ `PGRES_POLLING_OK` or raises an exception. To avoid blocking in
+ `connectpoll()`, use `select()` or `poll()` to wait for the connection
+ to be readable or writable, depending on the return code of the
+ previous call to `connectpoll()`. The initial state is
+ `PGRES_POLLING_WRITING`.
+
+Example::
+
+ con = pg.connect('testdb', nowait=True)
+ conno = con.fileno()
+ rd = []
+ wt = [conno]
+ rc = pg.PGRES_POLLING_WRITING
+ while rc not in (pg.PGRES_POLLING_OK, pg.PGRES_POLLING_FAILED):
+     ra, wa, xa = select(rd, wt, [], timeout)
+     if not ra and not wa:
+         timedout()
+
+     rc = con.connectpoll()
+     if rc == pg.PGRES_POLLING_READING:
+         rd = [conno]
+         wt = []
+     else:
+         rd = []
+         wt = [conno]
+
fileno - returns the socket used to connect to the database
-----------------------------------------------------------
Syntax::
@@ -719,6 +863,47 @@
The use of direct access methods may desynchonize client and server.
This method ensure that client and server will be synchronized.
+setnonblocking - puts the connection into non-blocking mode
+-----------------------------------------------------------
+Syntax::
+
+ setnonblocking(nb)
+
+Parameters:
+ :nb: True to put the connection into non-blocking mode. False to put
+ it into blocking mode
+
+Return type:
+ None
+
+Exceptions raised:
+ :TypeError: invalid connection
+ :TypeError: too many parameters
+
+Description:
+ Puts the socket connection into non-blocking mode (or into blocking
+ mode). This affects copy commands and large object operations, but not
+ queries.
+
+isnonblocking - reports the connection's blocking status
+--------------------------------------------------------
+Syntax::
+
+ isnonblocking()
+
+Parameters:
+ None
+
+Return type:
+ :boolean: True if the connection is in non-blocking mode, False otherwise
+
+Exceptions raised:
+ :TypeError: invalid connection
+ :TypeError: too many parameters
+
+Description:
+ Returns True if the connection is in non-blocking mode, False otherwise.
+
locreate - create a large object in the database [LO]
-----------------------------------------------------
Syntax::
@@ -1158,6 +1343,8 @@
Return type:
:list: result values as a list of tuples
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
@@ -1179,6 +1366,8 @@
Return type:
:list: result values as a list of dictionaries
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
@@ -1200,6 +1389,8 @@
Return type:
:list: result values as a list of named tuples
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
Index: trunk/module/pg.py
===================================================================
--- trunk/module/pg.py (revision 520)
+++ trunk/module/pg.py (working copy)
@@ -121,8 +121,11 @@
def _namedresult(q):
"""Get query result as named tuples."""
+ # need to call this before listfields for async queries to work
+ res = q.getresult()
+ if not isinstance(res, list): return res
row = namedtuple('Row', q.listfields())
- return [row(*r) for r in q.getresult()]
+ return [row(*r) for r in res]
set_namedresult(_namedresult)
Index: trunk/module/pgmodule.c
===================================================================
--- trunk/module/pgmodule.c (revision 520)
+++ trunk/module/pgmodule.c (working copy)
@@ -74,6 +74,7 @@
#define RESULT_DML 2
#define RESULT_DDL 3
#define RESULT_DQL 4
+#define RESULT_ASYNC 5
/* flags for move methods */
#define QUERY_MOVEFIRST 1
@@ -172,6 +173,7 @@
typedef struct
{
PyObject_HEAD
+ pgobject *pgcnx; /* for async results */
PGresult *result; /* result content */
int result_type; /* type of previous result */
long current_pos; /* current position in last result */
@@ -1685,7 +1687,7 @@
/* connects to a database */
static char connect__doc__[] =
-"connect(dbname, host, port, opt, tty) -- connect to a PostgreSQL database "
+"connect(dbname, host, port, opt, tty, user, password, nowait) -- connect to a PostgreSQL database "
"using specified parameters (optionals, keywords aware).";
static PyObject *
@@ -1692,7 +1694,7 @@
pgconnect(pgobject *self, PyObject *args, PyObject *dict)
{
static const char *kwlist[] = {"dbname", "host", "port", "opt",
- "tty", "user", "passwd", NULL};
+ "tty", "user", "passwd", "nowait", NULL};
char *pghost,
*pgopt,
@@ -1700,8 +1702,10 @@
*pgdbname,
*pguser,
*pgpasswd;
- int pgport;
+ int pgport, nowait = 0, kwcount = 0;
char port_buffer[20];
+ const char *keywords[sizeof(kwlist)/sizeof(*kwlist)+1],
+ *values[sizeof(kwlist)/sizeof(*kwlist)+1];
pgobject *npgobj;
pghost = pgopt = pgtty = pgdbname = pguser = pgpasswd = NULL;
@@ -1713,8 +1717,8 @@
* don't declare kwlist as const char *kwlist[] then it complains when
* I try to assign all those constant strings to it.
*/
- if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzz", (char **) kwlist,
- &pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser, &pgpasswd))
+ if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzzi", (char **) kwlist,
+ &pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser, &pgpasswd, &nowait))
return NULL;
#ifdef DEFAULT_VARS
@@ -1753,8 +1757,59 @@
#ifdef PQsetdbLoginIsThreadSafe
Py_BEGIN_ALLOW_THREADS
#endif
- npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
- pgopt, pgtty, pgdbname, pguser, pgpasswd);
+ if (!nowait)
+ {
+ npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
+ pgopt, pgtty, pgdbname, pguser, pgpasswd);
+ }
+ else
+ {
+ if (pghost)
+ {
+ keywords[kwcount] = "host";
+ values[kwcount] = pghost;
+ kwcount++;
+ }
+ if (pgopt)
+ {
+ keywords[kwcount] = "options";
+ values[kwcount] = pgopt;
+ kwcount++;
+ }
+ if (pgtty)
+ {
+ /* postgres ignores this, so we do, too */
+ }
+ if (pgdbname)
+ {
+ keywords[kwcount] = "dbname";
+ values[kwcount] = pgdbname;
+ kwcount++;
+ }
+ if (pguser)
+ {
+ keywords[kwcount] = "user";
+ values[kwcount] = pguser;
+ kwcount++;
+ }
+ if (pgpasswd)
+ {
+ keywords[kwcount] = "password";
+ values[kwcount] = pgpasswd;
+ kwcount++;
+ }
+ if (pgport != -1)
+ {
+ keywords[kwcount] = "port";
+ values[kwcount] = port_buffer;
+ kwcount++;
+ }
+ keywords[kwcount] = values[kwcount] = NULL;
+
+ /* The final argument means dbname can be a full connection string,
+ * which is always the case for PQsetdbLogin() */
+ npgobj->cnx = PQconnectStartParams(keywords, values, 1);
+ }
#ifdef PQsetdbLoginIsThreadSafe
Py_END_ALLOW_THREADS
#endif
@@ -1854,6 +1909,9 @@
if (self->result)
PQclear(self->result);
+ if (self->pgcnx)
+ Py_DECREF(self->pgcnx);
+
PyObject_Del(self);
}
@@ -1939,6 +1997,44 @@
#endif
}
+/* get asynchronous connection state */
+static char pg_connectpoll__doc__[] =
+"Initiates phases of the asynchronous connection process"
+" and returns the connection state.";
+
+static PyObject *
+pg_connectpoll(pgobject *self, PyObject *args)
+{
+ int rc;
+
+ if (!self->cnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+ return NULL;
+ }
+
+ /* checks args */
+ if (!PyArg_ParseTuple(args, ""))
+ {
+ PyErr_SetString(PyExc_TypeError,
+ "method connectpoll() takes no parameters.");
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ rc = PQconnectPoll(self->cnx);
+ Py_END_ALLOW_THREADS
+
+ if (rc == PGRES_POLLING_FAILED)
+ {
+ set_dberror(InternalError, PQerrorMessage(self->cnx), NULL);
+ /* self is a borrowed reference; it must not be released here */
+ return NULL;
+ }
+
+ return PyInt_FromLong(rc);
+}
+
/* set notice receiver callback function */
static char pg_set_notice_receiver__doc__[] =
"set_notice_receiver() -- set the current notice receiver.";
@@ -2098,6 +2194,56 @@
return PyInt_FromLong(num);
}
+
+/* for non-query results, sets the appropriate error status, returns
+ * the appropriate value, and frees the result set */
+static PyObject * _check_result_status(int status, PGconn * cnx, PGresult * result)
+{
+ switch (status)
+ {
+ case PGRES_EMPTY_QUERY:
+ PyErr_SetString(PyExc_ValueError, "empty query.");
+ break;
+ case PGRES_BAD_RESPONSE:
+ case PGRES_FATAL_ERROR:
+ case PGRES_NONFATAL_ERROR:
+ set_dberror(ProgrammingError,
+ PQerrorMessage(cnx), result);
+ break;
+ case PGRES_COMMAND_OK:
+ { /* INSERT, UPDATE, DELETE */
+ Oid oid = PQoidValue(result);
+ if (oid == InvalidOid) /* not a single insert */
+ {
+ char *ret = PQcmdTuples(result);
+ PQclear(result);
+ if (ret[0]) /* return number of rows affected */
+ {
+ return PyString_FromString(ret);
+ }
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+ /* for a single insert, return the oid */
+ PQclear(result);
+ return PyInt_FromLong(oid);
+ }
+ case PGRES_COPY_OUT: /* no data will be received */
+ case PGRES_COPY_IN:
+ PQclear(result);
+ Py_INCREF(Py_None);
+ return Py_None;
+ default:
+ set_dberror(InternalError,
+ "internal error: unknown result status.", result);
+ break;
+ }
+
+ PQclear(result);
+ return NULL; /* error detected on query */
+}
+
+
/* retrieves last result */
static char pgquery_getresult__doc__[] =
"getresult() -- Gets the result of a query. The result is returned "
@@ -2124,13 +2270,81 @@
return NULL;
}
+ /* this is following an async call, so need to get the result first */
+ if (self->result_type == RESULT_ASYNC)
+ {
+ int status;
+
+ if (!self->pgcnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+ return NULL;
+ }
+
+
+ Py_BEGIN_ALLOW_THREADS
+ if (self->result)
+ {
+ PQclear(self->result);
+ }
+ self->result = PQgetResult(self->pgcnx->cnx);
+ Py_END_ALLOW_THREADS
+ /* end of result set, return None */
+ if (!self->result)
+ {
+ Py_DECREF(self->pgcnx);
+ self->pgcnx = NULL;
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+ {
+ val = _check_result_status(status, self->pgcnx->cnx, self->result);
+ /* _check_result_status calls PQclear() so we need to clear this
+ * attribute */
+ self->result = NULL;
+ /* throwing an exception. Need to call PQgetResult() to clear the
+ * connection state. This should return NULL the first time. */
+ if (!val)
+ {
+ self->result = PQgetResult(self->pgcnx->cnx);
+ while (self->result)
+ {
+ PQclear(self->result);
+ self->result = PQgetResult(self->pgcnx->cnx);
+ Py_DECREF(self->pgcnx);
+ self->pgcnx = NULL;
+ }
+ }
+ /* it's confusing to return None here because the caller has to
+ * call again until we return None. We can't just consume that
+ * final None because we don't know if there are additional
+ * statements following this one, so we return an empty string where
+ * query() would return None */
+ else if (val == Py_None)
+ {
+ Py_DECREF(val);
+ val = PyString_FromString("");
+ }
+ return val;
+ }
+ }
+
+
/* stores result in tuple */
m = PQntuples(self->result);
n = PQnfields(self->result);
- reslist = PyList_New(m);
typ = get_type_array(self->result, n);
+ if (!typ)
+ {
+ return NULL;
+ }
+
+ reslist = PyList_New(m);
+
for (i = 0; i < m; i++)
{
if (!(rowtuple = PyTuple_New(n)))
@@ -2251,6 +2465,59 @@
return NULL;
}
+ /* this is following an async call, so need to get the result first */
+ if (self->result_type == RESULT_ASYNC)
+ {
+ int status;
+
+ Py_BEGIN_ALLOW_THREADS
+ if (self->result)
+ {
+ PQclear(self->result);
+ }
+ self->result = PQgetResult(self->pgcnx->cnx);
+ Py_END_ALLOW_THREADS
+ /* end of result set, return None */
+ if (!self->result)
+ {
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+ {
+ val = _check_result_status(status, self->pgcnx->cnx, self->result);
+ /* _check_result_status calls PQclear() so we need to clear this
+ * attribute */
+ self->result = NULL;
+ /* throwing an exception. Need to call PQgetResult() to clear the
+ * connection state. This should return NULL the first time. */
+ if (!val)
+ {
+ self->result = PQgetResult(self->pgcnx->cnx);
+ while (self->result)
+ {
+ PQclear(self->result);
+ self->result = PQgetResult(self->pgcnx->cnx);
+ Py_DECREF(self->pgcnx);
+ self->pgcnx = NULL;
+ }
+ }
+ /* it's confusing to return None here because the caller has to
+ * call again until we return None. We can't just consume that
+ * final None because we don't know if there are additional
+ * statements following this one, so we return an empty string where
+ * query() would return None */
+ else if (val == Py_None)
+ {
+ Py_DECREF(val);
+ val = PyString_FromString("");
+ }
+ return val;
+ }
+ }
+
+
/* stores result in list */
m = PQntuples(self->result);
n = PQnfields(self->result);
@@ -2484,8 +2751,13 @@
"query(sql, [args]) -- creates a new query object for this connection, using"
" sql (string) request and optionally a tuple with positional parameters.";
+static char pg_sendquery__doc__[] =
+"sendquery(sql, [args]) -- creates a new query object for this connection, using"
+" sql (string) request and optionally a tuple with positional parameters. Returns"
+" without waiting for the query to complete.";
+
static PyObject *
-pg_query(pgobject *self, PyObject *args)
+_pg_query(pgobject *self, PyObject *args, int async)
{
char *query;
PyObject *oargs = NULL;
@@ -2608,8 +2880,17 @@
}
Py_BEGIN_ALLOW_THREADS
- result = PQexecParams(self->cnx, query, nparms,
- NULL, (const char * const *)parms, lparms, NULL, 0);
+ if (async)
+ {
+ status = PQsendQueryParams(self->cnx, query, nparms,
+ NULL, (const char * const *)parms, lparms, NULL, 0);
+ result = NULL;
+ }
+ else
+ {
+ result = PQexecParams(self->cnx, query, nparms,
+ NULL, (const char * const *)parms, lparms, NULL, 0);
+ }
Py_END_ALLOW_THREADS
free(lparms); free(parms);
@@ -2625,12 +2906,20 @@
else
{
Py_BEGIN_ALLOW_THREADS
- result = PQexec(self->cnx, query);
+ if (async)
+ {
+ status = PQsendQuery(self->cnx, query);
+ result = NULL;
+ }
+ else
+ {
+ result = PQexec(self->cnx, query);
+ }
Py_END_ALLOW_THREADS
}
/* checks result validity */
- if (!result)
+ if ((!async && !result) || (async && !status))
{
PyErr_SetString(PyExc_ValueError, PQerrorMessage(self->cnx));
return NULL;
@@ -2637,51 +2926,9 @@
}
/* checks result status */
- if ((status = PQresultStatus(result)) != PGRES_TUPLES_OK)
+ if (!async && (status = PQresultStatus(result)) != PGRES_TUPLES_OK)
{
- switch (status)
- {
- case PGRES_EMPTY_QUERY:
- PyErr_SetString(PyExc_ValueError, "empty query.");
- break;
- case PGRES_BAD_RESPONSE:
- case PGRES_FATAL_ERROR:
- case PGRES_NONFATAL_ERROR:
- set_dberror(ProgrammingError,
- PQerrorMessage(self->cnx), result);
- break;
- case PGRES_COMMAND_OK:
- { /* INSERT, UPDATE, DELETE */
- Oid oid = PQoidValue(result);
- if (oid == InvalidOid) /* not a single insert */
- {
- char *ret = PQcmdTuples(result);
-
- PQclear(result);
- if (ret[0]) /* return number of rows affected */
- {
- return PyString_FromString(ret);
- }
- Py_INCREF(Py_None);
- return Py_None;
- }
- /* for a single insert, return the oid */
- PQclear(result);
- return PyInt_FromLong(oid);
- }
- case PGRES_COPY_OUT: /* no data will be received */
- case PGRES_COPY_IN:
- PQclear(result);
- Py_INCREF(Py_None);
- return Py_None;
- default:
- set_dberror(InternalError,
- "internal error: unknown result status.", result);
- break;
- }
-
- PQclear(result);
- return NULL; /* error detected on query */
+ return _check_result_status(status, self->cnx, result);
}
if (!(npgobj = PyObject_NEW(pgqueryobject, &PgQueryType)))
@@ -2688,10 +2935,29 @@
return NULL;
/* stores result and returns object */
+ npgobj->pgcnx = NULL;
npgobj->result = result;
+ if (async)
+ {
+ npgobj->result_type = RESULT_ASYNC;
+ Py_INCREF(self);
+ npgobj->pgcnx = self;
+ }
return (PyObject *) npgobj;
}
+static PyObject *
+pg_query(pgobject *self, PyObject *args)
+{
+ return _pg_query(self, args, 0);
+}
+
+static PyObject *
+pg_sendquery(pgobject *self, PyObject *args)
+{
+ return _pg_query(self, args, 1);
+}
+
#ifdef DIRECT_ACCESS
static char pg_putline__doc__[] =
"putline() -- sends a line directly to the backend";
@@ -2798,6 +3064,74 @@
Py_INCREF(Py_None);
return Py_None;
}
+
+
+/* direct access function : setnonblocking */
+static char pg_setnonblocking__doc__[] =
+"setnonblocking() -- puts direct copy functions in non-blocking mode";
+
+static PyObject *
+pg_setnonblocking(pgobject *self, PyObject *args)
+{
+ int nonblocking;
+
+ if (!self->cnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+ return NULL;
+ }
+
+ /* reads args */
+ if (!PyArg_ParseTuple(args, "i", &nonblocking))
+ {
+ PyErr_SetString(PyExc_TypeError, "setnonblocking(tf), with boolean.");
+ return NULL;
+ }
+
+ /* returns -1 on error, or 0 on success */
+ if (PQsetnonblocking(self->cnx, nonblocking) < 0)
+ {
+ PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+ return NULL;
+ }
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+/* direct access function : isnonblocking */
+static char pg_isnonblocking__doc__[] =
+"isnonblocking() -- reports the non-blocking status of copy functions";
+
+static PyObject *
+pg_isnonblocking(pgobject *self, PyObject *args)
+{
+ int rc;
+
+ if (!self->cnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+ return NULL;
+ }
+
+ /* reads args */
+ if (!PyArg_ParseTuple(args, ""))
+ {
+ PyErr_SetString(PyExc_TypeError,
+ "method isnonblocking() takes no parameters.");
+ return NULL;
+ }
+
+ /* returns 1 if non-blocking, 0 if blocking. not sure if it can return -1 */
+ rc = PQisnonblocking(self->cnx);
+ if (rc < 0)
+ {
+ PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+ return NULL;
+ }
+
+ return PyBool_FromLong(rc);
+}
+
#endif /* DIRECT_ACCESS */
static PyObject *
@@ -3301,9 +3635,12 @@
static struct PyMethodDef pgobj_methods[] = {
{"source", (PyCFunction) pg_source, METH_VARARGS, pg_source__doc__},
{"query", (PyCFunction) pg_query, METH_VARARGS, pg_query__doc__},
+ {"sendquery", (PyCFunction) pg_sendquery, METH_VARARGS, pg_sendquery__doc__},
{"reset", (PyCFunction) pg_reset, METH_VARARGS, pg_reset__doc__},
{"cancel", (PyCFunction) pg_cancel, METH_VARARGS, pg_cancel__doc__},
{"close", (PyCFunction) pg_close, METH_VARARGS, pg_close__doc__},
+ {"connectpoll", (PyCFunction) pg_connectpoll, METH_VARARGS,
+ pg_connectpoll__doc__},
{"fileno", (PyCFunction) pg_fileno, METH_VARARGS, pg_fileno__doc__},
{"get_notice_receiver", (PyCFunction) pg_get_notice_receiver, METH_VARARGS,
pg_get_notice_receiver__doc__},
@@ -3333,6 +3670,8 @@
{"putline", (PyCFunction) pg_putline, 1, pg_putline__doc__},
{"getline", (PyCFunction) pg_getline, 1, pg_getline__doc__},
{"endcopy", (PyCFunction) pg_endcopy, 1, pg_endcopy__doc__},
+ {"setnonblocking", (PyCFunction) pg_setnonblocking, 1, pg_setnonblocking__doc__},
+ {"isnonblocking", (PyCFunction) pg_isnonblocking, 1, pg_isnonblocking__doc__},
#endif /* DIRECT_ACCESS */
#ifdef LARGE_OBJECTS
@@ -4249,6 +4588,12 @@
PyDict_SetItemString(dict, "SEEK_END", PyInt_FromLong(SEEK_END));
#endif /* LARGE_OBJECTS */
+ /* PQconnectPoll() results */
+ PyDict_SetItemString(dict,"PGRES_POLLING_OK",PyInt_FromLong(PGRES_POLLING_OK));
+ PyDict_SetItemString(dict,"PGRES_POLLING_FAILED",PyInt_FromLong(PGRES_POLLING_FAILED));
+ PyDict_SetItemString(dict,"PGRES_POLLING_READING",PyInt_FromLong(PGRES_POLLING_READING));
+ PyDict_SetItemString(dict,"PGRES_POLLING_WRITING",PyInt_FromLong(PGRES_POLLING_WRITING));
+
#ifdef DEFAULT_VARS
/* prepares default values */
Py_INCREF(Py_None);
Index: trunk/module/test_pg.py
===================================================================
--- trunk/module/test_pg.py (revision 520)
+++ trunk/module/test_pg.py (working copy)
@@ -364,15 +364,23 @@
self.assertEqual(attributes, connection_attributes)
def testAllConnectMethods(self):
- methods = '''cancel close endcopy
+ methods = '''cancel close connectpoll endcopy
escape_bytea escape_identifier escape_literal escape_string
fileno get_notice_receiver getline getlo getnotify
- inserttable locreate loimport parameter putline query reset
- set_notice_receiver source transaction'''.split()
+ inserttable isnonblocking locreate loimport parameter putline query reset
+ sendquery set_notice_receiver setnonblocking source transaction'''.split()
connection_methods = [a for a in dir(self.connection)
if callable(eval("self.connection." + a))]
self.assertEqual(methods, connection_methods)
+ def testAsyncConnect(self):
+ self.connection.close()
+ self.connection = pg.connect(nowait=True)
+ rc = self.connection.connectpoll()
+ while rc not in (pg.PGRES_POLLING_OK, pg.PGRES_POLLING_FAILED):
+ rc = self.connection.connectpoll()
+ self.assertEqual(rc, pg.PGRES_POLLING_OK)
+
def testAttributeDb(self):
self.assertEqual(self.connection.db, self.dbname)
@@ -425,6 +433,28 @@
def testMethodQueryEmpty(self):
self.assertRaises(ValueError, self.connection.query, '')
+ def testMethodSendQuery(self):
+ pgq = self.connection.sendquery("select 1+1")
+ self.assertEqual(pgq.getresult()[0][0], 2)
+ self.assertIsNone(pgq.getresult())
+ pgq = self.connection.sendquery("select 1+$1", (1,))
+ self.assertEqual(pgq.getresult()[0][0], 2)
+ self.assertIsNone(pgq.getresult())
+ pgq = self.connection.sendquery("select 1+$1+$2", (2, 3))
+ self.assertEqual(pgq.getresult()[0][0], 6)
+ self.assertIsNone(pgq.getresult())
+ pgq = self.connection.sendquery("select 1+$1+$2", [2, 3])
+ self.assertEqual(pgq.getresult()[0][0], 6)
+ self.assertIsNone(pgq.getresult())
+ pgq = self.connection.sendquery("select 1+1; select 'pg';")
+ self.assertEqual(pgq.getresult()[0][0], 2)
+ self.assertEqual(pgq.getresult()[0][0], 'pg')
+ self.assertIsNone(pgq.getresult())
+
+ def testMethodSendQueryEmpty(self):
+ pgq = self.connection.sendquery('')
+ self.assertRaises(ValueError, pgq.getresult)
+
def testMethodEndcopy(self):
try:
self.connection.endcopy()
@@ -478,6 +508,10 @@
result = [(0,)]
r = self.c.query(q).getresult()
self.assertEqual(r, result)
+ pgq = self.c.sendquery(q)
+ r = pgq.getresult()
+ self.assertEqual(r, result)
+ self.assertIsNone(pgq.getresult())
def testDictresult(self):
q = "select 0 as alias0"
@@ -484,6 +518,10 @@
result = [{'alias0': 0}]
r = self.c.query(q).dictresult()
self.assertEqual(r, result)
+ pgq = self.c.sendquery(q)
+ r = pgq.dictresult()
+ self.assertEqual(r, result)
+ self.assertIsNone(pgq.dictresult())
def testNamedresult(self):
if namedtuple:
@@ -494,6 +532,13 @@
v = r[0]
self.assertEqual(v._fields, ('alias0',))
self.assertEqual(v.alias0, 0)
+ pgq = self.c.sendquery(q)
+ r = pgq.namedresult()
+ self.assertEqual(r, result)
+ v = r[0]
+ self.assertEqual(v._fields, ('alias0',))
+ self.assertEqual(v.alias0, 0)
+ self.assertIsNone(pgq.namedresult())
def testGet3Cols(self):
q = "select 1,2,3"
@@ -970,6 +1015,7 @@
'clear',
'close',
'commit',
+ 'connectpoll',
'db',
'dbname',
'debug',
@@ -995,6 +1041,7 @@
'host',
'insert',
'inserttable',
+ 'isnonblocking',
'locreate',
'loimport',
'notification_handler',
@@ -1010,8 +1057,10 @@
'reset',
'rollback',
'savepoint',
+ 'sendquery',
'server_version',
'set_notice_receiver',
+ 'setnonblocking',
'source',
'start',
'status',