Author: cito
Date: Wed Jan 20 13:19:45 2016
New Revision: 770

Log:
Add methods for getting a table as a list or dict

Also added documentation and 100% test coverage.

The get_attnames() method now always returns a read-only ordered dictionary,
even under Python 2.6 or 3.0.  So you can sure the columns will be returned
in the right order if you iterate over it, and that you don't accidentally
modify the dictionary (since it is cached).

Modified:
   trunk/docs/contents/changelog.rst
   trunk/docs/contents/pg/db_wrapper.rst
   trunk/pg.py
   trunk/tests/__init__.py
   trunk/tests/test_classic_connection.py
   trunk/tests/test_classic_dbwrapper.py
   trunk/tests/test_classic_functions.py
   trunk/tests/test_classic_largeobj.py
   trunk/tests/test_dbapi20_copy.py

Modified: trunk/docs/contents/changelog.rst
==============================================================================
--- trunk/docs/contents/changelog.rst   Tue Jan 19 12:00:26 2016        (r769)
+++ trunk/docs/contents/changelog.rst   Wed Jan 20 13:19:45 2016        (r770)
@@ -42,6 +42,9 @@
   and use less memory. Also, overhead for quoting and escaping values in the
   DB wrapper methods has been reduced and security has been improved by
   passing the values to libpq separately as parameters instead of inline.
+- The classic interface got two new methods get_as_list() and get_as_dict()
+  returning a database table as a Python list or dict. The amount of data
+  returned can be controlled with various parameters.
 
 Version 4.2
 -----------

Modified: trunk/docs/contents/pg/db_wrapper.rst
==============================================================================
--- trunk/docs/contents/pg/db_wrapper.rst       Tue Jan 19 12:00:26 2016        
(r769)
+++ trunk/docs/contents/pg/db_wrapper.rst       Wed Jan 20 13:19:45 2016        
(r770)
@@ -119,15 +119,13 @@
     Get the attribute names of a table
 
     :param str table: name of table
-    :returns: a dictionary mapping attribute names to type names
+    :returns: an ordered dictionary mapping attribute names to type names
 
 Given the name of a table, digs out the set of attribute names.
 
-Returns a dictionary of attribute names (the names are the keys,
-the values are the names of the attributes' types).
-
-If your Python version supports this, the dictionary will be an
-OrderedDictionary with the column names in the right order.
+Returns a read-only dictionary of attribute names (the names are the keys,
+the values are the names of the attributes' types) with the column names
+in the proper order if you iterate over it.
 
 By default, only a limited number of simple types will be returned.
 You can get the regular types after enabling this by calling the
@@ -161,7 +159,7 @@
     :returns: the current value(s) of the run-time parameter(s)
     :rtype: str, list or dict
     :raises TypeError: Invalid parameter type(s)
-    :raises ProgrammingError: Invalid parameter name(s)
+    :raises pg.ProgrammingError: Invalid parameter name(s)
 
 If the parameter is a string, the return value will also be a string
 that is the current setting of the run-time parameter with that name.
@@ -179,7 +177,7 @@
 
 .. versionadded:: 4.2
 
-.. method:: DB.set_parameter(self, parameter, [value], [local])
+.. method:: DB.set_parameter(parameter, [value], [local])
 
     Set the value of run-time parameters
 
@@ -189,7 +187,7 @@
     :type param: str or None
     :raises TypeError: Invalid parameter type(s)
     :raises ValueError: Invalid value argument(s)
-    :raises ProgrammingError: Invalid parameter name(s) or values
+    :raises pg.ProgrammingError: Invalid parameter name(s) or values
 
 If the parameter and the value are strings, the run-time parameter
 will be set to that value.  If no value or *None* is passed as a value,
@@ -294,7 +292,7 @@
     :param str keyname: name of field to use as key (optional)
     :returns: A dictionary - the keys are the attribute names,
       the values are the row values.
-    :raises ProgrammingError: table has no primary key or missing privilege
+    :raises pg.ProgrammingError: table has no primary key or missing privilege
     :raises KeyError: missing key value for the row
 
 This method is the basic mechanism to get a single row.  It assumes
@@ -324,7 +322,7 @@
     :param col: optional keyword arguments for updating the dictionary
     :returns: the inserted values in the database
     :rtype: dict
-    :raises ProgrammingError: missing privilege or conflict
+    :raises pg.ProgrammingError: missing privilege or conflict
 
 This method inserts a row into a table.  If the optional dictionary is
 not supplied then the required values must be included as keyword/value
@@ -346,7 +344,7 @@
     :param col: optional keyword arguments for updating the dictionary
     :returns: the new row in the database
     :rtype: dict
-    :raises ProgrammingError: table has no primary key or missing privilege
+    :raises pg.ProgrammingError: table has no primary key or missing privilege
     :raises KeyError: missing key value for the row
 
 Similar to insert but updates an existing row.  The update is based on
@@ -373,7 +371,7 @@
     :param col: optional keyword arguments for specifying the update
     :returns: the new row in the database
     :rtype: dict
-    :raises ProgrammingError: table has no primary key or missing privilege
+    :raises pg.ProgrammingError: table has no primary key or missing privilege
 
 This method inserts a row into a table, but instead of raising a
 ProgrammingError exception in case a row with the same primary key already
@@ -479,7 +477,7 @@
     :param dict d: optional dictionary of values
     :param col: optional keyword arguments for updating the dictionary
     :rtype: None
-    :raises ProgrammingError: table has no primary key,
+    :raises pg.ProgrammingError: table has no primary key,
         row is still referenced or missing privilege
     :raises KeyError: missing key value for the row
 
@@ -493,10 +491,10 @@
 Note that if the row cannot be deleted because e.g. it is still referenced
 by another table, this method will raise a ProgrammingError.
 
-truncate -- Quickly empty database tables
+truncate -- quickly empty database tables
 -----------------------------------------
 
-.. method:: DB.truncate(self, table, [restart], [cascade], [only]):
+.. method:: DB.truncate(table, [restart], [cascade], [only])
 
     Empty a table or set of tables
 
@@ -524,6 +522,83 @@
 
 .. versionadded:: 4.2
 
+get_as_list/dict -- read a table as a list or dictionary
+--------------------------------------------------------
+
+.. method:: DB.get_as_list(table, [what], [where], [order], [limit], [offset], 
[scalar])
+
+    Get a table as a list
+
+    :param str table: the name of the table (the FROM clause)
+    :param what: column(s) to be returned (the SELECT clause)
+    :type what: str, list, tuple or None
+    :param where: conditions(s) to be fulfilled (the WHERE clause)
+    :type where: str, list, tuple or None
+    :param order: column(s) to sort by (the ORDER BY clause)
+    :type order: str, list, tuple, False or None
+    :param int limit: maximum number of rows returned (the LIMIT clause)
+    :param int offset: number of rows to be skipped (the OFFSET clause)
+    :param bool scalar: whether only the first column shall be returned
+    :returns: the content of the table as a list
+    :rtype: list
+    :raises TypeError: the table name has not been specified
+
+This gets a convenient representation of the table as a list of named tuples
+in Python.  You only need to pass the name of the table (or any other SQL
+expression returning rows).  Note that by default this will return the full
+content of the table which can be huge and overflow your memory.  However, you
+can control the amount of data returned using the other optional parameters.
+
+The parameter *what* can restrict the query to only return a subset of the
+table columns.  The parameter *where* can restrict the query to only return a
+subset of the table rows.  The specified SQL expressions all need to be
+fulfilled for a row to get into the result.  The parameter *order* specifies
+the ordering of the rows.  If no ordering is specified, the result will be
+ordered by the primary key(s) or all columns if no primary key exists.
+You can set *order* to *False* if you don't care about the ordering.
+The parameters *limit* and *offset* specify the maximum number of rows
+returned and a number of rows skipped over.
+
+If you set the *scalar* option to *True*, then instead of the named tuples
+you will get the first items of these tuples.  This is useful if the result
+has only one column anyway.
+
+.. method:: DB.get_as_dict(table, [keyname], [what], [where], [order], 
[limit], [offset], [scalar])
+
+    Get a table as a dictionary
+
+    :param str table: the name of the table (the FROM clause)
+    :param keyname: column(s) to be used as key(s) of the dictionary
+    :type keyname: str, list, tuple or None
+    :param what: column(s) to be returned (the SELECT clause)
+    :type what: str, list, tuple or None
+    :param where: conditions(s) to be fulfilled (the WHERE clause)
+    :type where: str, list, tuple or None
+    :param order: column(s) to sort by (the ORDER BY clause)
+    :type order: str, list, tuple, False or None
+    :param int limit: maximum number of rows returned (the LIMIT clause)
+    :param int offset: number of rows to be skipped (the OFFSET clause)
+    :param bool scalar: whether only the first column shall be returned
+    :returns: the content of the table as a list
+    :rtype: dict or OrderedDict
+    :raises TypeError: the table name has not been specified
+    :raises KeyError: keyname(s) are invalid or not part of the result
+    :raises pg.ProgrammingError: no keyname(s) and table has no primary key
+
+This method is similar to :meth:`DB.get_as_list`, but returns the table as
+a Python dict instead of a Python list, which can be even more convenient.
+The primary key column(s) of the table will be used as the keys of the
+dictionary, while the other column(s) will be the corresponding values.
+The keys will be named tuples if the table has a composite primary key.
+The rows will be also named tuples unless the *scalar* option has been set
+to *True*.  With the optional parameter *keyname* you can specify a different
+set of columns to be used as the keys of the dictionary.
+
+If the Python version supports it, the dictionary will be an *OrderedDict*
+using the order specified with the *order* parameter or the key column(s)
+if not specified.  You can set *order* to *False* if you don't care about the
+ordering.  In this case the returned dictionary will be an ordinary one.
+
 escape_literal -- escape a literal string for use within SQL
 ------------------------------------------------------------
 

Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Tue Jan 19 12:00:26 2016        (r769)
+++ trunk/pg.py Wed Jan 20 13:19:45 2016        (r770)
@@ -37,11 +37,7 @@
 from decimal import Decimal
 from collections import namedtuple
 from functools import partial
-
-try:
-    from collections import OrderedDict
-except ImportError:  # Python 2.6 or 3.0
-    OrderedDict = dict
+from operator import itemgetter
 
 try:
     basestring
@@ -50,6 +46,92 @@
 
 set_decimal(Decimal)
 
+try:
+    from collections import OrderedDict
+except ImportError:  # Python 2.6 or 3.0
+    OrderedDict = dict
+
+
+    class AttrDict(dict):
+        """Simple read-only ordered dictionary for storing attribute names."""
+
+        def __init__(self, *args, **kw):
+            if len(args) > 1 or kw:
+                raise TypeError
+            items = args[0] if args else []
+            if isinstance(items, dict):
+                raise TypeError
+            items = list(items)
+            self._keys = [item[0] for item in items]
+            dict.__init__(self, items)
+            self._read_only = True
+            error = self._read_only_error
+            self.clear = self.update = error
+            self.pop = self.setdefault = self.popitem = error
+
+        def __setitem__(self, key, value):
+            if self._read_only:
+                self._read_only_error()
+            dict.__setitem__(self, key, value)
+
+        def __delitem__(self, key):
+            if self._read_only:
+                self._read_only_error()
+            dict.__delitem__(self, key)
+
+        def __iter__(self):
+            return iter(self._keys)
+
+        def keys(self):
+            return list(self._keys)
+
+        def values(self):
+            return [self[key] for key in self]
+
+        def items(self):
+            return [(key, self[key]) for key in self]
+
+        def iterkeys(self):
+            return self.__iter__()
+
+        def itervalues(self):
+            return iter(self.values())
+
+        def iteritems(self):
+            return iter(self.items())
+
+        @staticmethod
+        def _read_only_error(*args, **kw):
+            raise TypeError('This object is read-only')
+
+else:
+
+     class AttrDict(OrderedDict):
+        """Simple read-only ordered dictionary for storing attribute names."""
+
+        def __init__(self, *args, **kw):
+            self._read_only = False
+            OrderedDict.__init__(self, *args, **kw)
+            self._read_only = True
+            error = self._read_only_error
+            self.clear = self.update = error
+            self.pop = self.setdefault = self.popitem = error
+
+        def __setitem__(self, key, value):
+            if self._read_only:
+                self._read_only_error()
+            OrderedDict.__setitem__(self, key, value)
+
+        def __delitem__(self, key):
+            if self._read_only:
+                self._read_only_error()
+            OrderedDict.__delitem__(self, key)
+
+        @staticmethod
+        def _read_only_error(*args, **kw):
+            raise TypeError('This object is read-only')
+
+
 
 # Auxiliary functions that are independent from a DB connection:
 
@@ -85,6 +167,23 @@
 set_namedresult(_namedresult)
 
 
+class _MemoryQuery:
+    """Class that embodies a given query result."""
+
+    def __init__(self, result, fields):
+        """Create query from given result rows and field names."""
+        self.result = result
+        self.fields = fields
+
+    def listfields(self):
+        """Return the stored field names of this query."""
+        return self.fields
+
+    def getresult(self):
+        """Return the stored result of this query."""
+        return self.result
+
+
 def _db_error(msg, cls=DatabaseError):
     """Return DatabaseError with empty sqlstate attribute."""
     error = cls(msg)
@@ -703,13 +802,11 @@
     def get_attnames(self, table, flush=False):
         """Given the name of a table, dig out the set of attribute names.
 
-        Returns a dictionary of attribute names (the names are the keys,
-        the values are the names of the attributes' types).
-
-        If your Python version supports this, the dictionary will be an
-        OrderedDictionary with the column names in the right order.
+        Returns a read-only dictionary of attribute names (the names are
+        the keys, the values are the names of the attributes' types)
+        with the column names in the proper order if you iterate over it.
 
-        If flush is set then the internal cache for attribute names will
+        If flush is set, then the internal cache for attribute names will
         be flushed. This may be necessary after the database schema or
         the search path has been changed.
 
@@ -734,7 +831,7 @@
             names = self.db.query(q, (table,)).getresult()
             if not self._regtypes:
                 names = ((name, _simpletype(typ)) for name, typ in names)
-            names = OrderedDict(names)
+            names = AttrDict(names)
             attnames[table] = names  # cache it
         return names
 
@@ -1191,7 +1288,165 @@
             q.append('CASCADE')
         q = ' '.join(q)
         self._do_debug(q)
-        return self.query(q)
+        return self.db.query(q)
+
+    def get_as_list(self, table, what=None, where=None,
+            order=None, limit=None, offset=None, scalar=False):
+        """Get a table as a list.
+
+        This gets a convenient representation of the table as a list
+        of named tuples in Python.  You only need to pass the name of
+        the table (or any other SQL expression returning rows).  Note that
+        by default this will return the full content of the table which
+        can be huge and overflow your memory.  However, you can control
+        the amount of data returned using the other optional parameters.
+
+        The parameter 'what' can restrict the query to only return a
+        subset of the table columns.  It can be a string, list or a tuple.
+        The parameter 'where' can restrict the query to only return a
+        subset of the table rows.  It can be a string, list or a tuple
+        of SQL expressions that all need to be fulfilled.  The parameter
+        'order' specifies the ordering of the rows.  It can also be a
+        other string, list or a tuple.  If no ordering is specified,
+        the result will be ordered by the primary key(s) or all columns
+        if no primary key exists.  You can set 'order' to False if you
+        don't care about the ordering.  The parameters 'limit' and 'offset'
+        can be integers specifying the maximum number of rows returned
+        and a number of rows skipped over.
+
+        If you set the 'scalar' option to True, then instead of the
+        named tuples you will get the first items of these tuples.
+        This is useful if the result has only one column anyway.
+        """
+        if not table:
+            raise TypeError('The table name is missing')
+        if what:
+            if isinstance(what, (list, tuple)):
+                what = ', '.join(map(str, what))
+            if order is None:
+                order = what
+        else:
+            what = '*'
+        q = ['SELECT', what, 'FROM', table]
+        if where:
+            if isinstance(where, (list, tuple)):
+                where = ' AND '.join(map(str, where))
+            q.extend(['WHERE', where])
+        if order is None:
+            try:
+                order = self.pkey(table, True)
+            except (KeyError, ProgrammingError):
+                try:
+                    order = list(self.get_attnames(table))
+                except (KeyError, ProgrammingError):
+                    pass
+        if order:
+            if isinstance(order, (list, tuple)):
+                order = ', '.join(map(str, order))
+            q.extend(['ORDER BY', order])
+        if limit:
+            q.append('LIMIT %d' % limit)
+        if offset:
+            q.append('OFFSET %d' % offset)
+        q = ' '.join(q)
+        self._do_debug(q)
+        q = self.db.query(q)
+        res = q.namedresult()
+        if res and scalar:
+            res = [row[0] for row in res]
+        return res
+
+    def get_as_dict(self, table, keyname=None, what=None, where=None,
+            order=None, limit=None, offset=None, scalar=False):
+        """Get a table as a dictionary.
+
+        This method is similar to get_as_list(), but returns the table
+        as a Python dict instead of a Python list, which can be even
+        more convenient. The primary key column(s) of the table will
+        be used as the keys of the dictionary, while the other column(s)
+        will be the corresponding values.  The keys will be named tuples
+        if the table has a composite primary key.  The rows will be also
+        named tuples unless the 'scalar' option has been set to True.
+        With the optional parameter 'keyname' you can specify an alternative
+        set of columns to be used as the keys of the dictionary.  It must
+        be set as a string, list or a tuple.
+
+        If the Python version supports it, the dictionary will be an
+        OrderedDict using the order specified with the 'order' parameter
+        or the key column(s) if not specified.  You can set 'order' to False
+        if you don't care about the ordering.  In this case the returned
+        dictionary will be an ordinary one.
+        """
+        if not table:
+            raise TypeError('The table name is missing')
+        if not keyname:
+            try:
+                keyname = self.pkey(table, True)
+            except (KeyError, ProgrammingError):
+                raise _prg_error('Table %s has no primary key' % table)
+        if isinstance(keyname, basestring):
+            keyname = [keyname]
+        elif not isinstance(keyname, (list, tuple)):
+            raise KeyError('The keyname must be a string, list or tuple')
+        if what:
+            if isinstance(what, (list, tuple)):
+                what = ', '.join(map(str, what))
+            if order is None:
+                order = what
+        else:
+            what = '*'
+        q = ['SELECT', what, 'FROM', table]
+        if where:
+            if isinstance(where, (list, tuple)):
+                where = ' AND '.join(map(str, where))
+            q.extend(['WHERE', where])
+        if order is None:
+            order = keyname
+        if order:
+            if isinstance(order, (list, tuple)):
+                order = ', '.join(map(str, order))
+            q.extend(['ORDER BY', order])
+        if limit:
+            q.append('LIMIT %d' % limit)
+        if offset:
+            q.append('OFFSET %d' % offset)
+        q = ' '.join(q)
+        self._do_debug(q)
+        q = self.db.query(q)
+        res = q.getresult()
+        cls = OrderedDict if order else dict
+        if not res:
+            return cls()
+        keyset = set(keyname)
+        fields = q.listfields()
+        if not keyset.issubset(fields):
+            raise KeyError('Missing keyname in row')
+        keyind, rowind = [], []
+        for i, f in enumerate(fields):
+            (keyind if f in keyset else rowind).append(i)
+        keytuple = len(keyind) > 1
+        getkey = itemgetter(*keyind)
+        keys = map(getkey, res)
+        if scalar:
+            rowind = rowind[:1]
+            rowtuple = False
+        else:
+            rowtuple = len(rowind) > 1
+        if scalar or rowtuple:
+            getrow = itemgetter(*rowind)
+        else:
+            rowind = rowind[0]
+            getrow = lambda row: (row[rowind],)
+            rowtuple = True
+        rows = map(getrow, res)
+        if keytuple or rowtuple:
+            namedresult = get_namedresult()
+            if keytuple:
+                keys = namedresult(_MemoryQuery(keys, keyname))
+            if rowtuple:
+                fields = [f for f in fields if f not in keyset]
+                rows = namedresult(_MemoryQuery(rows, fields))
+        return cls(zip(keys, rows))
 
     def notification_handler(self,
             event, callback, arg_dict=None, timeout=None, stop_event=None):

Modified: trunk/tests/__init__.py
==============================================================================
--- trunk/tests/__init__.py     Tue Jan 19 12:00:26 2016        (r769)
+++ trunk/tests/__init__.py     Wed Jan 20 13:19:45 2016        (r770)
@@ -1,7 +1,6 @@
 """PyGreSQL test suite.
 
 You can specify your local database settings in LOCAL_PyGreSQL.py.
-
 """
 
 try:
@@ -9,6 +8,12 @@
 except ImportError:
     import unittest
 
+if not (hasattr(unittest, 'skip')
+        and hasattr(unittest.TestCase, 'setUpClass')
+        and hasattr(unittest.TestCase, 'skipTest')
+        and hasattr(unittest.TestCase, 'assertIn')):
+    raise ImportError('Please install a newer version of unittest')
+
 
 def discover():
     loader = unittest.TestLoader()

Modified: trunk/tests/test_classic_connection.py
==============================================================================
--- trunk/tests/test_classic_connection.py      Tue Jan 19 12:00:26 2016        
(r769)
+++ trunk/tests/test_classic_connection.py      Wed Jan 20 13:19:45 2016        
(r770)
@@ -8,7 +8,6 @@
 Contributed by Christoph Zwerschke.
 
 These tests need a database to test against.
-
 """
 
 try:
@@ -870,6 +869,8 @@
 class TestInserttable(unittest.TestCase):
     """Test inserttable method."""
 
+    cls_set_up = False
+
     @classmethod
     def setUpClass(cls):
         c = connect()
@@ -884,6 +885,7 @@
         cls.has_encoding = c.query(
             "select length('รค') - length('a')").getresult()[0][0] == 0
         c.close()
+        cls.cls_set_up = True
 
     @classmethod
     def tearDownClass(cls):
@@ -892,6 +894,7 @@
         c.close()
 
     def setUp(self):
+        self.assertTrue(self.cls_set_up)
         self.c = connect()
         self.c.query("set client_encoding=utf8")
         self.c.query("set datestyle='ISO,YMD'")
@@ -1101,12 +1104,15 @@
 class TestDirectSocketAccess(unittest.TestCase):
     """Test copy command with direct socket access."""
 
+    cls_set_up = False
+
     @classmethod
     def setUpClass(cls):
         c = connect()
         c.query("drop table if exists test cascade")
         c.query("create table test (i int, v varchar(16))")
         c.close()
+        cls.cls_set_up = True
 
     @classmethod
     def tearDownClass(cls):
@@ -1115,6 +1121,7 @@
         c.close()
 
     def setUp(self):
+        self.assertTrue(self.cls_set_up)
         self.c = connect()
         self.c.query("set client_encoding=utf8")
 
@@ -1282,7 +1289,6 @@
 
     To test the effect of most of these functions, we need a database
     connection.  That's why they are covered in this test module.
-
     """
 
     def setUp(self):
@@ -1605,17 +1611,20 @@
     we need to open a connection with fixed parameters prior to testing
     in order to ensure that the tests always run under the same conditions.
     That's why these tests are included in this test module.
-
     """
 
+    cls_set_up = False
+
     @classmethod
     def setUpClass(cls):
         query = connect().query
         query('set client_encoding=sql_ascii')
         query('set standard_conforming_strings=off')
         query('set bytea_output=escape')
+        cls.cls_set_up = True
 
     def testEscapeString(self):
+        self.assertTrue(self.cls_set_up)
         f = pg.escape_string
         r = f(b'plain')
         self.assertIsInstance(r, bytes)
@@ -1633,6 +1642,7 @@
         self.assertEqual(r, r"It''s bad to have a \\ inside.")
 
     def testEscapeBytea(self):
+        self.assertTrue(self.cls_set_up)
         f = pg.escape_bytea
         r = f(b'plain')
         self.assertIsInstance(r, bytes)

Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py       Tue Jan 19 12:00:26 2016        
(r769)
+++ trunk/tests/test_classic_dbwrapper.py       Wed Jan 20 13:19:45 2016        
(r770)
@@ -8,8 +8,8 @@
 Contributed by Christoph Zwerschke.
 
 These tests need a database to test against.
-
 """
+
 try:
     import unittest2 as unittest  # for Python < 2.7
 except ImportError:
@@ -22,6 +22,7 @@
 import pg  # the module under test
 
 from decimal import Decimal
+from operator import itemgetter
 
 # We need a database to test against.  If LOCAL_PyGreSQL.py exists we will
 # get our information from that.  Otherwise we use the defaults.
@@ -77,6 +78,89 @@
     return db
 
 
+class TestAttrDict(unittest.TestCase):
+    """Test the simple ordered dictionary for attribute names."""
+
+    cls = pg.AttrDict
+    base = OrderedDict
+
+    def testInit(self):
+        a = self.cls()
+        self.assertIsInstance(a, self.base)
+        self.assertEqual(a, self.base())
+        items = [('id', 'int'), ('name', 'text')]
+        a = self.cls(items)
+        self.assertIsInstance(a, self.base)
+        self.assertEqual(a, self.base(items))
+        iteritems = iter(items)
+        a = self.cls(iteritems)
+        self.assertIsInstance(a, self.base)
+        self.assertEqual(a, self.base(items))
+
+    def testIter(self):
+        a = self.cls()
+        self.assertEqual(list(a), [])
+        keys = ['id', 'name', 'age']
+        items = [(key, None) for key in keys]
+        a = self.cls(items)
+        self.assertEqual(list(a), keys)
+
+    def testKeys(self):
+        a = self.cls()
+        self.assertEqual(list(a.keys()), [])
+        keys = ['id', 'name', 'age']
+        items = [(key, None) for key in keys]
+        a = self.cls(items)
+        self.assertEqual(list(a.keys()), keys)
+
+    def testValues(self):
+        a = self.cls()
+        self.assertEqual(list(a.values()), [])
+        items = [('id', 'int'), ('name', 'text')]
+        values = [item[1] for item in items]
+        a = self.cls(items)
+        self.assertEqual(list(a.values()), values)
+
+    def testItems(self):
+        a = self.cls()
+        self.assertEqual(list(a.items()), [])
+        items = [('id', 'int'), ('name', 'text')]
+        a = self.cls(items)
+        self.assertEqual(list(a.items()), items)
+
+    def testGet(self):
+        a = self.cls([('id', 1)])
+        try:
+            self.assertEqual(a['id'], 1)
+        except KeyError:
+            self.fail('AttrDict should be readable')
+
+    def testSet(self):
+        a = self.cls()
+        try:
+            a['id'] = 1
+        except TypeError:
+            pass
+        else:
+            self.fail('AttrDict should be read-only')
+
+    def testDel(self):
+        a = self.cls([('id', 1)])
+        try:
+            del a['id']
+        except TypeError:
+            pass
+        else:
+            self.fail('AttrDict should be read-only')
+
+    def testWriteMethods(self):
+        a = self.cls([('id', 1)])
+        self.assertEqual(a['id'], 1)
+        for method in 'clear', 'update', 'pop', 'setdefault', 'popitem':
+            method = getattr(a, method)
+            self.assertRaises(TypeError, method, a)
+
+
 class TestDBClassBasic(unittest.TestCase):
     """Test existence of the DB class wrapped pg connection methods."""
 
@@ -99,7 +183,8 @@
             'escape_bytea', 'escape_identifier',
             'escape_literal', 'escape_string',
             'fileno',
-            'get', 'get_attnames', 'get_databases',
+            'get', 'get_as_dict', 'get_as_list',
+            'get_attnames', 'get_databases',
             'get_notice_receiver', 'get_parameter',
             'get_relations', 'get_tables',
             'getline', 'getlo', 'getnotify',
@@ -283,6 +368,8 @@
 class TestDBClass(unittest.TestCase):
     """Test the methods of the DB class wrapped pg connection."""
 
+    cls_set_up = False
+
     @classmethod
     def setUpClass(cls):
         db = DB()
@@ -294,6 +381,7 @@
         db.query("create or replace view test_view as"
             " select i4, v4 from test")
         db.close()
+        cls.cls_set_up = True
 
     @classmethod
     def tearDownClass(cls):
@@ -302,6 +390,7 @@
         db.close()
 
     def setUp(self):
+        self.assertTrue(self.cls_set_up)
         self.db = DB()
         query = self.db.query
         query('set client_encoding=utf8')
@@ -937,17 +1026,50 @@
 
     def testGetAttnamesIsOrdered(self):
         get_attnames = self.db.get_attnames
-        query = self.db.query
-        self.createTable('test_table',
+        r = get_attnames('test', flush=True)
+        self.assertIsInstance(r, OrderedDict)
+        self.assertEqual(r, OrderedDict([
+            ('i2', 'int'), ('i4', 'int'), ('i8', 'int'),
+            ('d', 'num'), ('f4', 'float'), ('f8', 'float'), ('m', 'money'),
+            ('v4', 'text'), ('c4', 'text'), ('t', 'text')]))
+        if OrderedDict is not dict:
+            r = ' '.join(list(r.keys()))
+            self.assertEqual(r, 'i2 i4 i8 d f4 f8 m v4 c4 t')
+        table = 'test table for get_attnames'
+        self.createTable(table,
             ' n int, alpha smallint, v varchar(3),'
             ' gamma char(5), tau text, beta bool')
-        r = get_attnames("test_table")
+        r = get_attnames(table)
         self.assertIsInstance(r, OrderedDict)
         self.assertEqual(r, OrderedDict([
             ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
             ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
-        if OrderedDict is dict:
+        if OrderedDict is not dict:
+            r = ' '.join(list(r.keys()))
+            self.assertEqual(r, 'n alpha v gamma tau beta')
+        else:
             self.skipTest('OrderedDict is not supported')
+
+    def testGetAttnamesIsAttrDict(self):
+        AttrDict = pg.AttrDict
+        get_attnames = self.db.get_attnames
+        r = get_attnames('test', flush=True)
+        self.assertIsInstance(r, AttrDict)
+        self.assertEqual(r, AttrDict([
+            ('i2', 'int'), ('i4', 'int'), ('i8', 'int'),
+            ('d', 'num'), ('f4', 'float'), ('f8', 'float'), ('m', 'money'),
+            ('v4', 'text'), ('c4', 'text'), ('t', 'text')]))
+        r = ' '.join(list(r.keys()))
+        self.assertEqual(r, 'i2 i4 i8 d f4 f8 m v4 c4 t')
+        table = 'test table for get_attnames'
+        self.createTable(table,
+            ' n int, alpha smallint, v varchar(3),'
+            ' gamma char(5), tau text, beta bool')
+        r = get_attnames(table)
+        self.assertIsInstance(r, AttrDict)
+        self.assertEqual(r, AttrDict([
+            ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
+            ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
         r = ' '.join(list(r.keys()))
         self.assertEqual(r, 'n alpha v gamma tau beta')
 
@@ -1828,7 +1950,6 @@
 
     def testClearWithQuotedNames(self):
         clear = self.db.clear
-        query = self.db.query
         table = 'test table for clear()'
         self.createTable(table, '"Prime!" smallint primary key,'
             ' "much space" integer, "Questions?" text')
@@ -2221,6 +2342,286 @@
         r = query(q).getresult()[0][0]
         self.assertEqual(r, 0)
 
+    def testGetAsList(self):
+        get_as_list = self.db.get_as_list
+        self.assertRaises(TypeError, get_as_list)
+        self.assertRaises(TypeError, get_as_list, None)
+        query = self.db.query
+        table = 'test_aslist'
+        r = query('select 1 as colname').namedresult()[0]
+        self.assertIsInstance(r, tuple)
+        named = hasattr(r, 'colname')
+        names = [(1, 'Homer'), (2, 'Marge'),
+                (3, 'Bart'), (4, 'Lisa'), (5, 'Maggie')]
+        self.createTable(table,
+            'id smallint primary key, name varchar', values=names)
+        r = get_as_list(table)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, names)
+        for t, n in zip(r, names):
+            self.assertIsInstance(t, tuple)
+            self.assertEqual(t, n)
+            if named:
+                self.assertEqual(t.id, n[0])
+                self.assertEqual(t.name, n[1])
+                self.assertEqual(t._asdict(), dict(id=n[0], name=n[1]))
+        r = get_as_list(table, what='name')
+        self.assertIsInstance(r, list)
+        expected = sorted((row[1],) for row in names)
+        self.assertEqual(r, expected)
+        r = get_as_list(table, what='name, id')
+        self.assertIsInstance(r, list)
+        expected = sorted(tuple(reversed(row)) for row in names)
+        self.assertEqual(r, expected)
+        r = get_as_list(table, what=['name', 'id'])
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, expected)
+        r = get_as_list(table, where="name like 'Ba%'")
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, names[2:3])
+        r = get_as_list(table, what='name', where="name like 'Ma%'")
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, [('Maggie',), ('Marge',)])
+        r = get_as_list(table, what='name',
+                        where=["name like 'Ma%'", "name like '%r%'"])
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, [('Marge',)])
+        r = get_as_list(table, what='name', order='id')
+        self.assertIsInstance(r, list)
+        expected = [(row[1],) for row in names]
+        self.assertEqual(r, expected)
+        r = get_as_list(table, what=['name'], order=['id'])
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, expected)
+        r = get_as_list(table, what=['id', 'name'], order=['id', 'name'])
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, names)
+        r = get_as_list(table, what='id * 2 as num', order='id desc')
+        self.assertIsInstance(r, list)
+        expected = [(n,) for n in range(10, 0, -2)]
+        self.assertEqual(r, expected)
+        r = get_as_list(table, limit=2)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, names[:2])
+        r = get_as_list(table, offset=3)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, names[3:])
+        r = get_as_list(table, limit=1, offset=2)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, names[2:3])
+        r = get_as_list(table, scalar=True)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, list(range(1, 6)))
+        r = get_as_list(table, what='name', scalar=True)
+        self.assertIsInstance(r, list)
+        expected = sorted(row[1] for row in names)
+        self.assertEqual(r, expected)
+        r = get_as_list(table, what='name', limit=1, scalar=True)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, expected[:1])
+        query('alter table "%s" drop constraint "%s_pkey"' % (table, table))
+        self.assertRaises(KeyError, self.db.pkey, table, flush=True)
+        names.insert(1, (1, 'Snowball'))
+        query('insert into "%s" values ($1, $2)' % table, (1, 'Snowball'))
+        r = get_as_list(table)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, names)
+        r = get_as_list(table, what='name', where='id=1', scalar=True)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, ['Homer', 'Snowball'])
+        # test with unordered query
+        r = get_as_list(table, order=False)
+        self.assertIsInstance(r, list)
+        self.assertEqual(set(r), set(names))
+        # test with arbitrary from clause
+        from_table = '(select lower(name) as n2 from "%s") as t2' % table
+        r = get_as_list(from_table)
+        self.assertIsInstance(r, list)
+        r = set(row[0] for row in r)
+        expected = set(row[1].lower() for row in names)
+        self.assertEqual(r, expected)
+        r = get_as_list(from_table, order='n2', scalar=True)
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, sorted(expected))
+        r = get_as_list(from_table, order='n2', limit=1)
+        self.assertIsInstance(r, list)
+        self.assertEqual(len(r), 1)
+        t = r[0]
+        self.assertIsInstance(t, tuple)
+        if named:
+            self.assertEqual(t.n2, 'bart')
+            self.assertEqual(t._asdict(), dict(n2='bart'))
+        else:
+            self.assertEqual(t, ('bart',))
+
+    def testGetAsDict(self):
+        get_as_dict = self.db.get_as_dict
+        self.assertRaises(TypeError, get_as_dict)
+        self.assertRaises(TypeError, get_as_dict, None)
+        # the test table has no primary key
+        self.assertRaises(pg.ProgrammingError, get_as_dict, 'test')
+        query = self.db.query
+        table = 'test_asdict'
+        r = query('select 1 as colname').namedresult()[0]
+        self.assertIsInstance(r, tuple)
+        named = hasattr(r, 'colname')
+        colors = [(1, '#7cb9e8', 'Aero'), (2, '#b5a642', 'Brass'),
+                  (3, '#b2ffff', 'Celeste'), (4, '#c19a6b', 'Desert')]
+        self.createTable(table,
+            'id smallint primary key, rgb char(7), name varchar',
+            values=colors)
+        # keyname must be string, list or tuple
+        self.assertRaises(KeyError, get_as_dict, table, 3)
+        self.assertRaises(KeyError, get_as_dict, table, dict(id=None))
+        # missing keyname in row
+        self.assertRaises(KeyError, get_as_dict, table,
+                          keyname='rgb', what='name')
+        r = get_as_dict(table)
+        self.assertIsInstance(r, OrderedDict)
+        expected = OrderedDict((row[0], row[1:]) for row in colors)
+        self.assertEqual(r, expected)
+        for key in r:
+            self.assertIsInstance(key, int)
+            self.assertIn(key, expected)
+            row = r[key]
+            self.assertIsInstance(row, tuple)
+            t = expected[key]
+            self.assertEqual(row, t)
+            if named:
+                self.assertEqual(row.rgb, t[0])
+                self.assertEqual(row.name, t[1])
+                self.assertEqual(row._asdict(), dict(rgb=t[0], name=t[1]))
+        if OrderedDict is not dict:  # Python > 2.6
+            self.assertEqual(r.keys(), expected.keys())
+        r = get_as_dict(table, keyname='rgb')
+        self.assertIsInstance(r, OrderedDict)
+        expected = OrderedDict((row[1], (row[0], row[2]))
+            for row in sorted(colors, key=itemgetter(1)))
+        self.assertEqual(r, expected)
+        for key in r:
+            self.assertIsInstance(key, str)
+            self.assertIn(key, expected)
+            row = r[key]
+            self.assertIsInstance(row, tuple)
+            t = expected[key]
+            self.assertEqual(row, t)
+            if named:
+                self.assertEqual(row.id, t[0])
+                self.assertEqual(row.name, t[1])
+                self.assertEqual(row._asdict(), dict(id=t[0], name=t[1]))
+        if OrderedDict is not dict:  # Python > 2.6
+            self.assertEqual(r.keys(), expected.keys())
+        r = get_as_dict(table, keyname=['id', 'rgb'])
+        self.assertIsInstance(r, OrderedDict)
+        expected = OrderedDict((row[:2], row[2:]) for row in colors)
+        self.assertEqual(r, expected)
+        for key in r:
+            self.assertIsInstance(key, tuple)
+            self.assertIsInstance(key[0], int)
+            self.assertIsInstance(key[1], str)
+            if named:
+                self.assertEqual(key, (key.id, key.rgb))
+                self.assertEqual(key._fields, ('id', 'rgb'))
+            row = r[key]
+            self.assertIsInstance(row, tuple)
+            self.assertIsInstance(row[0], str)
+            t = expected[key]
+            self.assertEqual(row, t)
+            if named:
+                self.assertEqual(row.name, t[0])
+                self.assertEqual(row._asdict(), dict(name=t[0]))
+        if OrderedDict is not dict:  # Python > 2.6
+            self.assertEqual(r.keys(), expected.keys())
+        r = get_as_dict(table, keyname=['id', 'rgb'], scalar=True)
+        self.assertIsInstance(r, OrderedDict)
+        expected = OrderedDict((row[:2], row[2]) for row in colors)
+        self.assertEqual(r, expected)
+        for key in r:
+            self.assertIsInstance(key, tuple)
+            row = r[key]
+            self.assertIsInstance(row, str)
+            t = expected[key]
+            self.assertEqual(row, t)
+        if OrderedDict is not dict:  # Python > 2.6
+            self.assertEqual(r.keys(), expected.keys())
+        r = get_as_dict(table, keyname='rgb', what=['rgb', 'name'], 
scalar=True)
+        self.assertIsInstance(r, OrderedDict)
+        expected = OrderedDict((row[1], row[2])
+            for row in sorted(colors, key=itemgetter(1)))
+        self.assertEqual(r, expected)
+        for key in r:
+            self.assertIsInstance(key, str)
+            row = r[key]
+            self.assertIsInstance(row, str)
+            t = expected[key]
+            self.assertEqual(row, t)
+        if OrderedDict is not dict:  # Python > 2.6
+            self.assertEqual(r.keys(), expected.keys())
+        r = get_as_dict(table, what='id, name',
+                        where="rgb like '#b%'", scalar=True)
+        self.assertIsInstance(r, OrderedDict)
+        expected = OrderedDict((row[0], row[2]) for row in colors[1:3])
+        self.assertEqual(r, expected)
+        for key in r:
+            self.assertIsInstance(key, int)
+            row = r[key]
+            self.assertIsInstance(row, str)
+            t = expected[key]
+            self.assertEqual(row, t)
+        if OrderedDict is not dict:  # Python > 2.6
+            self.assertEqual(r.keys(), expected.keys())
+        expected = r
+        r = get_as_dict(table, what=['name', 'id'],
+                        where=['id > 1', 'id < 4', "rgb like '#b%'",
+                   "name not like 'A%'", "name not like '%t'"], scalar=True)
+        self.assertEqual(r, expected)
+        r = get_as_dict(table, what='name, id', limit=2, offset=1, scalar=True)
+        self.assertEqual(r, expected)
+        r = get_as_dict(table, keyname=('id',), what=('name', 'id'),
+                        where=('id > 1', 'id < 4'), order=('id',), scalar=True)
+        self.assertEqual(r, expected)
+        r = get_as_dict(table, limit=1)
+        self.assertEqual(len(r), 1)
+        self.assertEqual(r[1][1], 'Aero')
+        r = get_as_dict(table, offset=3)
+        self.assertEqual(len(r), 1)
+        self.assertEqual(r[4][1], 'Desert')
+        r = get_as_dict(table, order='id desc')
+        expected = OrderedDict((row[0], row[1:]) for row in reversed(colors))
+        self.assertEqual(r, expected)
+        r = get_as_dict(table, where='id > 5')
+        self.assertIsInstance(r, OrderedDict)
+        self.assertEqual(len(r), 0)
+        # test with unordered query
+        expected = dict((row[0], row[1:]) for row in colors)
+        r = get_as_dict(table, order=False)
+        self.assertIsInstance(r, dict)
+        self.assertEqual(r, expected)
+        if dict is not OrderedDict:  # Python > 2.6
+            self.assertNotIsInstance(self, OrderedDict)
+        # test with arbitrary from clause
+        from_table = '(select id, lower(name) as n2 from "%s") as t2' % table
+        # primary key must be passed explicitly in this case
+        self.assertRaises(pg.ProgrammingError, get_as_dict, from_table)
+        r = get_as_dict(from_table, 'id')
+        self.assertIsInstance(r, OrderedDict)
+        expected = OrderedDict((row[0], (row[2].lower(),)) for row in colors)
+        self.assertEqual(r, expected)
+        # test without a primary key
+        query('alter table "%s" drop constraint "%s_pkey"' % (table, table))
+        self.assertRaises(KeyError, self.db.pkey, table, flush=True)
+        self.assertRaises(pg.ProgrammingError, get_as_dict, table)
+        r = get_as_dict(table, keyname='id')
+        expected = OrderedDict((row[0], row[1:]) for row in colors)
+        self.assertIsInstance(r, dict)
+        self.assertEqual(r, expected)
+        r = (1, '#007fff', 'Azure')
+        query('insert into "%s" values ($1, $2, $3)' % table, r)
+        # the last entry will win
+        expected[1] = r[1:]
+        r = get_as_dict(table, keyname='id')
+        self.assertEqual(r, expected)
+
     def testTransaction(self):
         query = self.db.query
         self.createTable('test_table', 'n integer', temporary=False)
@@ -2505,6 +2906,8 @@
 class TestSchemas(unittest.TestCase):
     """Test correct handling of schemas (namespaces)."""
 
+    cls_set_up = False
+
     @classmethod
     def setUpClass(cls):
         db = DB()
@@ -2528,6 +2931,7 @@
             query("create table %s.t%d with oids as select 1 as n, %d as d"
                   % (schema, num_schema, num_schema))
         db.close()
+        cls.cls_set_up = True
 
     @classmethod
     def tearDownClass(cls):
@@ -2544,6 +2948,7 @@
         db.close()
 
     def setUp(self):
+        self.assertTrue(self.cls_set_up)
         self.db = DB()
 
     def tearDown(self):

Modified: trunk/tests/test_classic_functions.py
==============================================================================
--- trunk/tests/test_classic_functions.py       Tue Jan 19 12:00:26 2016        
(r769)
+++ trunk/tests/test_classic_functions.py       Wed Jan 20 13:19:45 2016        
(r770)
@@ -8,10 +8,8 @@
 Contributed by Christoph Zwerschke.
 
 These tests do not need a database to test against.
-
 """
 
-
 try:
     import unittest2 as unittest  # for Python < 2.7
 except ImportError:

Modified: trunk/tests/test_classic_largeobj.py
==============================================================================
--- trunk/tests/test_classic_largeobj.py        Tue Jan 19 12:00:26 2016        
(r769)
+++ trunk/tests/test_classic_largeobj.py        Wed Jan 20 13:19:45 2016        
(r770)
@@ -8,7 +8,6 @@
 Contributed by Christoph Zwerschke.
 
 These tests need a database to test against.
-
 """
 
 try:

Modified: trunk/tests/test_dbapi20_copy.py
==============================================================================
--- trunk/tests/test_dbapi20_copy.py    Tue Jan 19 12:00:26 2016        (r769)
+++ trunk/tests/test_dbapi20_copy.py    Wed Jan 20 13:19:45 2016        (r770)
@@ -8,7 +8,6 @@
 Contributed by Christoph Zwerschke.
 
 These tests need a database to test against.
-
 """
 
 try:
@@ -123,6 +122,8 @@
 
 class TestCopy(unittest.TestCase):
 
+    cls_set_up = False
+
     @staticmethod
     def connect():
         return pgdb.connect(database=dbname,
@@ -139,6 +140,7 @@
         cur.close()
         con.commit()
         con.close()
+        cls.cls_set_up = True
 
     @classmethod
     def tearDownClass(cls):
@@ -150,6 +152,7 @@
         con.close()
 
     def setUp(self):
+        self.assertTrue(self.cls_set_up)
         self.con = self.connect()
         self.cursor = self.con.cursor()
         self.cursor.execute("set client_encoding=utf8")
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to