Author: cito
Date: Tue Jan 26 12:17:23 2016
New Revision: 784
Log:
Make type cache and cursor description more useful
The type cache now stores some more information, e.g. whether a type is a base
type or a composite type, and the category of the type. This may later be used
for casting composite types, or be exposed to the user.
The cursor description now contains proper information on the size of numeric
types (including precision and scale).
Modified:
trunk/pgdb.py
trunk/pgmodule.c
trunk/tests/test_dbapi20.py
Modified: trunk/pgdb.py
==============================================================================
--- trunk/pgdb.py Tue Jan 26 06:56:08 2016 (r783)
+++ trunk/pgdb.py Tue Jan 26 12:17:23 2016 (r784)
@@ -156,14 +156,40 @@
return _db_error(msg, OperationalError)
+TypeInfo = namedtuple('TypeInfo',
+ ['oid', 'name', 'len', 'type', 'category', 'delim', 'relid'])
+
+
class TypeCache(dict):
- """Cache for database types."""
+ """Cache for database types.
+
+ This cache maps type OIDs to TypeInfo tuples containing the name
+ and other important info for the database type with the given OID.
+ """
def __init__(self, cnx):
"""Initialize type cache for connection."""
super(TypeCache, self).__init__()
+ self._escape_string = cnx.escape_string
self._src = cnx.source()
+ def __missing__(self, key):
+ q = ("SELECT oid, typname,"
+ " typlen, typtype, typcategory, typdelim, typrelid"
+ " FROM pg_type WHERE ")
+ if isinstance(key, int):
+ q += "oid = %d" % key
+ else:
+ q += "typname = '%s'" % self._escape_string(key)
+ self._src.execute(q)
+ res = list(self._src.fetch(1)[0])
+ res[0] = int(res[0])
+ res[2] = int(res[2])
+ res[6] = int(res[6])
+ res = TypeInfo(*res)
+ self[res.oid] = self[res.name] = res
+ return res
+
@staticmethod
def typecast(typ, value):
"""Cast value according to database type."""
@@ -181,25 +207,11 @@
else:
return cast(value)
- def getdescr(self, oid):
- """Get name of database type with given oid."""
- try:
- return self[oid]
- except KeyError:
- self._src.execute(
- "SELECT typname, typlen "
- "FROM pg_type WHERE oid=%s" % oid)
- res = self._src.fetch(1)[0]
- # The column name is omitted from the return value.
- # It will have to be prepended by the caller.
- res = (res[0], None, int(res[1]), None, None, None)
- self[oid] = res
- return res
-
_re_array_escape = regex(r'(["\\])')
_re_array_quote = regex(r'[{},"\\\s]|^[Nn][Uu][Ll][Ll]$')
+
class _quotedict(dict):
"""Dictionary with auto quoting of its items.
@@ -315,6 +327,25 @@
parameters = tuple(map(self._quote, parameters))
return string % parameters
+ def _make_description(self, info):
+ """Make the description tuple for the given field info."""
+ name, typ, size, mod = info[1:]
+ type_info = self._type_cache[typ]
+ type_code = type_info.name
+ if mod > 0:
+ mod -= 4
+ if type_code == 'numeric':
+ precision, scale = mod >> 16, mod & 0xffff
+ size = precision
+ else:
+ if not size:
+ size = type_info.size
+ if size == -1:
+ size = mod
+ precision = scale = None
+ return CursorDescription(name, type_code,
+ None, size, precision, scale, None)
+
def close(self):
"""Close the cursor object."""
self._src.close()
@@ -377,11 +408,10 @@
# then initialize result raw count and description
if self._src.resulttype == RESULT_DQL:
self.rowcount = self._src.ntuples
- getdescr = self._type_cache.getdescr
- description = [CursorDescription(
- info[1], *getdescr(info[2])) for info in self._src.listinfo()]
- self.colnames = [info[0] for info in description]
- self.coltypes = [info[1] for info in description]
+ description = self._make_description
+ description = [description(info) for info in self._src.listinfo()]
+ self.colnames = [d[0] for d in description]
+ self.coltypes = [d[1] for d in description]
self.description = description
self.lastrowid = None
if self.build_row_factory:
Modified: trunk/pgmodule.c
==============================================================================
--- trunk/pgmodule.c Tue Jan 26 06:56:08 2016 (r783)
+++ trunk/pgmodule.c Tue Jan 26 12:17:23 2016 (r784)
@@ -3621,7 +3621,7 @@
PyObject *result;
/* allocates tuple */
- result = PyTuple_New(3);
+ result = PyTuple_New(5);
if (!result)
return NULL;
@@ -3631,6 +3631,10 @@
PyStr_FromString(PQfname(self->result, num)));
PyTuple_SET_ITEM(result, 2,
PyInt_FromLong(PQftype(self->result, num)));
+ PyTuple_SET_ITEM(result, 3,
+ PyInt_FromLong(PQfsize(self->result, num)));
+ PyTuple_SET_ITEM(result, 4,
+ PyInt_FromLong(PQfmod(self->result, num)));
return result;
}
Modified: trunk/tests/test_dbapi20.py
==============================================================================
--- trunk/tests/test_dbapi20.py Tue Jan 26 06:56:08 2016 (r783)
+++ trunk/tests/test_dbapi20.py Tue Jan 26 12:17:23 2016 (r784)
@@ -262,21 +262,44 @@
def test_description_fields(self):
con = self._connect()
cur = con.cursor()
- cur.execute("select 123456789::int8 as col")
+ cur.execute("select 123456789::int8 col0,"
+ " 123456.789::numeric(41, 13) as col1,"
+ " 'foobar'::char(39) as col2")
desc = cur.description
self.assertIsInstance(desc, list)
- self.assertEqual(len(desc), 1)
- desc = desc[0]
- self.assertIsInstance(desc, tuple)
- self.assertEqual(len(desc), 7)
- self.assertEqual(desc.name, 'col')
- self.assertEqual(desc.type_code, 'int8')
- self.assertIsNone(desc.display_size)
- self.assertIsInstance(desc.internal_size, int)
- self.assertEqual(desc.internal_size, 8)
- self.assertIsNone(desc.precision)
- self.assertIsNone(desc.scale)
- self.assertIsNone(desc.null_ok)
+ self.assertEqual(len(desc), 3)
+ cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
+ for i in range(3):
+ c, d = cols[i], desc[i]
+ self.assertIsInstance(d, tuple)
+ self.assertEqual(len(d), 7)
+ self.assertIsInstance(d.name, str)
+ self.assertEqual(d.name, 'col%d' % i)
+ self.assertIsInstance(d.type_code, str)
+ self.assertEqual(d.type_code, c[0])
+ self.assertIsNone(d.display_size)
+ self.assertIsInstance(d.internal_size, int)
+ self.assertEqual(d.internal_size, c[1])
+ if c[2] is not None:
+ self.assertIsInstance(d.precision, int)
+ self.assertEqual(d.precision, c[1])
+ self.assertIsInstance(d.scale, int)
+ self.assertEqual(d.scale, c[2])
+ else:
+ self.assertIsNone(d.precision)
+ self.assertIsNone(d.scale)
+ self.assertIsNone(d.null_ok)
+
+ def test_type_cache(self):
+ con = self._connect()
+ cur = con.cursor()
+ type_info = cur._type_cache['numeric']
+ self.assertEqual(type_info.oid, 1700)
+ self.assertEqual(type_info.name, 'numeric')
+ self.assertEqual(type_info.type, 'b') # base
+ self.assertEqual(type_info.category, 'N') # numeric
+ self.assertEqual(type_info.delim, ',')
+ self.assertIs(cur._type_cache[1700], type_info)
def test_cursor_iteration(self):
con = self._connect()
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql