Author: cito
Date: Tue Jan 26 16:13:55 2016
New Revision: 788

Log:
Add method columns() to type cache

The method returns the columns of composite types.
This makes the type cache even more useful.

Modified:
   trunk/docs/contents/pgdb/connection.rst
   trunk/pgdb.py
   trunk/tests/test_dbapi20.py

Modified: trunk/docs/contents/pgdb/connection.rst
==============================================================================
--- trunk/docs/contents/pgdb/connection.rst     Tue Jan 26 13:59:18 2016        (r787)
+++ trunk/docs/contents/pgdb/connection.rst     Tue Jan 26 16:13:55 2016        (r788)
@@ -98,4 +98,7 @@
 For details, see the PostgreSQL documentation on `pg_type
 <http://www.postgresql.org/docs/current/static/catalog-pg-type.html>`_.
 
+The :attr:`Connection.type_cache` also provides a method :meth:`columns`
+that returns the names and type OIDs of the columns of composite types.
+
 .. versionadded:: 5.0

Modified: trunk/pgdb.py
==============================================================================
--- trunk/pgdb.py       Tue Jan 26 13:59:18 2016        (r787)
+++ trunk/pgdb.py       Tue Jan 26 16:13:55 2016        (r788)
@@ -159,6 +159,8 @@
 TypeInfo = namedtuple('TypeInfo',
     ['oid', 'name', 'len', 'type', 'category', 'delim', 'relid'])
 
+ColumnInfo = namedtuple('ColumnInfo', ['name', 'type'])
+
 
 class TypeCache(dict):
     """Cache for database types.
@@ -174,14 +176,16 @@
         self._src = cnx.source()
 
     def __missing__(self, key):
-        q = ("SELECT oid, typname,"
-             " typlen, typtype, typcategory, typdelim, typrelid"
-            " FROM pg_type WHERE ")
+        """Get the type info from the database if it is not cached."""
         if isinstance(key, int):
-            q += "oid = %d" % key
+            oid = key
         else:
-            q += "typname = '%s'" % self._escape_string(key)
-        self._src.execute(q)
+            if not '.' in key and not '"' in key:
+                key = '"%s"' % key
+            oid = "'%s'::regtype" % self._escape_string(key)
+        self._src.execute("SELECT oid, typname,"
+             " typlen, typtype, typcategory, typdelim, typrelid"
+            " FROM pg_type WHERE oid=%s" % oid)
         res = self._src.fetch(1)
         if not res:
             raise KeyError('Type %s could not be found' % key)
@@ -193,6 +197,17 @@
         self[res.oid] = self[res.name] = res
         return res
 
+    def columns(self, key):
+        """Get the names and types of the columns of composite types."""
+        typ = self[key]
+        if typ.type != 'c' or not typ.relid:
+            return []  # this type is not composite
+        self._src.execute("SELECT attname, atttypid"
+            " FROM pg_attribute WHERE attrelid=%s AND attnum>0"
+            " AND NOT attisdropped ORDER BY attnum" % typ.relid)
+        return [ColumnInfo(name, int(oid))
+            for name, oid in self._src.fetch(-1)]
+
     @staticmethod
     def typecast(typ, value):
         """Cast value according to database type."""

Modified: trunk/tests/test_dbapi20.py
==============================================================================
--- trunk/tests/test_dbapi20.py Tue Jan 26 13:59:18 2016        (r787)
+++ trunk/tests/test_dbapi20.py Tue Jan 26 16:13:55 2016        (r788)
@@ -293,13 +293,28 @@
     def test_type_cache(self):
         con = self._connect()
         cur = con.cursor()
-        type_info = cur.type_cache['numeric']
+        type_cache = cur.type_cache
+        type_info = type_cache['numeric']
         self.assertEqual(type_info.oid, 1700)
         self.assertEqual(type_info.name, 'numeric')
         self.assertEqual(type_info.type, 'b')  # base
         self.assertEqual(type_info.category, 'N')  # numeric
         self.assertEqual(type_info.delim, ',')
         self.assertIs(cur.type_cache[1700], type_info)
+        type_info = type_cache['pg_type']
+        self.assertEqual(type_info.type, 'c')  # composite
+        self.assertEqual(type_info.category, 'C')  # composite
+        cols = type_cache.columns('pg_type')
+        self.assertEqual(cols[0].name, 'typname')
+        typname = type_cache[cols[0].type]
+        self.assertEqual(typname.name, 'name')
+        self.assertEqual(typname.type, 'b')  # base
+        self.assertEqual(typname.category, 'S')  # string
+        self.assertEqual(cols[3].name, 'typlen')
+        typlen = type_cache[cols[3].type]
+        self.assertEqual(typlen.name, 'int2')
+        self.assertEqual(typlen.type, 'b')  # base
+        self.assertEqual(typlen.category, 'N')  # numeric
 
     def test_cursor_iteration(self):
         con = self._connect()
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to