I sent this previously as a single patch, but it's been stuck waiting
for moderation for a month, so I thought I'd try again with two smaller
patches.


The attached patch, along with the one in my next message, provides the
asynchronous operations described in section 31.4 of the PostgreSQL
manual. I believe everything described in that section is available with
these exceptions:

 - there's no prepared statement support
 - I didn't implement PQsetSingleRowMode(). This would require a
possibly small change to the way that query results are retrieved, which
I thought would go better as a separate change set.
 - I didn't implement PQconsumeInput() or PQisBusy(). I don't really
understand the point of these functions: they seem to have marginal
utility outside notification reception, and I wasn't sure exactly how to
document them. It might make sense to have an isbusy() call which calls
both, but I don't really know if that fits anybody's use case.
 - I seem to have left out PQflush(). This was an oversight, but in
general, copy and large object operations are not well tested and might
not be useful.

In general, the changes allow the database to be used in an event-driven
application, and for other applications, there are some parallelism
benefits:
- connections can be completed in the background, which can speed up use
cases where, for instance, the application needs to connect to several
databases at once
- when multiple semicolon-delimited queries are run in a single call,
the results of all the queries are returned
- the application can do other work while waiting for queries to
complete
- copy and large object operations can use non-blocking IO
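
As a rough illustration of the background-connection case, here is a
minimal sketch of a loop that drives connectpoll() with select(). The
helper name wait_for_connection and its pgmod parameter are hypothetical
and not part of the patch; pgmod is assumed to be the pg module (or
anything exposing the PGRES_POLLING_* constants), and con is assumed to
have been created with connect(nowait=True).

```python
import select


def wait_for_connection(con, pgmod, timeout=5.0):
    """Poll an asynchronous connection until it succeeds or fails.

    con   -- object with fileno() and connectpoll(), as added by the patch
    pgmod -- object exposing the PGRES_POLLING_* constants (assumed: pg)
    Returns True on PGRES_POLLING_OK, False on PGRES_POLLING_FAILED.
    """
    # The documented initial state is PGRES_POLLING_WRITING.
    state = pgmod.PGRES_POLLING_WRITING
    while state not in (pgmod.PGRES_POLLING_OK, pgmod.PGRES_POLLING_FAILED):
        fd = con.fileno()
        if state == pgmod.PGRES_POLLING_READING:
            readable, writable, _ = select.select([fd], [], [], timeout)
        else:
            readable, writable, _ = select.select([], [fd], [], timeout)
        if not readable and not writable:
            # Hypothetical timeout policy; an application might retry instead.
            raise RuntimeError("connection not ready after %s seconds" % timeout)
        state = con.connectpoll()
    return state == pgmod.PGRES_POLLING_OK
```

An event-driven application would more likely register the descriptor
with its own event loop than call select() directly; the loop above only
shows the order of the calls.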

Query operations work essentially the same way as they do now, with
these differences:

 - all the result codes are now returned by getresult(), dictresult() or
namedresult()
 - in cases where query() returns None, getresult() et al return ''
 - you have to call getresult() et al until they return None
 - exceptions raised by bad queries are raised by getresult() et al, not
by the query function
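
The "call until None" rule can be wrapped in a small helper. This is
only a sketch: drain_results is a hypothetical name, not something the
patch provides, and it assumes pgq is the object returned by
sendquery().

```python
def drain_results(pgq, method="getresult"):
    """Call a result-returning method until it returns None.

    Returns the list of everything the method returned before None,
    one entry per result set (or result code such as '').
    Exceptions raised by a bad query propagate to the caller.
    """
    fetch = getattr(pgq, method)
    results = []
    while True:
        res = fetch()
        if res is None:
            return results
        results.append(res)
```

For a call such as sendquery("select 1+1; select 'pg';") this would be
expected to return two entries, one per statement.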

The result member of pgqueryobject is changed by each call to
getresult() et al, so you can't get the same query result twice when
using asynchronous calls, and functions which depend on the result
member don't work until after a call to getresult() et al.

Because of this last point, I had to reorganize _namedresult(). That's
the only Python change other than the unit tests.
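
The same ordering constraint applies to any caller that wants field
names alongside rows: getresult() has to come first, and listfields()
afterwards. A minimal sketch, with results_with_fields as a hypothetical
helper name and the assumption that non-list results (such as '') carry
no field names:

```python
def results_with_fields(pgq):
    """Return a list of (field_names, rows) pairs, one per result set."""
    out = []
    while True:
        # Fetch first: this is what refreshes the result member.
        rows = pgq.getresult()
        if rows is None:
            return out
        if isinstance(rows, list):
            # Safe now, because a result has just been retrieved.
            out.append((tuple(pgq.listfields()), rows))
        else:
            # Non-list result code (e.g. ''); listfields() is not called.
            out.append(((), rows))
```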

C code changes are:
 - some new functions
 - change to connect() to take a new argument and call
PQconnectStartParams() when appropriate
 - added code to getresult() and dictresult() to call PQgetResult() when
appropriate. Looking at it now, this block of code has grown quite big
and maybe should move to its own function
 - renamed pg_query to _pg_query and added a new argument. This is
called from wrapper functions pg_query() and pg_sendquery()
 - moved scalar result processing from pg_query to a new function,
_check_result_status()
 - changed pg_query to call PQsendQuery() or PQsendQueryParams() when
appropriate

-- 
Patrick TJ McPhee <[email protected]>

Index: trunk/docs/pg.txt
===================================================================
--- trunk/docs/pg.txt	(revision 520)
+++ trunk/docs/pg.txt	(working copy)
@@ -59,7 +59,7 @@
 -------------------------------
 Syntax::
 
-  connect([dbname], [host], [port], [opt], [tty], [user], [passwd])
+  connect([dbname], [host], [port], [opt], [tty], [user], [passwd], [nowait])
 
 Parameters:
   :dbname: name of connected database (string/None)
@@ -69,6 +69,7 @@
   :tty:    debug terminal (string/None)
   :user:   PostgreSQL user (string/None)
   :passwd: password for user (string/None)
+  :nowait: True to perform the connection asynchronously
 
 Return type:
   :pgobject: If successful, the `pgobject` handling the connection
@@ -86,6 +87,7 @@
   Python tutorial. The names of the keywords are the name of the
   parameters given in the syntax line. For a precise description
   of the parameters, please refer to the PostgreSQL user manual.
+  See connectpoll() for a description of the nowait parameter.
 
 Examples::
 
@@ -480,6 +482,95 @@
   phone = con.query("select phone from employees"
     " where name=$1", (name, )).getresult()
 
+sendquery - executes a SQL command string asynchronously
+--------------------------------------------------------
+Syntax::
+
+  sendquery(command, [args])
+
+Parameters:
+  :command: SQL command (string)
+  :args: optional positional arguments
+
+Return type:
+  :pgqueryobject
+
+Exceptions raised:
+  :TypeError: bad argument type, or too many arguments
+  :TypeError: invalid connection
+
+Description:
+  `sendquery()` is much the same as `query()`, except that it returns without
+  waiting for the query to complete. The database connection cannot be used
+  for other operations until the query completes, but the application can
+  do other things, including executing queries using other database connections.
+  The application can call `select()` using the connection's `fileno()` to
+  determine when the query has results to return.
+
+  `sendquery()` always returns a `pgqueryobject`. This object differs from
+  the `pgqueryobject` returned by `query()` in a few ways. Most importantly,
+  when `sendquery()` is used, the application must call one of the
+  result-returning methods (`getresult()`, `dictresult()`, or `namedresult()`)
+  on the `pgqueryobject` until it either throws an exception or returns `None`.
+  Otherwise, the database connection will be left in an unusable state.
+
+  In cases when `query()` would return something other than a `pgqueryobject`,
+  that result will be returned by calling one of the result-returning methods
+  on the `pgqueryobject` returned by `sendquery()`. There's one important
+  difference in these result codes: if `query()` returns `None`, the
+  result-returning methods will return an empty string (`''`). It's still
+  necessary to call a result-returning function until it returns `None`.
+
+  `listfields()`, `fieldname()`, `fieldnum()`, and `ntuples()` only work
+  after a call to a result-returning method with a non-`None` return value.
+  `ntuples()` returns only the number of rows returned by the previous
+  result-returning function. 
+
+  If multiple semi-colon-delimited statements are passed to `query()`, only
+  the results of the last statement are returned in the `pgqueryobject`. With
+  `sendquery()`, all results are returned. Each result set will be
+  returned by a separate call to `getresult()`.
+
+Example::
+
+  name = raw_input("Name? ")
+  pgq = con.sendquery("select phone from employees"
+    " where name=$1", (name, ))
+  phone = pgq.getresult()
+  pgq.getresult() # to close the query
+
+  # initiate two queries in one round trip
+  # note this differs from a union since the result sets have different
+  # structures
+  pgq = con.sendquery("select a,b,c from x where d=e; select e,f from y where g")
+  qrabc = pgq.dictresult() # results from x
+  qref = pgq.dictresult()  # results from y
+  pgq.dictresult()         # to close the query
+
+  # using select to wait for the query to be ready
+  pgq = con.sendquery("select pg_sleep(20)")
+  r,w,e = select([con.fileno(),other,sockets],[],[])
+  if con.fileno() in r:
+      results = pgq.getresult()
+      pgq.getresult() # to close the query
+
+  # concurrent queries on separate connections
+  con1 = connect()
+  con2 = connect()
+  ss = con1.query("begin; set transaction isolation level repeatable read;"
+                  "select pg_export_snapshot();").getresult()[0][0]
+  con2.query("begin; set transaction isolation level repeatable read;"
+             "set transaction snapshot '%s'" % (ss,))
+  pgq1 = con1.sendquery("select a,b,c from x where d=e")
+  pgq2 = con2.sendquery("select e,f from y where g")
+  qr1 = pgq1.getresult()
+  pgq1.getresult()
+  qr2 = pgq2.getresult()
+  pgq2.getresult()
+  con1.query("commit")
+  con2.query("commit")
+
+
 reset - resets the connection
 -----------------------------
 Syntax::
@@ -540,6 +631,59 @@
   allows you to explicitly close it. It is mainly here to allow
   the DB-SIG API wrapper to implement a close function.
 
+connectpoll - completes an asynchronous connection
+--------------------------------------------------
+Syntax::
+
+  connectpoll()
+
+Parameters:
+  None
+
+Return type:
+  :int: PGRES_POLLING_OK, PGRES_POLLING_FAILED, PGRES_POLLING_READING or
+        PGRES_POLLING_WRITING
+
+Exceptions raised:
+  :TypeError: too many (any) arguments
+  :TypeError: invalid connection
+  :pg.InternalError: some error occurred during pg connection
+
+Description:
+  The database connection can be performed without any blocking
+  calls. This allows the application mainline to perform other 
+  operations or perhaps connect to multiple databases concurrently.
+  Once the connection is established, it's no different from a connection
+  made using blocking calls.
+
+  The required steps are to pass the parameter "nowait=True" to
+  the `connect()` call, then call `connectpoll()` until it either returns
+  `PGRES_POLLING_OK` or raises an exception. To avoid blocking in
+  `connectpoll()`, use `select()` or `poll()` to wait for the connection
+  to be readable or writable, depending on the return code of the
+  previous call to `connectpoll()`. The initial state is
+  `PGRES_POLLING_WRITING`.
+
+Example::
+
+  con = pg.connect('testdb', nowait=True)
+  conno = con.fileno()
+  rd = []
+  wt = [conno]
+  rc = pg.PGRES_POLLING_WRITING
+  while rc not in (pg.PGRES_POLLING_OK,pg.PGRES_POLLING_FAILED):
+      ra,wa,xa = select(rd, wt, [], timeout)
+      if not ra and not wa:
+          timedout()
+
+      rc = con.connectpoll()
+      if rc == pg.PGRES_POLLING_READING:
+          rd = [conno]
+          wt = []
+      else:
+          rd = []
+          wt = [conno]
+          
 fileno - returns the socket used to connect to the database
 -----------------------------------------------------------
 Syntax::
@@ -719,6 +863,47 @@
   The use of direct access methods may desynchonize client and server.
   This method ensure that client and server will be synchronized.
 
+setnonblocking - puts the connection into non-blocking mode
+-----------------------------------------------------------
+Syntax::
+
+  setnonblocking(nb)
+
+Parameters:
+  :nb: True to put the connection into non-blocking mode. False to put
+       it into blocking mode
+
+Return type:
+  None
+
+Exceptions raised:
+  :TypeError: invalid connection
+  :TypeError: too many parameters
+
+Description:
+  Puts the socket connection into non-blocking mode (or into blocking
+  mode). This affects copy commands and large object operations, but not
+  queries.
+
+isnonblocking - reports the connection's blocking status
+--------------------------------------------------------
+Syntax::
+
+  isnonblocking()
+
+Parameters:
+  None
+
+Return type:
+  :boolean: True if the connection is in non-blocking mode, False otherwise 
+
+Exceptions raised:
+  :TypeError: invalid connection
+  :TypeError: too many parameters
+
+Description:
+  Returns True if the connection is in non-blocking mode, False otherwise.
+
 locreate - create a large object in the database [LO]
 -----------------------------------------------------
 Syntax::
@@ -1158,6 +1343,8 @@
 
 Return type:
   :list: result values as a list of tuples
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
@@ -1179,6 +1366,8 @@
 
 Return type:
   :list: result values as a list of dictionaries
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
@@ -1200,6 +1389,8 @@
 
 Return type:
   :list: result values as a list of named tuples
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
Index: trunk/module/pg.py
===================================================================
--- trunk/module/pg.py	(revision 520)
+++ trunk/module/pg.py	(working copy)
@@ -121,8 +121,11 @@
 
     def _namedresult(q):
         """Get query result as named tuples."""
+        # need to call this before listfields for async queries to work
+        res = q.getresult()
+        if not isinstance(res, list): return res
         row = namedtuple('Row', q.listfields())
-        return [row(*r) for r in q.getresult()]
+        return [row(*r) for r in res]
 
     set_namedresult(_namedresult)
 
Index: trunk/module/test_pg.py
===================================================================
--- trunk/module/test_pg.py	(revision 520)
+++ trunk/module/test_pg.py	(working copy)
@@ -364,15 +364,23 @@
         self.assertEqual(attributes, connection_attributes)
 
     def testAllConnectMethods(self):
-        methods = '''cancel close endcopy
+        methods = '''cancel close connectpoll endcopy
             escape_bytea escape_identifier escape_literal escape_string
             fileno get_notice_receiver getline getlo getnotify
-            inserttable locreate loimport parameter putline query reset
-            set_notice_receiver source transaction'''.split()
+            inserttable isnonblocking locreate loimport parameter putline query reset
+            sendquery set_notice_receiver setnonblocking source transaction'''.split()
         connection_methods = [a for a in dir(self.connection)
             if callable(eval("self.connection." + a))]
         self.assertEqual(methods, connection_methods)
 
+    def testAsyncConnect(self):
+        self.connection.close()
+        self.connection = pg.connect(nowait=True)
+        rc = self.connection.connectpoll()
+        while rc not in (pg.PGRES_POLLING_OK, pg.PGRES_POLLING_FAILED):
+            rc = self.connection.connectpoll()
+        self.assertEqual(rc, pg.PGRES_POLLING_OK)
+
     def testAttributeDb(self):
         self.assertEqual(self.connection.db, self.dbname)
 
@@ -425,6 +433,28 @@
     def testMethodQueryEmpty(self):
         self.assertRaises(ValueError, self.connection.query, '')
 
+    def testMethodSendQuery(self):
+        pgq = self.connection.sendquery("select 1+1")
+        self.assertEqual(pgq.getresult()[0][0], 2)
+        self.assertIsNone(pgq.getresult())
+        pgq = self.connection.sendquery("select 1+$1", (1,))
+        self.assertEqual(pgq.getresult()[0][0], 2)
+        self.assertIsNone(pgq.getresult())
+        pgq = self.connection.sendquery("select 1+$1+$2", (2, 3))
+        self.assertEqual(pgq.getresult()[0][0], 6)
+        self.assertIsNone(pgq.getresult())
+        pgq = self.connection.sendquery("select 1+$1+$2", [2, 3])
+        self.assertEqual(pgq.getresult()[0][0], 6)
+        self.assertIsNone(pgq.getresult())
+        pgq = self.connection.sendquery("select 1+1; select 'pg';")
+        self.assertEqual(pgq.getresult()[0][0], 2)
+        self.assertEqual(pgq.getresult()[0][0], 'pg')
+        self.assertIsNone(pgq.getresult())
+
+    def testMethodSendQueryEmpty(self):
+        pgq = self.connection.sendquery('')
+        self.assertRaises(ValueError, pgq.getresult)
+
     def testMethodEndcopy(self):
         try:
             self.connection.endcopy()
@@ -478,6 +508,10 @@
         result = [(0,)]
         r = self.c.query(q).getresult()
         self.assertEqual(r, result)
+        pgq = self.c.sendquery(q)
+        r = pgq.getresult()
+        self.assertEqual(r, result)
+        self.assertIsNone(pgq.getresult())
 
     def testDictresult(self):
         q = "select 0 as alias0"
@@ -484,6 +518,10 @@
         result = [{'alias0': 0}]
         r = self.c.query(q).dictresult()
         self.assertEqual(r, result)
+        pgq = self.c.sendquery(q)
+        r = pgq.dictresult()
+        self.assertEqual(r, result)
+        self.assertIsNone(pgq.dictresult())
 
     def testNamedresult(self):
         if namedtuple:
@@ -494,6 +532,13 @@
             v = r[0]
             self.assertEqual(v._fields, ('alias0',))
             self.assertEqual(v.alias0, 0)
+            pgq = self.c.sendquery(q)
+            r = pgq.namedresult()
+            self.assertEqual(r, result)
+            v = r[0]
+            self.assertEqual(v._fields, ('alias0',))
+            self.assertEqual(v.alias0, 0)
+            self.assertIsNone(pgq.namedresult())
 
     def testGet3Cols(self):
         q = "select 1,2,3"
@@ -970,6 +1015,7 @@
             'clear',
             'close',
             'commit',
+            'connectpoll',
             'db',
             'dbname',
             'debug',
@@ -995,6 +1041,7 @@
             'host',
             'insert',
             'inserttable',
+            'isnonblocking',
             'locreate',
             'loimport',
             'notification_handler',
@@ -1010,8 +1057,10 @@
             'reset',
             'rollback',
             'savepoint',
+            'sendquery',
             'server_version',
             'set_notice_receiver',
+            'setnonblocking',
             'source',
             'start',
             'status',
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql
