I sent this previously as a single patch, but it's been stuck waiting
for moderation for a month, so I thought I'd try again with two smaller
patches.

The attached patch, along with the one in my previous message, provides
the asynchronous operations described in section 31.4 of the postgres
manual. I've described the changes in the previous message. This message
contains the changes to pgmodule.c.

-- 
Patrick TJ McPhee <[email protected]>

Index: trunk/module/pgmodule.c
===================================================================
--- trunk/module/pgmodule.c	(revision 520)
+++ trunk/module/pgmodule.c	(working copy)
@@ -74,6 +74,7 @@
 #define RESULT_DML			2
 #define RESULT_DDL			3
 #define RESULT_DQL			4
+#define RESULT_ASYNC		5
 
 /* flags for move methods */
 #define QUERY_MOVEFIRST		1
@@ -172,6 +173,7 @@
 typedef struct
 {
 	PyObject_HEAD
+	pgobject	*pgcnx;			/* for async results */
 	PGresult	*result;		/* result content */
 	int			result_type;	/* type of previous result */
 	long		current_pos;	/* current position in last result */
@@ -1685,7 +1687,7 @@
 
 /* connects to a database */
 static char connect__doc__[] =
-"connect(dbname, host, port, opt, tty) -- connect to a PostgreSQL database "
+"connect(dbname, host, port, opt, tty, user, password, nowait) -- connect to a PostgreSQL database "
 "using specified parameters (optionals, keywords aware).";
 
 static PyObject *
@@ -1692,7 +1694,7 @@
 pgconnect(pgobject *self, PyObject *args, PyObject *dict)
 {
 	static const char *kwlist[] = {"dbname", "host", "port", "opt",
-	"tty", "user", "passwd", NULL};
+       "tty", "user", "passwd", "nowait", NULL};
 
 	char	   *pghost,
 			   *pgopt,
@@ -1700,8 +1702,10 @@
 			   *pgdbname,
 			   *pguser,
 			   *pgpasswd;
-	int			pgport;
+	int			pgport, nowait = 0, kwcount = 0;
 	char		port_buffer[20];
+	const char *keywords[sizeof(kwlist)/sizeof(*kwlist)+1],
+			   *values[sizeof(kwlist)/sizeof(*kwlist)+1];
 	pgobject   *npgobj;
 
 	pghost = pgopt = pgtty = pgdbname = pguser = pgpasswd = NULL;
@@ -1713,8 +1717,8 @@
 	 * don't declare kwlist as const char *kwlist[] then it complains when
 	 * I try to assign all those constant strings to it.
 	 */
-	if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzz", (char **) kwlist,
-		&pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser, &pgpasswd))
+	if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzizzzzi", (char **) kwlist,
+		&pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser, &pgpasswd, &nowait))
 		return NULL;
 
 #ifdef DEFAULT_VARS
@@ -1753,8 +1757,59 @@
 #ifdef PQsetdbLoginIsThreadSafe
 	Py_BEGIN_ALLOW_THREADS
 #endif
-	npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
-		pgopt, pgtty, pgdbname, pguser, pgpasswd);
+	if (!nowait)
+	{
+		npgobj->cnx = PQsetdbLogin(pghost, pgport == -1 ? NULL : port_buffer,
+			pgopt, pgtty, pgdbname, pguser, pgpasswd);
+	}
+	else
+	{
+		if (pghost)
+		{
+			keywords[kwcount] = "host";
+			values[kwcount] = pghost;
+			kwcount++;
+		}
+		if (pgopt)
+		{
+			keywords[kwcount] = "options";
+			values[kwcount] = pgopt;
+			kwcount++;
+		}
+		if (pgtty)
+		{
+			/* postgres ignores this, so we do, too */
+		}
+		if (pgdbname)
+		{
+			keywords[kwcount] = "dbname";
+			values[kwcount] = pgdbname;
+			kwcount++;
+		}
+		if (pguser)
+		{
+			keywords[kwcount] = "user";
+			values[kwcount] = pguser;
+			kwcount++;
+		}
+		if (pgpasswd)
+		{
+			keywords[kwcount] = "password";
+			values[kwcount] = pgpasswd;
+			kwcount++;
+		}
+		if (pgport != -1)
+		{
+			keywords[kwcount] = "port";
+			values[kwcount] = port_buffer;
+			kwcount++;
+		}
+		keywords[kwcount] = values[kwcount] = NULL;
+
+		/* The final argument means dbname can be a full connection string,
+		 * which is always the case for PQsetdbLogin() */
+		npgobj->cnx = PQconnectStartParams(keywords, values, 1);
+	}
 #ifdef PQsetdbLoginIsThreadSafe
 	Py_END_ALLOW_THREADS
 #endif
@@ -1854,6 +1909,9 @@
 	if (self->result)
 		PQclear(self->result);
 
+	if (self->pgcnx)
+		Py_DECREF(self->pgcnx);
+
 	PyObject_Del(self);
 }
 
@@ -1939,6 +1997,44 @@
 #endif
 }
 
+/* get asynchronous connection state */
+static char pg_connectpoll__doc__[] =
+"Initiates phases of the asynchronous connection process"
+" and returns the connection state.";
+
+static PyObject *
+pg_connectpoll(pgobject *self, PyObject *args)
+{
+	int rc;
+
+	if (!self->cnx)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* checks args */
+	if (!PyArg_ParseTuple(args, ""))
+	{
+		PyErr_SetString(PyExc_TypeError,
+			"method connectPoll() takes no parameters.");
+		return NULL;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	rc =PQconnectPoll(self->cnx);
+	Py_END_ALLOW_THREADS
+
+	if (rc == PGRES_POLLING_FAILED)
+	{
+		set_dberror(InternalError, PQerrorMessage(self->cnx), NULL);
+		Py_XDECREF(self);
+		return NULL;
+	}
+
+	return PyInt_FromLong(rc);
+}
+
 /* set notice receiver callback function */
 static char pg_set_notice_receiver__doc__[] =
 "set_notice_receiver() -- set the current notice receiver.";
@@ -2098,6 +2194,56 @@
 	return PyInt_FromLong(num);
 }
 
+
+/* for non-query results, sets the appropriate error status, returns
+ * the appropriate value, and frees the result set */
+static PyObject * _check_result_status(int status, PGconn * cnx, PGresult * result)
+{
+	switch (status)
+	{
+		case PGRES_EMPTY_QUERY:
+			PyErr_SetString(PyExc_ValueError, "empty query.");
+			break;
+		case PGRES_BAD_RESPONSE:
+		case PGRES_FATAL_ERROR:
+		case PGRES_NONFATAL_ERROR:
+			set_dberror(ProgrammingError,
+				PQerrorMessage(cnx), result);
+			break;
+		case PGRES_COMMAND_OK:
+			{						/* INSERT, UPDATE, DELETE */
+				Oid		oid = PQoidValue(result);
+				if (oid == InvalidOid)	/* not a single insert */
+				{
+					char	*ret = PQcmdTuples(result);
+						PQclear(result);
+					if (ret[0])		/* return number of rows affected */
+					{
+						return PyString_FromString(ret);
+					}
+					Py_INCREF(Py_None);
+					return Py_None;
+				}
+				/* for a single insert, return the oid */
+				PQclear(result);
+				return PyInt_FromLong(oid);
+			}
+		case PGRES_COPY_OUT:		/* no data will be received */
+		case PGRES_COPY_IN:
+			PQclear(result);
+			Py_INCREF(Py_None);
+			return Py_None;
+		default:
+			set_dberror(InternalError,
+				"internal error: unknown result status.", result);
+			break;
+	}
+
+	PQclear(result);
+	return NULL;			/* error detected on query */
+}
+
+
 /* retrieves last result */
 static char pgquery_getresult__doc__[] =
 "getresult() -- Gets the result of a query.  The result is returned "
@@ -2124,13 +2270,81 @@
 		return NULL;
 	}
 
+	/* this is following an async call, so need to get the result first */
+	if (self->result_type == RESULT_ASYNC)
+	{
+		int status;
+
+		if (!self->pgcnx)
+		{
+			PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+			return NULL;
+		}
+
+
+		Py_BEGIN_ALLOW_THREADS
+		if (self->result)
+		{
+			PQclear(self->result);
+		}
+		self->result = PQgetResult(self->pgcnx->cnx);
+		Py_END_ALLOW_THREADS
+		/* end of result set, return None */
+		if (!self->result)
+		{
+			Py_DECREF(self->pgcnx);
+			self->pgcnx = NULL;
+			Py_INCREF(Py_None);
+			return Py_None;
+		}
+
+		if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+		{
+			val = _check_result_status(status, self->pgcnx->cnx, self->result);
+			/* _check_result_status calls PQclear() so we need to clear this
+			 * attribute */
+			self->result = NULL;
+			/* throwing an exception. Need to call PQgetResult() to clear the
+			 * connection state. This should return NULL the first time. */
+			if (!val)
+			{
+				self->result = PQgetResult(self->pgcnx->cnx);
+				while (self->result)
+				{
+					PQclear(self->result);
+					self->result = PQgetResult(self->pgcnx->cnx);
+					Py_DECREF(self->pgcnx);
+					self->pgcnx = NULL;
+				}
+			}
+			/* it's confusing to return None here because the caller has to
+			 * call again until we return None. We can't just consume that
+			 * final None because we don't know if there are additional
+			 * statements following this one, so we return an empty string where
+			 * query() would return None */
+			else if (val == Py_None)
+			{
+				Py_DECREF(val);
+				val = PyString_FromString("");
+			}
+			return val;
+		}
+	}
+
+
 	/* stores result in tuple */
 	m = PQntuples(self->result);
 	n = PQnfields(self->result);
-	reslist = PyList_New(m);
 
 	typ = get_type_array(self->result, n);
 
+	if (!typ)
+	{
+		return NULL;
+	}
+
+	reslist = PyList_New(m);
+
 	for (i = 0; i < m; i++)
 	{
 		if (!(rowtuple = PyTuple_New(n)))
@@ -2251,6 +2465,59 @@
 		return NULL;
 	}
 
+	/* this is following an async call, so need to get the result first */
+	if (self->result_type == RESULT_ASYNC)
+	{
+		int status;
+
+		Py_BEGIN_ALLOW_THREADS
+		if (self->result)
+		{
+			PQclear(self->result);
+		}
+		self->result = PQgetResult(self->pgcnx->cnx);
+		Py_END_ALLOW_THREADS
+		/* end of result set, return None */
+		if (!self->result)
+		{
+			Py_INCREF(Py_None);
+			return Py_None;
+		}
+
+		if ((status = PQresultStatus(self->result)) != PGRES_TUPLES_OK)
+		{
+			val = _check_result_status(status, self->pgcnx->cnx, self->result);
+			/* _check_result_status calls PQclear() so we need to clear this
+			 * attribute */
+			self->result = NULL;
+			/* throwing an exception. Need to call PQgetResult() to clear the
+			 * connection state. This should return NULL the first time. */
+			if (!val)
+			{
+				self->result = PQgetResult(self->pgcnx->cnx);
+				while (self->result)
+				{
+					PQclear(self->result);
+					self->result = PQgetResult(self->pgcnx->cnx);
+					Py_DECREF(self->pgcnx);
+					self->pgcnx = NULL;
+				}
+			}
+			/* it's confusing to return None here because the caller has to
+			 * call again until we return None. We can't just consume that
+			 * final None because we don't know if there are additional
+			 * statements following this one, so we return an empty string where
+			 * query() would return None */
+			else if (val == Py_None)
+			{
+				Py_DECREF(val);
+				val = PyString_FromString("");
+			}
+			return val;
+		}
+	}
+
+
 	/* stores result in list */
 	m = PQntuples(self->result);
 	n = PQnfields(self->result);
@@ -2484,8 +2751,13 @@
 "query(sql, [args]) -- creates a new query object for this connection, using"
 " sql (string) request and optionally a tuple with positional parameters.";
 
+static char pg_sendquery__doc__[] =
+"sendquery(sql, [args]) -- creates a new query object for this connection, using"
+" sql (string) request and optionally a tuple with positional parameters. Returns"
+" without waiting for the query to complete.";
+
 static PyObject *
-pg_query(pgobject *self, PyObject *args)
+_pg_query(pgobject *self, PyObject *args, int async)
 {
 	char		*query;
 	PyObject	*oargs = NULL;
@@ -2608,8 +2880,17 @@
 		}
 
 		Py_BEGIN_ALLOW_THREADS
-		result = PQexecParams(self->cnx, query, nparms,
-			NULL, (const char * const *)parms, lparms, NULL, 0);
+		if (async)
+		{
+			status = PQsendQueryParams(self->cnx, query, nparms,
+				NULL, (const char * const *)parms, lparms, NULL, 0);
+			result = NULL;
+		}
+		else
+		{
+			result = PQexecParams(self->cnx, query, nparms,
+				NULL, (const char * const *)parms, lparms, NULL, 0);
+		}
 		Py_END_ALLOW_THREADS
 
 		free(lparms); free(parms);
@@ -2625,12 +2906,20 @@
 	else
 	{
 		Py_BEGIN_ALLOW_THREADS
-		result = PQexec(self->cnx, query);
+		if (async)
+		{
+			status = PQsendQuery(self->cnx, query);
+			result = NULL;
+		}
+		else
+		{
+			result = PQexec(self->cnx, query);
+		}
 		Py_END_ALLOW_THREADS
 	}
 
 	/* checks result validity */
-	if (!result)
+	if ((!async && !result) || (async && !status))
 	{
 		PyErr_SetString(PyExc_ValueError, PQerrorMessage(self->cnx));
 		return NULL;
@@ -2637,51 +2926,9 @@
 	}
 
 	/* checks result status */
-	if ((status = PQresultStatus(result)) != PGRES_TUPLES_OK)
+	if (!async && (status = PQresultStatus(result)) != PGRES_TUPLES_OK)
 	{
-		switch (status)
-		{
-			case PGRES_EMPTY_QUERY:
-				PyErr_SetString(PyExc_ValueError, "empty query.");
-				break;
-			case PGRES_BAD_RESPONSE:
-			case PGRES_FATAL_ERROR:
-			case PGRES_NONFATAL_ERROR:
-				set_dberror(ProgrammingError,
-					PQerrorMessage(self->cnx), result);
-				break;
-			case PGRES_COMMAND_OK:
-				{						/* INSERT, UPDATE, DELETE */
-					Oid		oid = PQoidValue(result);
-					if (oid == InvalidOid)	/* not a single insert */
-					{
-						char	*ret = PQcmdTuples(result);
-
-						PQclear(result);
-						if (ret[0])		/* return number of rows affected */
-						{
-							return PyString_FromString(ret);
-						}
-						Py_INCREF(Py_None);
-						return Py_None;
-					}
-					/* for a single insert, return the oid */
-					PQclear(result);
-					return PyInt_FromLong(oid);
-				}
-			case PGRES_COPY_OUT:		/* no data will be received */
-			case PGRES_COPY_IN:
-				PQclear(result);
-				Py_INCREF(Py_None);
-				return Py_None;
-			default:
-				set_dberror(InternalError,
-					"internal error: unknown result status.", result);
-				break;
-		}
-
-		PQclear(result);
-		return NULL;			/* error detected on query */
+		return _check_result_status(status, self->cnx, result);
 	}
 
 	if (!(npgobj = PyObject_NEW(pgqueryobject, &PgQueryType)))
@@ -2688,10 +2935,29 @@
 		return NULL;
 
 	/* stores result and returns object */
+	npgobj->pgcnx = NULL;
 	npgobj->result = result;
+	if (async)
+	{
+		npgobj->result_type = RESULT_ASYNC;
+		Py_INCREF(self);
+		npgobj->pgcnx = self;
+	}
 	return (PyObject *) npgobj;
 }
 
+static PyObject *
+pg_query(pgobject *self, PyObject *args, int nowait)
+{
+	return _pg_query(self, args, 0);
+}
+
+static PyObject *
+pg_sendquery(pgobject *self, PyObject *args, int nowait )
+{
+	return _pg_query(self, args, 1);
+}
+
 #ifdef DIRECT_ACCESS
 static char pg_putline__doc__[] =
 "putline() -- sends a line directly to the backend";
@@ -2798,6 +3064,74 @@
 	Py_INCREF(Py_None);
 	return Py_None;
 }
+
+
+/* direct access function : setnonblocking */
+static char pg_setnonblocking__doc__[] =
+"setnonblocking() -- puts direct copy functions in non-blocking mode";
+
+static PyObject *
+pg_setnonblocking(pgobject *self, PyObject *args)
+{
+	int nonblocking;
+
+	if (!self->cnx)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* reads args */
+	if (!PyArg_ParseTuple(args, "i", &nonblocking))
+	{
+		PyErr_SetString(PyExc_TypeError, "setnonblocking(tf), with boolean.");
+		return NULL;
+	}
+
+	/* returns -1 on error, or 0 on success */
+	if (PQsetnonblocking(self->cnx, nonblocking) < 0)
+	{
+		PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+		return NULL;
+	}
+	Py_INCREF(Py_None);
+	return Py_None;
+}
+
+/* direct access function : isnonblocking */
+static char pg_isnonblocking__doc__[] =
+"isnonblocking() -- reports the non-blocking status of copy functions";
+
+static PyObject *
+pg_isnonblocking(pgobject *self, PyObject *args)
+{
+	int rc;
+
+	if (!self->cnx)
+	{
+		PyErr_SetString(PyExc_TypeError, "Connection is not valid.");
+		return NULL;
+	}
+
+	/* reads args */
+	if (!PyArg_ParseTuple(args, ""))
+	{
+		PyErr_SetString(PyExc_TypeError,
+			"method isnonblocking() takes no parameters.");
+		return NULL;
+	}
+
+	/* returns 1 if blocking, 0 otherwise. not sure if it can return -1 */
+	rc = PQisnonblocking(self->cnx);
+	if (rc < 0)
+	{
+		PyErr_SetString(PyExc_IOError, PQerrorMessage(self->cnx));
+		return NULL;
+	}
+
+	return PyBool_FromLong(rc);
+}
+
 #endif /* DIRECT_ACCESS */
 
 static PyObject *
@@ -3301,9 +3635,12 @@
 static struct PyMethodDef pgobj_methods[] = {
 	{"source", (PyCFunction) pg_source, METH_VARARGS, pg_source__doc__},
 	{"query", (PyCFunction) pg_query, METH_VARARGS, pg_query__doc__},
+	{"sendquery", (PyCFunction) pg_sendquery, METH_VARARGS, pg_sendquery__doc__},
 	{"reset", (PyCFunction) pg_reset, METH_VARARGS, pg_reset__doc__},
 	{"cancel", (PyCFunction) pg_cancel, METH_VARARGS, pg_cancel__doc__},
 	{"close", (PyCFunction) pg_close, METH_VARARGS, pg_close__doc__},
+	{"connectpoll", (PyCFunction) pg_connectpoll, METH_VARARGS,
+			pg_connectpoll__doc__},
 	{"fileno", (PyCFunction) pg_fileno, METH_VARARGS, pg_fileno__doc__},
 	{"get_notice_receiver", (PyCFunction) pg_get_notice_receiver, METH_VARARGS,
 			pg_get_notice_receiver__doc__},
@@ -3333,6 +3670,8 @@
 	{"putline", (PyCFunction) pg_putline, 1, pg_putline__doc__},
 	{"getline", (PyCFunction) pg_getline, 1, pg_getline__doc__},
 	{"endcopy", (PyCFunction) pg_endcopy, 1, pg_endcopy__doc__},
+	{"setnonblocking", (PyCFunction) pg_setnonblocking, 1, pg_setnonblocking__doc__},
+	{"isnonblocking", (PyCFunction) pg_isnonblocking, 1, pg_isnonblocking__doc__},
 #endif /* DIRECT_ACCESS */
 
 #ifdef LARGE_OBJECTS
@@ -4249,6 +4588,12 @@
 	PyDict_SetItemString(dict, "SEEK_END", PyInt_FromLong(SEEK_END));
 #endif /* LARGE_OBJECTS */
 
+	/* PQconnectPoll() results */
+	PyDict_SetItemString(dict,"PGRES_POLLING_OK",PyInt_FromLong(PGRES_POLLING_OK));
+	PyDict_SetItemString(dict,"PGRES_POLLING_FAILED",PyInt_FromLong(PGRES_POLLING_FAILED));
+	PyDict_SetItemString(dict,"PGRES_POLLING_READING",PyInt_FromLong(PGRES_POLLING_READING));
+	PyDict_SetItemString(dict,"PGRES_POLLING_WRITING",PyInt_FromLong(PGRES_POLLING_WRITING));
+
 #ifdef DEFAULT_VARS
 	/* prepares default values */
 	Py_INCREF(Py_None);
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to