There's a Gary Larson cartoon which has a bunch of humans standing
around a space ship, and one of the aliens has obviously just fallen
down the stairs. At the top, another alien says "well, so much for
inspiring them with a sense of awe". I keep falling down the stairs when
sending this patch to the list.

I described this patch in a message on Monday (in which I misspelled
"asynchronous"), but the attachment was scrubbed, I hope only because
of its MIME type. The next message will have the changes to the C
module.
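
For readers skimming the patch, here is a minimal sketch of the
"call a result-returning method until it returns None" rule that the
sendquery() documentation below describes. It is an illustration under
stated assumptions, not code from the patch: `FakeAsyncQuery` and
`drain_results` are hypothetical names, and the fake class only stands in
for the `pgqueryobject` so the loop can run without a database or the
patched module.

```python
class FakeAsyncQuery(object):
    """Hypothetical stand-in for the pgqueryobject returned by sendquery()."""

    def __init__(self, result_sets):
        self._pending = list(result_sets)

    def getresult(self):
        # Hand back one result set per call, then None once nothing is left.
        if self._pending:
            return self._pending.pop(0)
        return None


def drain_results(pgq):
    """Collect every result set by calling getresult() until it returns None."""
    results = []
    while True:
        res = pgq.getresult()
        if res is None:
            return results
        results.append(res)


# With the patch applied, pgq would instead come from something like
# con.sendquery("select 1+1; select 'pg'").
pgq = FakeAsyncQuery([[(2,)], [('pg',)]])
all_results = drain_results(pgq)
```

Because the loop stops only on `None`, it also passes through the empty
string that the docs say is returned where `query()` would have returned
`None`.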

-- 
Patrick TJ McPhee <[email protected]>

Index: trunk/docs/pg.txt
===================================================================
--- trunk/docs/pg.txt   (revision 520)
+++ trunk/docs/pg.txt   (working copy)
@@ -59,7 +59,7 @@
 -------------------------------
 Syntax::
 
-  connect([dbname], [host], [port], [opt], [tty], [user], [passwd])
+  connect([dbname], [host], [port], [opt], [tty], [user], [passwd], [nowait])
 
 Parameters:
   :dbname: name of connected database (string/None)
@@ -69,6 +69,7 @@
   :tty:    debug terminal (string/None)
   :user:   PostgreSQL user (string/None)
   :passwd: password for user (string/None)
+  :nowait: True to perform the connection asynchronously
 
 Return type:
   :pgobject: If successful, the `pgobject` handling the connection
@@ -86,6 +87,7 @@
   Python tutorial. The names of the keywords are the name of the
   parameters given in the syntax line. For a precise description
   of the parameters, please refer to the PostgreSQL user manual.
+  See connectpoll() for a description of the nowait parameter.
 
 Examples::
 
@@ -480,6 +482,95 @@
   phone = con.query("select phone from employees"
     " where name=$1", (name, )).getresult()
 
+sendquery - executes a SQL command string asynchronously
+--------------------------------------------------------
+Syntax::
+
+  sendquery(command, [args])
+
+Parameters:
+  :command: SQL command (string)
+  :args: optional positional arguments
+
+Return type:
+  :pgqueryobject: object from which the query results are fetched
+
+Exceptions raised:
+  :TypeError: bad argument type, or too many arguments
+  :TypeError: invalid connection
+
+Description:
+  `sendquery()` is much the same as `query()`, except that it returns without
+  waiting for the query to complete. The database connection cannot be used
+  for other operations until the query completes, but the application can
+  do other things, including executing queries using other database connections.
+  The application can call `select()` using the connection's `fileno()` to
+  determine when the query has results to return.
+
+  `sendquery()` always returns a `pgqueryobject`. This object differs from
+  the `pgqueryobject` returned by `query()` in a few ways. Most importantly,
+  when `sendquery()` is used, the application must call one of the
+  result-returning methods (`getresult()`, `dictresult()`, or `namedresult()`)
+  on the `pgqueryobject` until it either throws an exception or returns `None`.
+  Otherwise, the database connection will be left in an unusable state.
+
+  In cases when `query()` would return something other than a `pgqueryobject`,
+  that result will be returned by calling one of the result-returning methods
+  on the `pgqueryobject` returned by `sendquery()`. There's one important
+  difference in these result codes: if `query()` returns `None`, the
+  result-returning methods will return an empty string (`''`). It's still
+  necessary to call a result-returning function until it returns `None`.
+
+  `listfields()`, `fieldname()`, `fieldnum()`, and `ntuples()` only work
+  after a call to a result-returning method with a non-`None` return value.
+  `ntuples()` returns only the number of rows returned by the previous
+  result-returning function. 
+
+  If multiple semicolon-delimited statements are passed to `query()`, only
+  the results of the last statement are returned in the `pgqueryobject`. With
+  `sendquery()`, all results are returned. Each result set will be
+  returned by a separate call to `getresult()`.
+
+Example::
+
+  name = raw_input("Name? ")
+  pgq = con.sendquery("select phone from employees"
+    " where name=$1", (name, ))
+  phone = pgq.getresult()
+  pgq.getresult() # to close the query
+
+  # initiate two queries in one round trip
+  # note this differs from a union since the result sets have different
+  # structures
+  pgq = con.sendquery("select a,b,c from x where d=e; select e,f from y where g")
+  qrabc = pgq.dictresult() # results from x
+  qref = pgq.dictresult()  # results from y
+  pgq.dictresult()         # to close the query
+
+  # using select to wait for the query to be ready
+  pgq = con.sendquery("select pg_sleep(20)")
+  r,w,e = select([con.fileno(),other,sockets],[],[])
+  if con.fileno() in r:
+      results = pgq.getresult()
+      pgq.getresult() # to close the query
+
+  # concurrent queries on separate connections
+  con1 = connect()
+  con2 = connect()
+  ss = con1.query("begin; set transaction isolation level repeatable read;"
+                  "select pg_export_snapshot();").getresult()[0][0]
+  con2.query("begin; set transaction isolation level repeatable read;"
+             "set transaction snapshot '%s'" % (ss,))
+  pgq1 = con1.sendquery("select a,b,c from x where d=e")
+  pgq2 = con2.sendquery("select e,f from y where g")
+  qr1 = pgq1.getresult()
+  pgq1.getresult()
+  qr2 = pgq2.getresult()
+  pgq2.getresult()
+  con1.query("commit")
+  con2.query("commit")
+
+
 reset - resets the connection
 -----------------------------
 Syntax::
@@ -540,6 +631,59 @@
   allows you to explicitly close it. It is mainly here to allow
   the DB-SIG API wrapper to implement a close function.
 
+connectpoll - completes an asynchronous connection
+--------------------------------------------------
+Syntax::
+
+  connectpoll()
+
+Parameters:
+  None
+
+Return type:
+  :int: PGRES_POLLING_OK, PGRES_POLLING_FAILED, PGRES_POLLING_READING or
+        PGRES_POLLING_WRITING
+
+Exceptions raised:
+  :TypeError: too many (any) arguments
+  :TypeError: invalid connection
+  :pg.InternalError: some error occurred during pg connection
+
+Description:
+  The database connection can be performed without any blocking
+  calls. This allows the application mainline to perform other 
+  operations or perhaps connect to multiple databases concurrently.
+  Once the connection is established, it's no different from a connection
+  made using blocking calls.
+
+  The required steps are to pass the parameter "nowait=True" to
+  the `connect()` call, then call `connectpoll()` until it either returns
+  `PGRES_POLLING_OK` or raises an exception. To avoid blocking in
+  `connectpoll()`, use `select()` or `poll()` to wait for the connection
+  to be readable or writable, depending on the return code of the
+  previous call to `connectpoll()`. The initial state is
+  `PGRES_POLLING_WRITING`.
+
+Example::
+
+  con = pg.connect('testdb', nowait=True)
+  conno = con.fileno()
+  rd = []
+  wt = [conno]
+  rc = pg.PGRES_POLLING_WRITING
+  while rc not in (pg.PGRES_POLLING_OK,pg.PGRES_POLLING_FAILED):
+      ra,wa,xa = select(rd, wt, [], timeout)
+      if not ra and not wa:
+          timedout()
+
+      rc = con.connectpoll()
+      if rc == pg.PGRES_POLLING_READING:
+          rd = [conno]
+          wt = []
+      else:
+          rd = []
+          wt = [conno]
+          
 fileno - returns the socket used to connect to the database
 -----------------------------------------------------------
 Syntax::
@@ -719,6 +863,47 @@
   The use of direct access methods may desynchonize client and server.
   This method ensure that client and server will be synchronized.
 
+setnonblocking - puts the connection into non-blocking mode
+-----------------------------------------------------------
+Syntax::
+
+  setnonblocking(nb)
+
+Parameters:
+  :nb: True to put the connection into non-blocking mode. False to put
+       it into blocking mode
+
+Return type:
+  None
+
+Exceptions raised:
+  :TypeError: invalid connection
+  :TypeError: too many parameters
+
+Description:
+  Puts the socket connection into non-blocking mode (or into blocking
+  mode). This affects copy commands and large object operations, but not
+  queries.
+
+isnonblocking - reports the connection's blocking status
+--------------------------------------------------------
+Syntax::
+
+  isnonblocking()
+
+Parameters:
+  None
+
+Return type:
+  :boolean: True if the connection is in non-blocking mode, False otherwise 
+
+Exceptions raised:
+  :TypeError: invalid connection
+  :TypeError: too many parameters
+
+Description:
+  Returns True if the connection is in non-blocking mode, False otherwise.
+
 locreate - create a large object in the database [LO]
 -----------------------------------------------------
 Syntax::
@@ -1158,6 +1343,8 @@
 
 Return type:
   :list: result values as a list of tuples
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
@@ -1179,6 +1366,8 @@
 
 Return type:
   :list: result values as a list of dictionaries
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
@@ -1200,6 +1389,8 @@
 
 Return type:
   :list: result values as a list of named tuples
+  Other types are possible when the `pgqueryobject` was returned by
+  `sendquery()`
 
 Exceptions raised:
   :TypeError: too many (any) parameters
Index: trunk/module/pg.py
===================================================================
--- trunk/module/pg.py  (revision 520)
+++ trunk/module/pg.py  (working copy)
@@ -121,8 +121,11 @@
 
     def _namedresult(q):
         """Get query result as named tuples."""
+        # need to call this before listfields for async queries to work
+        res = q.getresult()
+        if not isinstance(res, list): return res
         row = namedtuple('Row', q.listfields())
-        return [row(*r) for r in q.getresult()]
+        return [row(*r) for r in res]
 
     set_namedresult(_namedresult)
 
Index: trunk/module/test_pg.py
===================================================================
--- trunk/module/test_pg.py     (revision 520)
+++ trunk/module/test_pg.py     (working copy)
@@ -364,15 +364,23 @@
         self.assertEqual(attributes, connection_attributes)
 
     def testAllConnectMethods(self):
-        methods = '''cancel close endcopy
+        methods = '''cancel close connectpoll endcopy
             escape_bytea escape_identifier escape_literal escape_string
             fileno get_notice_receiver getline getlo getnotify
-            inserttable locreate loimport parameter putline query reset
-            set_notice_receiver source transaction'''.split()
+            inserttable isnonblocking locreate loimport parameter putline query reset
+            sendquery set_notice_receiver setnonblocking source transaction'''.split()
         connection_methods = [a for a in dir(self.connection)
             if callable(eval("self.connection." + a))]
         self.assertEqual(methods, connection_methods)
 
+    def testAsyncConnect(self):
+        self.connection.close()
+        self.connection = pg.connect(nowait=True)
+        rc = self.connection.connectpoll()
+        while rc not in (pg.PGRES_POLLING_OK, pg.PGRES_POLLING_FAILED):
+            rc = self.connection.connectpoll()
+        self.assertEqual(rc, pg.PGRES_POLLING_OK)
+
     def testAttributeDb(self):
         self.assertEqual(self.connection.db, self.dbname)
 
@@ -425,6 +433,28 @@
     def testMethodQueryEmpty(self):
         self.assertRaises(ValueError, self.connection.query, '')
 
+    def testMethodSendQuery(self):
+        pgq = self.connection.sendquery("select 1+1")
+        self.assertEqual(pgq.getresult()[0][0], 2)
+        self.assertIsNone(pgq.getresult())
+        pgq = self.connection.sendquery("select 1+$1", (1,))
+        self.assertEqual(pgq.getresult()[0][0], 2)
+        self.assertIsNone(pgq.getresult())
+        pgq = self.connection.sendquery("select 1+$1+$2", (2, 3))
+        self.assertEqual(pgq.getresult()[0][0], 6)
+        self.assertIsNone(pgq.getresult())
+        pgq = self.connection.sendquery("select 1+$1+$2", [2, 3])
+        self.assertEqual(pgq.getresult()[0][0], 6)
+        self.assertIsNone(pgq.getresult())
+        pgq = self.connection.sendquery("select 1+1; select 'pg';")
+        self.assertEqual(pgq.getresult()[0][0], 2)
+        self.assertEqual(pgq.getresult()[0][0], 'pg')
+        self.assertIsNone(pgq.getresult())
+
+    def testMethodSendQueryEmpty(self):
+        pgq = self.connection.sendquery('')
+        self.assertRaises(ValueError, pgq.getresult)
+
     def testMethodEndcopy(self):
         try:
             self.connection.endcopy()
@@ -478,6 +508,10 @@
         result = [(0,)]
         r = self.c.query(q).getresult()
         self.assertEqual(r, result)
+        pgq = self.c.sendquery(q)
+        r = pgq.getresult()
+        self.assertEqual(r, result)
+        self.assertIsNone(pgq.getresult())
 
     def testDictresult(self):
         q = "select 0 as alias0"
@@ -484,6 +518,10 @@
         result = [{'alias0': 0}]
         r = self.c.query(q).dictresult()
         self.assertEqual(r, result)
+        pgq = self.c.sendquery(q)
+        r = pgq.dictresult()
+        self.assertEqual(r, result)
+        self.assertIsNone(pgq.dictresult())
 
     def testNamedresult(self):
         if namedtuple:
@@ -494,6 +532,13 @@
             v = r[0]
             self.assertEqual(v._fields, ('alias0',))
             self.assertEqual(v.alias0, 0)
+            pgq = self.c.sendquery(q)
+            r = pgq.namedresult()
+            self.assertEqual(r, result)
+            v = r[0]
+            self.assertEqual(v._fields, ('alias0',))
+            self.assertEqual(v.alias0, 0)
+            self.assertIsNone(pgq.namedresult())
 
     def testGet3Cols(self):
         q = "select 1,2,3"
@@ -970,6 +1015,7 @@
             'clear',
             'close',
             'commit',
+            'connectpoll',
             'db',
             'dbname',
             'debug',
@@ -995,6 +1041,7 @@
             'host',
             'insert',
             'inserttable',
+            'isnonblocking',
             'locreate',
             'loimport',
             'notification_handler',
@@ -1010,8 +1057,10 @@
             'reset',
             'rollback',
             'savepoint',
+            'sendquery',
             'server_version',
             'set_notice_receiver',
+            'setnonblocking',
             'source',
             'start',
             'status',
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql
