Author: cito
Date: Fri Jan 1 12:32:24 2016
New Revision: 682
Log:
Add cursor classes for returning various row types
Currently, PyGreSQL returns rows as lists in the DB-API 2 module.
It is more common to return tuples (and consistent with how it
is done in the classic module). Even better are named tuples
which are cheap and available in all Python versions PyGreSQL supports.
So I made tuples the default now and added additional Cursor types
for returning rows as lists, named tuples, dicts and ordered dicts.
Also added an attribute for setting the default Cursor type in the
connection and special methods for creating the various cursor types
from the same connection.
However, I think this should be simplified and named tuples should
become the default. Named tuples can be easily converted to ordinary
lists and tuples (although this should never be necessary), and they
can also be easily converted to ordered dicts by calling ._asdict().
So I'm planning to change this before long, but for reference I have
checked in this implementation with all necessary tests anyway.
Modified:
trunk/module/pgdb.py
trunk/module/tests/test_dbapi20.py
Modified: trunk/module/pgdb.py
==============================================================================
--- trunk/module/pgdb.py Wed Dec 30 17:39:53 2015 (r681)
+++ trunk/module/pgdb.py Fri Jan 1 12:32:24 2016 (r682)
@@ -89,6 +89,15 @@
except NameError: # Python >= 3.0
basestring = (str, bytes)
+try:
+ from collections import OrderedDict
+except ImportError: # Python 2.6 or 3.0
+ try:
+ from ordereddict import OrderedDict
+ except Exception:
+ def OrderedDict(*args):
+ raise NotSupportedError('OrderedDict is not supported')
+
set_decimal(Decimal)
@@ -219,12 +228,15 @@
class Cursor(object):
"""Cursor object."""
+ row_factory = tuple # the factory for creating result rows
+
def __init__(self, dbcnx):
"""Create a cursor object for the database connection."""
self.connection = self._dbcnx = dbcnx
self._cnx = dbcnx._cnx
self._type_cache = dbcnx._type_cache
self._src = self._cnx.source()
+ self.colnames = self.coltypes = None
self.description = None
self.rowcount = -1
self.arraysize = 1
@@ -287,27 +299,10 @@
params = tuple(map(self._quote, params))
return string % params
- @staticmethod
- def row_factory(row):
- """Process rows before they are returned.
-
- You can overwrite this with a custom row factory,
- e.g. a dict factory:
-
- class DictCursor(pgdb.Cursor):
-
- def row_factory(self, row):
- return {desc[0]:value
- for desc, value in zip(self.description, row)}
-
- cur = DictCursor(con)
-
- """
- return row
-
def close(self):
"""Close the cursor object."""
self._src.close()
+ self.colnames = self.coltypes = None
self.description = None
self.rowcount = -1
self.lastrowid = None
@@ -330,6 +325,7 @@
if not param_seq:
# don't do anything without parameters
return
+ self.colnames = self.coltypes = None
self.description = None
self.rowcount = -1
# first try to execute all queries
@@ -364,13 +360,14 @@
if self._src.resulttype == RESULT_DQL:
self.rowcount = self._src.ntuples
getdescr = self._type_cache.getdescr
- coltypes = self._src.listinfo()
- self.description = [CursorDescription(
- typ[1], *getdescr(typ[2])) for typ in coltypes]
+ description = [CursorDescription(
+ info[1], *getdescr(info[2])) for info in self._src.listinfo()]
+ self.colnames = [info[0] for info in description]
+ self.coltypes = [info[1] for info in description]
+ self.description = description
self.lastrowid = None
else:
self.rowcount = totrows
- self.description = None
self.lastrowid = self._src.oidstatus()
# return the cursor object, so you can write statements such as
# "cursor.execute(...).fetchall()" or "for row in cursor.execute(...)"
@@ -407,11 +404,9 @@
raise
except Error as err:
raise _db_error(str(err))
- row_factory = self.row_factory
typecast = self._type_cache.typecast
- coltypes = [desc[1] for desc in self.description]
- return [row_factory([typecast(*args)
- for args in zip(coltypes, row)]) for row in result]
+ return [self.row_factory([typecast(typ, value)
+ for typ, value in zip(self.coltypes, row)]) for row in result]
def __next__(self):
"""Return the next row (support for the iteration protocol)."""
@@ -445,6 +440,56 @@
'precision', 'scale', 'null_ok'])
+class ListCursor(Cursor):
+ """Cursor object that returns rows as lists."""
+
+ row_factory = list
+
+
+class DictCursor(Cursor):
+ """Cursor object that returns rows as dictionaries."""
+
+ def row_factory(self, row):
+ """Turn a row from a tuple into a dictionary."""
+ # not using dict comprehension to stay compatible with Py 2.6
+ return dict((key, value) for key, value in zip(self.colnames, row))
+
+
+class OrderedDictCursor(Cursor):
+ """Cursor object that returns rows as ordered dictionaries."""
+
+ def row_factory(self, row):
+ """Turn a row from a tuple into an ordered dictionary."""
+ return OrderedDict(
+ (key, value) for key, value in zip(self.colnames, row))
+
+
+class NamedTupleCursor(Cursor):
+ """Cursor object that returns rows as named tuples."""
+
+ @property
+ def colnames(self):
+ return self._colnames
+
+ @colnames.setter
+ def colnames(self, value):
+ self._colnames = value
+ if value:
+ try:
+ try:
+ factory = namedtuple('Row', value, rename=True)._make
+ except TypeError: # Python 2.6 and 3.0 do not support rename
+ value = [v if v.isalnum() else 'column_%d' % n
+ for n, v in enumerate(value)]
+ factory = namedtuple('Row', value)._make
+ except ValueError:
+ value = ['column_%d' % n for n in range(len(value))]
+ factory = namedtuple('Row', value)._make
+ else:
+ factory = tuple
+ self._colnames, self.row_factory = value, factory
+
+
### Connection Objects
class Connection(object):
@@ -467,6 +512,8 @@
self._cnx = cnx # connection
self._tnx = False # transaction state
self._type_cache = TypeCache(cnx)
+ self.cursor_type = Cursor
+ self.row_type = tuple
try:
self._cnx.source()
except Exception:
@@ -532,16 +579,36 @@
else:
raise _op_error("connection has been closed")
- def cursor(self):
+ def cursor(self, cls=None):
"""Return a new cursor object using the connection."""
if self._cnx:
try:
- return Cursor(self)
+ return (cls or self.cursor_type)(self)
except Exception:
raise _op_error("invalid connection")
else:
raise _op_error("connection has been closed")
+ def list_cursor(self):
+ """Return a new tuple cursor object using the connection."""
+ return self.cursor(ListCursor)
+
+ def tuple_cursor(self):
+ """Return a new tuple cursor object using the connection."""
+ return self.cursor(Cursor)
+
+ def named_tuple_cursor(self):
+ """Return a new named tuple cursor object using the connection."""
+ return self.cursor(NamedTupleCursor)
+
+ def dict_cursor(self):
+ """Return a new dict cursor object using the connection."""
+ return self.cursor(DictCursor)
+
+ def ordered_dict_cursor(self):
+ """Return a new ordered dict cursor object using the connection."""
+ return self.cursor(OrderedDictCursor)
+
if shortcutmethods: # otherwise do not implement and document this
def execute(self, operation, params=None):
Modified: trunk/module/tests/test_dbapi20.py
==============================================================================
--- trunk/module/tests/test_dbapi20.py Wed Dec 30 17:39:53 2015 (r681)
+++ trunk/module/tests/test_dbapi20.py Fri Jan 1 12:32:24 2016 (r682)
@@ -33,6 +33,11 @@
except NameError: # Python >= 3.0
long = int
+try:
+ from collections import OrderedDict
+except ImportError: # Python 2.6 or 3.0
+ OrderedDict = None
+
class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
@@ -61,19 +66,165 @@
def tearDown(self):
dbapi20.DatabaseAPI20Test.tearDown(self)
+ def test_cursortype(self):
+ con = self._connect()
+ self.assertIs(con.cursor_type, pgdb.Cursor)
+ cur = con.cursor()
+ self.assertIsInstance(cur, pgdb.Cursor)
+ self.assertNotIsInstance(cur, pgdb.ListCursor)
+ cur.close()
+ con.cursor_type = pgdb.ListCursor
+ cur = con.cursor()
+ self.assertIsInstance(cur, pgdb.Cursor)
+ self.assertIsInstance(cur, pgdb.ListCursor)
+ cur.close()
+ cur = con.cursor()
+ self.assertIsInstance(cur, pgdb.ListCursor)
+ cur.close()
+ con.close()
+ con = self._connect()
+ self.assertIs(con.cursor_type, pgdb.Cursor)
+ cur = con.cursor()
+ self.assertIsInstance(cur, pgdb.Cursor)
+ self.assertNotIsInstance(cur, pgdb.ListCursor)
+ cur.close()
+ con.close()
+
+ def test_list_cursor(self):
+ con = self._connect()
+ cur = con.list_cursor()
+ self.assertIsInstance(cur, pgdb.ListCursor)
+ cur.execute("select 1, 2, 3")
+ res = cur.fetchone()
+ cur.close()
+ con.close()
+ self.assertIsInstance(res, list)
+ self.assertEqual(res, [1, 2, 3])
+
+ def test_tuple_cursor(self):
+ con = self._connect()
+ cur = con.tuple_cursor()
+ self.assertIsInstance(cur, pgdb.Cursor)
+ cur.execute("select 1, 2, 3")
+ res = cur.fetchone()
+ cur.close()
+ con.close()
+ self.assertIsInstance(res, tuple)
+ self.assertEqual(res, (1, 2, 3))
+ self.assertRaises(AttributeError, getattr, res, '_fields')
+
+ def test_named_tuple_cursor(self):
+ con = self._connect()
+ cur = con.named_tuple_cursor()
+ self.assertIsInstance(cur, pgdb.NamedTupleCursor)
+ cur.execute("select 1 as abc, 2 as de, 3 as f")
+ res = cur.fetchone()
+ cur.close()
+ con.close()
+ self.assertIsInstance(res, tuple)
+ self.assertEqual(res, (1, 2, 3))
+ self.assertEqual(res._fields, ('abc', 'de', 'f'))
+ self.assertEqual(res.abc, 1)
+ self.assertEqual(res.de, 2)
+ self.assertEqual(res.f, 3)
+
+ def test_named_tuple_cursor_with_bad_names(self):
+ con = self._connect()
+ cur = con.named_tuple_cursor()
+ self.assertIsInstance(cur, pgdb.NamedTupleCursor)
+ cur.execute("select 1, 2, 3")
+ res = cur.fetchone()
+ self.assertIsInstance(res, tuple)
+ self.assertEqual(res, (1, 2, 3))
+ old_py = OrderedDict is None # Python 2.6 or 3.0
+ # old Python versions cannot rename tuple fields with underscore
+ if old_py:
+ self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
+ else:
+ self.assertEqual(res._fields, ('_0', '_1', '_2'))
+ cur.execute("select 1 as one, 2, 3 as three")
+ res = cur.fetchone()
+ self.assertIsInstance(res, tuple)
+ self.assertEqual(res, (1, 2, 3))
+ if old_py: # cannot auto rename with underscore
+ self.assertEqual(res._fields, ('one', 'column_1', 'three'))
+ else:
+ self.assertEqual(res._fields, ('one', '_1', 'three'))
+ cur.execute("select 1 as abc, 2 as def")
+ res = cur.fetchone()
+ self.assertIsInstance(res, tuple)
+ self.assertEqual(res, (1, 2))
+ if old_py:
+ self.assertEqual(res._fields, ('column_0', 'column_1'))
+ else:
+ self.assertEqual(res._fields, ('abc', '_1'))
+ cur.close()
+ con.close()
+
+ def test_dict_cursor(self):
+ con = self._connect()
+ cur = con.dict_cursor()
+ self.assertIsInstance(cur, pgdb.DictCursor)
+ cur.execute("select 1 as abc, 2 as de, 3 as f")
+ res = cur.fetchone()
+ cur.close()
+ con.close()
+ self.assertIsInstance(res, dict)
+ self.assertEqual(res, {'abc': 1, 'de': 2, 'f': 3})
+ self.assertRaises(TypeError, res.popitem, last=True)
+
+ def test_ordered_dict_cursor(self):
+ con = self._connect()
+ cur = con.ordered_dict_cursor()
+ self.assertIsInstance(cur, pgdb.OrderedDictCursor)
+ cur.execute("select 1 as abc, 2 as de, 3 as f")
+ try:
+ res = cur.fetchone()
+ except pgdb.NotSupportedError:
+ if OrderedDict is None:
+ return
+ self.fail('OrderedDict supported by Python, but not by pgdb')
+ finally:
+ cur.close()
+ con.close()
+ self.assertIsInstance(res, dict)
+ self.assertEqual(res, {'abc': 1, 'de': 2, 'f': 3})
+ self.assertEqual(res.popitem(last=True), ('f', 3))
+
def test_row_factory(self):
class DictCursor(pgdb.Cursor):
def row_factory(self, row):
- return {desc[0]:value
- for desc, value in zip(self.description, row)}
+ # not using dict comprehension to stay compatible with Py 2.6
+ return dict(('column %s' % desc[0], value)
+ for desc, value in zip(self.description, row))
con = self._connect()
cur = DictCursor(con)
ret = cur.execute("select 1 as a, 2 as b")
self.assertTrue(ret is cur, 'execute() should return cursor')
- self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
+ self.assertEqual(cur.fetchone(), {'column a': 1, 'column b': 2})
+
+ def test_colnames(self):
+ con = self._connect()
+ cur = con.cursor()
+ cur.execute("select 1, 2, 3")
+ names = cur.colnames
+ self.assertIsInstance(names, list)
+ self.assertEqual(names, ['?column?', '?column?', '?column?'])
+ cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
+ names = cur.colnames
+ self.assertIsInstance(names, list)
+ self.assertEqual(names, ['a', 'bc', 'def', 'g'])
+
+ def test_coltypes(self):
+ con = self._connect()
+ cur = con.cursor()
+ cur.execute("select 1::int2, 2::int4, 3::int8")
+ types = cur.coltypes
+ self.assertIsInstance(types, list)
+ self.assertEqual(types, ['int2', 'int4', 'int8'])
def test_description_named(self):
con = self._connect()
@@ -101,10 +252,10 @@
def test_fetch_2_rows(self):
Decimal = pgdb.decimal_type()
- values = ['test', pgdb.Binary(b'\xff\x52\xb2'),
+ values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
'2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
- 7897234]
+ 7897234)
table = self.table_prefix + 'booze'
con = self._connect()
try:
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql