The changes to the C module.
--
Patrick TJ McPhee <[email protected]>
Index: trunk/module/pgmodule.c
===================================================================
--- trunk/module/pgmodule.c (revision 520)
+++ trunk/module/pgmodule.c (working copy)
@@ -74,6 +74,7 @@
#define RESULT_DML 2
#define RESULT_DDL 3
#define RESULT_DQL 4
+#define RESULT_ASYNC 5
/* flags for move methods */
#define QUERY_MOVEFIRST 1
@@ -172,6 +173,7 @@
typedef struct
{
PyObject_HEAD
+ pgobject *pgcnx; /* for async results */
PGresult *result; /* result content */
int result_type; /* type of previous result */
long current_pos; /* current position in last result */
@@ -1685,7 +1687,7 @@
/* connects to a database */
static char connect__doc__[] =
-"connect(dbname, host, port, opt, tty) -- connect to a PostgreSQL database "
+"connect(dbname, host, port, opt, tty, user, passwd, nowait) -- connect to a PostgreSQL database "
"using specified parameters (optionals, keywords aware).";
static PyObject *
@@ -1692,7 +1694,7 @@
pgconnect(pgobject *self, PyObject *args, PyObject *dict)
{
static const char *kwlist[] = {"dbname", "host", "port", "opt",
- "tty", "user", "passwd", NULL};
+ "tty", "user", "passwd", "nowait", NULL};
char *pghost,
*pgopt,
@@ -1700,8 +1702,10 @@
*pgdbname,
*pguser,
*pgpasswd;
- int pgport;
+ int pgport, nowait = 0, kwcount = 0;
char port_buffer[20];
+ const char *keywords[sizeof(kwlist)/sizeof(*kwlist)+1],
+ *values[sizeof(kwlist)/sizeof(*kwlist)+1];
pgobject *npgobj;
pghost = pgopt = pgtty = pgdbname = pguser = pgpasswd = NULL;
@@ -1713,8 +1717,8 @@
* don't declare kwlist as const char *kwlist[] then it complains when
* I try to assign all those constant strings to it.
*/
- if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzz", (char **)
kwlist,
- &pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser,
&pgpasswd))
+ if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzzi", (char **)
kwlist,
+ &pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser,
&pgpasswd, &nowait))
return NULL;
#ifdef DEFAULT_VARS
@@ -1753,8 +1757,59 @@
#ifdef PQsetdbLoginIsThreadSafe
Py_BEGIN_ALLOW_THREADS
#endif
- npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
- pgopt, pgtty, pgdbname, pguser, pgpasswd);
+ if (!nowait)
+ {
+ npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL :
port_buffer,
+ pgopt, pgtty, pgdbname, pguser, pgpasswd);
+ }
+ else
+ {
+ if (pghost)
+ {
+ keywords[kwcount] = "host";
+ values[kwcount] = pghost;
+ kwcount++;
+ }
+ if (pgopt)
+ {
+ keywords[kwcount] = "options";
+ values[kwcount] = pgopt;
+ kwcount++;
+ }
+ if (pgtty)
+ {
+ /* postgres ignores this, so we do, too */
+ }
+ if (pgdbname)
+ {
+ keywords[kwcount] = "dbname";
+ values[kwcount] = pgdbname;
+ kwcount++;
+ }
+ if (pguser)
+ {
+ keywords[kwcount] = "user";
+ values[kwcount] = pguser;
+ kwcount++;
+ }
+ if (pgpasswd)
+ {
+ keywords[kwcount] = "password";
+ values[kwcount] = pgpasswd;
+ kwcount++;
+ }
+ if (pgport != -1)
+ {
+ keywords[kwcount] = "port";
+ values[kwcount] = port_buffer;
+ kwcount++;
+ }
+ keywords[kwcount] = values[kwcount] = NULL;
+
+ /* The final argument means dbname can be a full connection
string,
+ * which is always the case for PQsetdbLogin() */
+ npgobj->cnx = PQconnectStartParams(keywords, values, 1);
+ }
#ifdef PQsetdbLoginIsThreadSafe
Py_END_ALLOW_THREADS
#endif
@@ -1854,6 +1909,9 @@
if (self->result)
PQclear(self->result);
+ if (self->pgcnx)
+ Py_DECREF(self->pgcnx);
+
PyObject_Del(self);
}
@@ -1939,6 +1997,44 @@
#endif
}
+/* get asynchronous connection state */
+static char pg_connectpoll__doc__[] =
+"Initiates phases of the asynchronous connection process"
+" and returns the connection state.";
+
+/*
+ * Performs one PQconnectPoll() step on this connection.
+ *
+ * Takes no Python arguments.  Returns the polling status reported by
+ * libpq as a Python int, or NULL with an exception set: TypeError when
+ * the connection handle is missing or arguments were supplied, and an
+ * error reported through set_dberror(InternalError, ...) when libpq
+ * answers PGRES_POLLING_FAILED.
+ */
+static PyObject *
+pg_connectpoll(pgobject *self, PyObject *args)
+{
+	int			rc;
+
+	if (!self->cnx)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* checks args */
+	if (!PyArg_ParseTuple(args, ""))
+	{
+		PyErr_SetString(PyExc_TypeError,
+			"method connectpoll() takes no parameters.");
+		return NULL;
+	}
+
+	/* the poll may touch the network, so let other threads run */
+	Py_BEGIN_ALLOW_THREADS
+	rc = PQconnectPoll(self->cnx);
+	Py_END_ALLOW_THREADS
+
+	if (rc == PGRES_POLLING_FAILED)
+	{
+		/*
+		 * self is a borrowed reference that belongs to the caller, so it
+		 * must not be released here; doing so would over-decrement the
+		 * connection object.
+		 */
+		set_dberror(InternalError, PQerrorMessage(self->cnx), NULL);
+		return NULL;
+	}
+
+	return PyInt_FromLong(rc);
+}
+
/* set notice receiver callback function */
static char pg_set_notice_receiver__doc__[] =
"set_notice_receiver() -- set the current notice receiver.";
@@ -2098,6 +2194,56 @@
return PyInt_FromLong(num);
}
+
+/*
+ * Handles a result whose status is not PGRES_TUPLES_OK.
+ *
+ * status is the PQresultStatus() of result, and cnx is the connection
+ * the error message is read from.  The result is always released with
+ * PQclear() before returning, so the caller must not use it afterwards.
+ *
+ * Returns a new reference:
+ *   - PGRES_COMMAND_OK: the inserted row's OID as an int when there is
+ *     one, otherwise the affected-row count as a string, or None when
+ *     libpq reports no count;
+ *   - PGRES_COPY_OUT / PGRES_COPY_IN: None.
+ * Returns NULL with an exception set for every other status.
+ */
+static PyObject *
+_check_result_status(int status, PGconn *cnx, PGresult *result)
+{
+	switch (status)
+	{
+		case PGRES_EMPTY_QUERY:
+			PyErr_SetString(PyExc_ValueError, "empty query.");
+			break;
+		case PGRES_BAD_RESPONSE:
+		case PGRES_FATAL_ERROR:
+		case PGRES_NONFATAL_ERROR:
+			set_dberror(ProgrammingError,
+				PQerrorMessage(cnx), result);
+			break;
+		case PGRES_COMMAND_OK:
+			{	/* INSERT, UPDATE, DELETE */
+				Oid			oid = PQoidValue(result);
+				PyObject   *value;
+
+				if (oid == InvalidOid)	/* not a single insert */
+				{
+					/*
+					 * PQcmdTuples() points into storage owned by the
+					 * result, so the Python object has to be built
+					 * before PQclear() frees it.
+					 */
+					char	   *ret = PQcmdTuples(result);
+
+					if (ret[0])	/* return number of rows affected */
+					{
+						value = PyString_FromString(ret);
+					}
+					else
+					{
+						Py_INCREF(Py_None);
+						value = Py_None;
+					}
+					PQclear(result);
+					return value;
+				}
+				/* for a single insert, return the oid */
+				PQclear(result);
+				return PyInt_FromLong(oid);
+			}
+		case PGRES_COPY_OUT:	/* no data will be received */
+		case PGRES_COPY_IN:
+			PQclear(result);
+			Py_INCREF(Py_None);
+			return Py_None;
+		default:
+			set_dberror(InternalError,
+				"internal error: unknown result status.", result);
+			break;
+	}
+
+	PQclear(result);
+	return NULL;	/* error detected on query */
+}
+
+
/* retrieves last result */
static char pgquery_getresult__doc__[] =
"getresult() -- Gets the result of a query. The result is returned "
@@ -2124,13 +2270,81 @@
return NULL;
}
+ /* this is following an async call, so need to get the result first */
+ if (self->result_type == RESULT_ASYNC)
+ {
+ int status;
+
+ if (!self->pgcnx)
+ {
+ PyErr_SetString(PyExc_TypeError, "Connection is not
valid.");
+ return NULL;
+ }
+
+
+ Py_BEGIN_ALLOW_THREADS
+ if (self->result)
+ {
+ PQclear(self->result);
+ }
+ self->result = PQgetResult(self->pgcnx->cnx);
+ Py_END_ALLOW_THREADS
+ /* end of result set, return None */
+ if (!self->result)
+ {
+ Py_DECREF(self->pgcnx);
+ self->pgcnx = NULL;
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+ {
+ val = _check_result_status(status, self->pgcnx->cnx,
self->result);
+ /* _check_result_status calls PQclear() so we need to
clear this
+ * attribute */
+ self->result = NULL;
+ /* throwing an exception. Need to call PQgetResult() to
clear the
+ * connection state. This should return NULL the first
time. */
+ if (!val)
+ {
+ self->result = PQgetResult(self->pgcnx->cnx);
+ while (self->result)
+ {
+ PQclear(self->result);
+ self->result =
PQgetResult(self->pgcnx->cnx);
+ Py_DECREF(self->pgcnx);
+ self->pgcnx = NULL;
+ }
+ }
+ /* it's confusing to return None here because the
caller has to
+ * call again until we return None. We can't just
consume that
+ * final None because we don't know if there are
additional
+ * statements following this one, so we return an empty
string where
+ * query() would return None */
+ else if (val == Py_None)
+ {
+ Py_DECREF(val);
+ val = PyString_FromString("");
+ }
+ return val;
+ }
+ }
+
+
/* stores result in tuple */
m = PQntuples(self->result);
n = PQnfields(self->result);
- reslist = PyList_New(m);
typ = get_type_array(self->result, n);
+ if (!typ)
+ {
+ return NULL;
+ }
+
+ reslist = PyList_New(m);
+
for (i = 0; i < m; i++)
{
if (!(rowtuple = PyTuple_New(n)))
@@ -2251,6 +2465,59 @@
return NULL;
}
+ /* this is following an async call, so need to get the result first */
+ if (self->result_type == RESULT_ASYNC)
+ {
+ int status;
+
+ Py_BEGIN_ALLOW_THREADS
+ if (self->result)
+ {
+ PQclear(self->result);
+ }
+ self->result = PQgetResult(self->pgcnx->cnx);
+ Py_END_ALLOW_THREADS
+ /* end of result set, return None */
+ if (!self->result)
+ {
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+ if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+ {
+ val = _check_result_status(status, self->pgcnx->cnx,
self->result);
+ /* _check_result_status calls PQclear() so we need to
clear this
+ * attribute */
+ self->result = NULL;
+ /* throwing an exception. Need to call PQgetResult() to
clear the
+ * connection state. This should return NULL the first
time. */
+ if (!val)
+ {
+ self->result = PQgetResult(self->pgcnx->cnx);
+ while (self->result)
+ {
+ PQclear(self->result);
+ self->result =
PQgetResult(self->pgcnx->cnx);
+ Py_DECREF(self->pgcnx);
+ self->pgcnx = NULL;
+ }
+ }
+ /* it's confusing to return None here because the
caller has to
+ * call again until we return None. We can't just
consume that
+ * final None because we don't know if there are
additional
+ * statements following this one, so we return an empty
string where
+ * query() would return None */
+ else if (val == Py_None)
+ {
+ Py_DECREF(val);
+ val = PyString_FromString("");
+ }
+ return val;
+ }
+ }
+
+
/* stores result in list */
m = PQntuples(self->result);
n = PQnfields(self->result);
@@ -2484,8 +2751,13 @@
"query(sql, [args]) -- creates a new query object for this connection, using"
" sql (string) request and optionally a tuple with positional parameters.";
+static char pg_sendquery__doc__[] =
+"sendquery(sql, [args]) -- creates a new query object for this connection,
using"
+" sql (string) request and optionally a tuple with positional parameters.
Returns"
+" without waiting for the query to complete.";
+
static PyObject *
-pg_query(pgobject *self, PyObject *args)
+_pg_query(pgobject *self, PyObject *args, int async)
{
char *query;
PyObject *oargs = NULL;
@@ -2608,8 +2880,17 @@
}
Py_BEGIN_ALLOW_THREADS
- result = PQexecParams(self->cnx, query, nparms,
- NULL, (const char * const *)parms, lparms, NULL, 0);
+ if (async)
+ {
+ status = PQsendQueryParams(self->cnx, query, nparms,
+ NULL, (const char * const *)parms, lparms,
NULL, 0);
+ result = NULL;
+ }
+ else
+ {
+ result = PQexecParams(self->cnx, query, nparms,
+ NULL, (const char * const *)parms, lparms,
NULL, 0);
+ }
Py_END_ALLOW_THREADS
free(lparms); free(parms);
@@ -2625,12 +2906,20 @@
else
{
Py_BEGIN_ALLOW_THREADS
- result = PQexec(self->cnx, query);
+ if (async)
+ {
+ status = PQsendQuery(self->cnx, query);
+ result = NULL;
+ }
+ else
+ {
+ result = PQexec(self->cnx, query);
+ }
Py_END_ALLOW_THREADS
}
/* checks result validity */
- if (!result)
+ if ((!async && !result) || (async && !status))
{
PyErr_SetString(PyExc_ValueError, PQerrorMessage(self->cnx));
return NULL;
@@ -2637,51 +2926,9 @@
}
/* checks result status */
- if ((status = PQresultStatus(result)) != PGRES_TUPLES_OK)
+ if (!async && (status = PQresultStatus(result)) != PGRES_TUPLES_OK)
{
- switch (status)
- {
- case PGRES_EMPTY_QUERY:
- PyErr_SetString(PyExc_ValueError, "empty
query.");
- break;
- case PGRES_BAD_RESPONSE:
- case PGRES_FATAL_ERROR:
- case PGRES_NONFATAL_ERROR:
- set_dberror(ProgrammingError,
- PQerrorMessage(self->cnx), result);
- break;
- case PGRES_COMMAND_OK:
- {
/* INSERT, UPDATE, DELETE */
- Oid oid =
PQoidValue(result);
- if (oid == InvalidOid) /* not a single
insert */
- {
- char *ret =
PQcmdTuples(result);
-
- PQclear(result);
- if (ret[0]) /*
return number of rows affected */
- {
- return
PyString_FromString(ret);
- }
- Py_INCREF(Py_None);
- return Py_None;
- }
- /* for a single insert, return the oid
*/
- PQclear(result);
- return PyInt_FromLong(oid);
- }
- case PGRES_COPY_OUT: /* no data will be
received */
- case PGRES_COPY_IN:
- PQclear(result);
- Py_INCREF(Py_None);
- return Py_None;
- default:
- set_dberror(InternalError,
- "internal error: unknown result
status.", result);
- break;
- }
-
- PQclear(result);
- return NULL; /* error detected on query */
+ return _check_result_status(status, self->cnx, result);
}
if (!(npgobj = PyObject_NEW(pgqueryobject, &PgQueryType)))
@@ -2688,10 +2935,29 @@
return NULL;
/* stores result and returns object */
+ npgobj->pgcnx = NULL;
npgobj->result = result;
+ if (async)
+ {
+ npgobj->result_type = RESULT_ASYNC;
+ Py_INCREF(self);
+ npgobj->pgcnx = self;
+ }
return (PyObject *) npgobj;
}
+/*
+ * Method-table entry point for query(): forwards to _pg_query() with the
+ * async flag cleared.  The signature is the two-argument form required
+ * for a METH_VARARGS slot; an extra parameter would make the call through
+ * the PyCFunction pointer undefined behaviour.
+ */
+static PyObject *
+pg_query(pgobject *self, PyObject *args)
+{
+	return _pg_query(self, args, 0);
+}
+
+/*
+ * Method-table entry point for sendquery(): forwards to _pg_query() with
+ * the async flag set.  The signature is the two-argument form required
+ * for a METH_VARARGS slot; an extra parameter would make the call through
+ * the PyCFunction pointer undefined behaviour.
+ */
+static PyObject *
+pg_sendquery(pgobject *self, PyObject *args)
+{
+	return _pg_query(self, args, 1);
+}
+
#ifdef DIRECT_ACCESS
static char pg_putline__doc__[] =
"putline() -- sends a line directly to the backend";
@@ -2798,6 +3064,74 @@
Py_INCREF(Py_None);
return Py_None;
}
+
+
+/* direct access function: switch the connection's non-blocking flag */
+static char pg_setnonblocking__doc__[] =
+"setnonblocking() -- puts direct copy functions in non-blocking mode";
+
+/*
+ * Expects one integer argument and hands it to PQsetnonblocking().
+ * Returns None on success; returns NULL with TypeError for a missing
+ * connection or bad argument, or IOError carrying the libpq message
+ * when libpq reports a failure.
+ */
+static PyObject *
+pg_setnonblocking(pgobject *self, PyObject *args)
+{
+	int			enable;
+
+	/* a closed connection has no libpq handle to act on */
+	if (self->cnx == NULL)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* exactly one integer argument is accepted */
+	if (PyArg_ParseTuple(args, "i", &enable) == 0)
+	{
+		PyErr_SetString(PyExc_TypeError, "setnonblocking(tf), with boolean.");
+		return NULL;
+	}
+
+	/* a non-negative return from libpq means the mode was changed */
+	if (PQsetnonblocking(self->cnx, enable) >= 0)
+	{
+		Py_INCREF(Py_None);
+		return Py_None;
+	}
+
+	PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+	return NULL;
+}
+
+/* direct access function: report the connection's non-blocking flag */
+static char pg_isnonblocking__doc__[] =
+"isnonblocking() -- reports the non-blocking status of copy functions";
+
+/*
+ * Takes no arguments and returns PQisnonblocking() as a Python bool.
+ * Returns NULL with TypeError for a missing connection or stray
+ * arguments, or IOError carrying the libpq message if libpq returns a
+ * negative value.
+ */
+static PyObject *
+pg_isnonblocking(pgobject *self, PyObject *args)
+{
+	int			mode;
+
+	/* a closed connection has no libpq handle to query */
+	if (self->cnx == NULL)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* no arguments are accepted */
+	if (PyArg_ParseTuple(args, "") == 0)
+	{
+		PyErr_SetString(PyExc_TypeError,
+			"method isnonblocking() takes no parameters.");
+		return NULL;
+	}
+
+	/*
+	 * libpq documents 1 for non-blocking mode and 0 for blocking mode;
+	 * a negative value is treated defensively as an error.
+	 */
+	mode = PQisnonblocking(self->cnx);
+	if (mode >= 0)
+	{
+		return PyBool_FromLong(mode);
+	}
+
+	PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+	return NULL;
+}
+
#endif /* DIRECT_ACCESS */
static PyObject *
@@ -3301,9 +3635,12 @@
static struct PyMethodDef pgobj_methods[] = {
{"source", (PyCFunction) pg_source, METH_VARARGS, pg_source__doc__},
{"query", (PyCFunction) pg_query, METH_VARARGS, pg_query__doc__},
+ {"sendquery", (PyCFunction) pg_sendquery, METH_VARARGS,
pg_sendquery__doc__},
{"reset", (PyCFunction) pg_reset, METH_VARARGS, pg_reset__doc__},
{"cancel", (PyCFunction) pg_cancel, METH_VARARGS, pg_cancel__doc__},
{"close", (PyCFunction) pg_close, METH_VARARGS, pg_close__doc__},
+ {"connectpoll", (PyCFunction) pg_connectpoll, METH_VARARGS,
+ pg_connectpoll__doc__},
{"fileno", (PyCFunction) pg_fileno, METH_VARARGS, pg_fileno__doc__},
{"get_notice_receiver", (PyCFunction) pg_get_notice_receiver,
METH_VARARGS,
pg_get_notice_receiver__doc__},
@@ -3333,6 +3670,8 @@
{"putline", (PyCFunction) pg_putline, 1, pg_putline__doc__},
{"getline", (PyCFunction) pg_getline, 1, pg_getline__doc__},
{"endcopy", (PyCFunction) pg_endcopy, 1, pg_endcopy__doc__},
+ {"setnonblocking", (PyCFunction) pg_setnonblocking, 1,
pg_setnonblocking__doc__},
+ {"isnonblocking", (PyCFunction) pg_isnonblocking, 1,
pg_isnonblocking__doc__},
#endif /* DIRECT_ACCESS */
#ifdef LARGE_OBJECTS
@@ -4249,6 +4588,12 @@
PyDict_SetItemString(dict, "SEEK_END", PyInt_FromLong(SEEK_END));
#endif /* LARGE_OBJECTS */
+ /* PQconnectPoll() results */
+
PyDict_SetItemString(dict,"PGRES_POLLING_OK",PyInt_FromLong(PGRES_POLLING_OK));
+
PyDict_SetItemString(dict,"PGRES_POLLING_FAILED",PyInt_FromLong(PGRES_POLLING_FAILED));
+
PyDict_SetItemString(dict,"PGRES_POLLING_READING",PyInt_FromLong(PGRES_POLLING_READING));
+
PyDict_SetItemString(dict,"PGRES_POLLING_WRITING",PyInt_FromLong(PGRES_POLLING_WRITING));
+
#ifdef DEFAULT_VARS
/* prepares default values */
Py_INCREF(Py_None);
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql