There's a Gary Larson cartoon which has a bunch of humans standing
around a space ship, and one of the aliens has obviously just fallen
down the stairs. At the top, another alien says "well, so much for
inspiring them with a sense of awe". I keep falling down the stairs when
sending this patch to the list.
I described this patch in a message Monday, in which I spelled
asynchronous incorrectly, but the attachment was scrubbed I hope because
of the mime type. The next message will have the changes to the C
module.
--
Patrick TJ McPhee <[email protected]>
Index: trunk/docs/pg.txt
===================================================================
--- trunk/docs/pg.txt (revision 520)
+++ trunk/docs/pg.txt (working copy)
@@ -59,7 +59,7 @@
-------------------------------
Syntax::
- connect([dbname], [host], [port], [opt], [tty], [user], [passwd])
+ connect([dbname], [host], [port], [opt], [tty], [user], [passwd], [nowait])
Parameters:
:dbname: name of connected database (string/None)
@@ -69,6 +69,7 @@
:tty: debug terminal (string/None)
:user: PostgreSQL user (string/None)
:passwd: password for user (string/None)
+ :nowait: True to perform the connection asynchronously
Return type:
:pgobject: If successful, the `pgobject` handling the connection
@@ -86,6 +87,7 @@
Python tutorial. The names of the keywords are the name of the
parameters given in the syntax line. For a precise description
of the parameters, please refer to the PostgreSQL user manual.
+ See connectpoll() for a description of the nowait parameter.
Examples::
@@ -480,6 +482,95 @@
phone = con.query("select phone from employees"
" where name=$1", (name, )).getresult()
+sendquery - executes a SQL command string asynchronously
+--------------------------------------------------------
+Syntax::
+
+ sendquery(command, [args])
+
+Parameters:
+ :command: SQL command (string)
+ :args: optional positional arguments
+
+Return type:
+ :pgqueryobject
+
+Exceptions raised:
+ :TypeError: bad argument type, or too many arguments
+ :TypeError: invalid connection
+
+Description:
+ `sendquery()` is much the same as `query()`, except that it returns without
+ waiting for the query to complete. The database connection cannot be used
+ for other operations until the query completes, but the application can
+ do other things, including executing queries using other database
connections.
+ The application can call `select()` using the connection's `fileno()` to
+ determine when the query has results to return.
+
+ `sendquery()` always returns a `pgqueryobject`. This object differs from
+ the `pgqueryobject` returned by `query()` in a few ways. Most importantly,
+ when `sendquery()` is used, the application must call one of the
+ result-returning methods (`getresult()`, `dictresult()`, or `namedresult()`)
+ on the `pgqueryobject` until it either throws an exception or returns `None`.
+ Otherwise, the database connection will be left in an unusable state.
+
+ In cases when `query()` would return something other than a `pgqueryobject`,
+ that result will be returned by calling one of the result-returning methods
+ on the `pgqueryobject` returned by `sendquery()`. There's one important
+ difference in these result codes: if `query()` returns `None`, the
+ result-returning methods will return an empty string (`''`). It's still
+ necessary to call a result-returning function until it returns `None`.
+
+ `listfields()`, `fieldname()`, `fieldnum()`, and `ntuples()` only work
+ after a call to a result-returning method with a non-`None` return value.
+ `ntuples()` returns only the number of rows returned by the previous
+ result-returning function.
+
+ If multiple semi-colon-delimited statements are passed to `query()`, only
+ the results of the last statement are returned in the `pgqueryobject`. With
+ `sendquery()`, all results are returned. Each result set will be
+ returned by a separate call to `getresult()`.
+
+Example::
+
+ name = raw_input("Name? ")
+ pgq = con.sendquery("select phone from employees"
+ " where name=$1", (name, ))
+ phone = pgq.getresult()
+ pgq.getresult() # to close the query
+
+ # initiate two queries in one round trip
+ # note this differs from a union since the result sets have different
+ # structures
+ pgq = con.sendquery("select a,b,c from x where d=e; select e,f from y where
g")
+ qrabc = pgq.dictresult() # results from x
+ qref = pgq.dictresult() # results from y
+ pgq.dictresult() # to close the query
+
+ # using select to wait for the query to be ready
+ pgq = con.sendquery("select pg_sleep(20)")
+ r,w,e = select([con.fileno(),other,sockets],[],[])
+ if con.fileno() in r:
+ results = pgq.getresult()
+ pgq.getresult() # to close the query
+
+ # concurrent queries on separate connections
+ con1 = connect()
+ con2 = connect()
+ ss = con1.query("begin; set transaction isolation level repeatable read;"
+ "select pg_export_snapshot();").getresult()[0][0]
+ con2.query("begin; set transaction isolation level repeatable read;"
+ "set transaction snapshot '%s'" % (ss,))
+ pgq1 = con1.sendquery("select a,b,c from x where d=e")
+ pgq2 = con2.sendquery("select e,f from y where g")
+ qr1 = pgq1.getresult()
+ pgq1.getresult()
+ qr2 = pgq2.getresult()
+ pgq2.getresult()
+ con1.query("commit")
+ con2.query("commit")
+
+
reset - resets the connection
-----------------------------
Syntax::
@@ -540,6 +631,59 @@
allows you to explicitly close it. It is mainly here to allow
the DB-SIG API wrapper to implement a close function.
+connectpoll - completes an asynchronous connection
+--------------------------------------------------
+Syntax::
+
+ connectpoll()
+
+Parameters:
+ None
+
+Return type:
+ :int: PGRES_POLLING_OK, PGRES_POLLING_FAILED, PGRES_POLLING_READING or
+ PGRES_POLLING_WRITING
+
+Exceptions raised:
+ :TypeError: too many (any) arguments
+ :TypeError: invalid connection
+ :pg.InternalError: some error occurred during pg connection
+
+Description:
+ The database connection can be performed without any blocking
+ calls. This allows the application mainline to perform other
+ operations or perhaps connect to multiple databases concurrently.
+ Once the connecton is established, it's no different from a connection
+ made using blocking calls.
+
+ The required steps are to pass the parameter "nowait=True" to
+ the `connect()` call, then call `connectpoll()` until it either returns
+ `PGRES_POLLING_OK` or raises an exception. To avoid blocking in
+ `connectpoll()`, use `select()` or `poll()` to wait for the connection
+ to be readable or writable, depending on the return code of the
+ previous call to `connectpoll()`. The initial state is
+ `PGRES_POLLING_WRITING`.
+
+Example::
+
+ con = pg.connect('testdb', nowait=True)
+ conno = con.fileno()
+ rd = []
+ wt = [conno]
+ rc = pg.PGRES_POLLING_WRITING
+ while rc not in (pg.PGRES_POLLING_OK,pg.PGRES_POLLING_FAILED):
+ ra,wa,xa = select(rd, wt, [], timeout)
+ if not ra and not wa:
+ timedout()
+
+ rc = con.connectpoll()
+ if rc == pg.PGRES_POLLING_READING:
+ rd = [conno]
+ wt = []
+ else:
+ rd = []
+ wt = [conno]
+
fileno - returns the socket used to connect to the database
-----------------------------------------------------------
Syntax::
@@ -719,6 +863,47 @@
The use of direct access methods may desynchonize client and server.
This method ensure that client and server will be synchronized.
+setnonblocking - puts the connection into non-blocking mode
+-----------------------------------------------------------
+Syntax::
+
+ setnonblocking(nb)
+
+Parameters:
+ :nb: True to put the connection into non-blocking mode. False to put
+ it into blocking mode
+
+Return type:
+ None
+
+Exceptions raised:
+ :TypeError: invalid connection
+ :TypeError: too many parameters
+
+Description:
+ Puts the socket connection into non-blocking mode (or into blocking
+ mode). This affects copy commands and large object operations, but not
+ queries.
+
+isnonblocking - reports the connection's blocking status
+--------------------------------------------------------
+Syntax::
+
+ isnonblocking()
+
+Parameters:
+ None
+
+Return type:
+ :boolean: True if the connection is in non-blocking mode, False otherwise
+
+Exceptions raised:
+ :TypeError: invalid connection
+ :TypeError: too many parameters
+
+Description:
+ Returns True if the connection is in non-blocking mode. False otherwise.
+
locreate - create a large object in the database [LO]
-----------------------------------------------------
Syntax::
@@ -1158,6 +1343,8 @@
Return type:
:list: result values as a list of tuples
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
@@ -1179,6 +1366,8 @@
Return type:
:list: result values as a list of dictionaries
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
@@ -1200,6 +1389,8 @@
Return type:
:list: result values as a list of named tuples
+ Other types are possible when the `pgqueryobject` was returned by
+ `sendquery()`
Exceptions raised:
:TypeError: too many (any) parameters
Index: trunk/module/pg.py
===================================================================
--- trunk/module/pg.py (revision 520)
+++ trunk/module/pg.py (working copy)
@@ -121,8 +121,11 @@
def _namedresult(q):
"""Get query result as named tuples."""
+ # need to call this before listfields for async queries to work
+ res = q.getresult()
+ if not isinstance(res, list): return res
row = namedtuple('Row', q.listfields())
- return [row(*r) for r in q.getresult()]
+ return [row(*r) for r in res]
set_namedresult(_namedresult)
Index: trunk/module/test_pg.py
===================================================================
--- trunk/module/test_pg.py (revision 520)
+++ trunk/module/test_pg.py (working copy)
@@ -364,15 +364,23 @@
self.assertEqual(attributes, connection_attributes)
def testAllConnectMethods(self):
- methods = '''cancel close endcopy
+ methods = '''cancel close connectpoll endcopy
escape_bytea escape_identifier escape_literal escape_string
fileno get_notice_receiver getline getlo getnotify
- inserttable locreate loimport parameter putline query reset
- set_notice_receiver source transaction'''.split()
+ inserttable isnonblocking locreate loimport parameter putline
query reset
+ sendquery set_notice_receiver setnonblocking source
transaction'''.split()
connection_methods = [a for a in dir(self.connection)
if callable(eval("self.connection." + a))]
self.assertEqual(methods, connection_methods)
+ def testAsyncConnect(self):
+ self.connection.close()
+ self.connection = pg.connect(nowait=True)
+ rc = self.connection.connectpoll()
+ while rc not in (pg.PGRES_POLLING_OK, pg.PGRES_POLLING_FAILED):
+ rc = self.connection.connectpoll()
+ self.assertEqual(rc, pg.PGRES_POLLING_OK)
+
def testAttributeDb(self):
self.assertEqual(self.connection.db, self.dbname)
@@ -425,6 +433,28 @@
def testMethodQueryEmpty(self):
self.assertRaises(ValueError, self.connection.query, '')
+ def testMethodSendQuery(self):
+ pgq = self.connection.sendquery("select 1+1")
+ self.assertEqual(pgq.getresult()[0][0], 2)
+ self.assertIsNone(pgq.getresult())
+ pgq = self.connection.sendquery("select 1+$1", (1,))
+ self.assertEqual(pgq.getresult()[0][0], 2)
+ self.assertIsNone(pgq.getresult())
+ pgq = self.connection.sendquery("select 1+$1+$2", (2, 3))
+ self.assertEqual(pgq.getresult()[0][0], 6)
+ self.assertIsNone(pgq.getresult())
+ pgq = self.connection.sendquery("select 1+$1+$2", [2, 3])
+ self.assertEqual(pgq.getresult()[0][0], 6)
+ self.assertIsNone(pgq.getresult())
+ pgq = self.connection.sendquery("select 1+1; select 'pg';")
+ self.assertEqual(pgq.getresult()[0][0], 2)
+ self.assertEqual(pgq.getresult()[0][0], 'pg')
+ self.assertIsNone(pgq.getresult())
+
+ def testMethodSendQueryEmpty(self):
+ pgq = self.connection.sendquery('')
+ self.assertRaises(ValueError, pgq.getresult)
+
def testMethodEndcopy(self):
try:
self.connection.endcopy()
@@ -478,6 +508,10 @@
result = [(0,)]
r = self.c.query(q).getresult()
self.assertEqual(r, result)
+ pgq = self.c.sendquery(q)
+ r = pgq.getresult()
+ self.assertEqual(r, result)
+ self.assertIsNone(pgq.getresult())
def testDictresult(self):
q = "select 0 as alias0"
@@ -484,6 +518,10 @@
result = [{'alias0': 0}]
r = self.c.query(q).dictresult()
self.assertEqual(r, result)
+ pgq = self.c.sendquery(q)
+ r = pgq.dictresult()
+ self.assertEqual(r, result)
+ self.assertIsNone(pgq.dictresult())
def testNamedresult(self):
if namedtuple:
@@ -494,6 +532,13 @@
v = r[0]
self.assertEqual(v._fields, ('alias0',))
self.assertEqual(v.alias0, 0)
+ pgq = self.c.sendquery(q)
+ r = pgq.namedresult()
+ self.assertEqual(r, result)
+ v = r[0]
+ self.assertEqual(v._fields, ('alias0',))
+ self.assertEqual(v.alias0, 0)
+ self.assertIsNone(pgq.namedresult())
def testGet3Cols(self):
q = "select 1,2,3"
@@ -970,6 +1015,7 @@
'clear',
'close',
'commit',
+ 'connectpoll',
'db',
'dbname',
'debug',
@@ -995,6 +1041,7 @@
'host',
'insert',
'inserttable',
+ 'isnonblocking',
'locreate',
'loimport',
'notification_handler',
@@ -1010,8 +1057,10 @@
'reset',
'rollback',
'savepoint',
+ 'sendquery',
'server_version',
'set_notice_receiver',
+ 'setnonblocking',
'source',
'start',
'status',
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql