Author: cito
Date: Fri Mar 18 08:22:21 2016
New Revision: 857
Log:
Add system parameter to get_relations()
Also fix a regression in the 4.x branch when using temporary tables,
related to filtering system tables (as discussed on the mailing list).
Modified:
branches/4.x/docs/contents/changelog.rst
branches/4.x/docs/contents/pg/db_wrapper.rst
branches/4.x/pg.py
branches/4.x/tests/test_classic_dbwrapper.py
trunk/docs/contents/changelog.rst
trunk/docs/contents/pg/db_wrapper.rst
trunk/pg.py
trunk/tests/test_classic_dbwrapper.py
Modified: branches/4.x/docs/contents/changelog.rst
==============================================================================
--- branches/4.x/docs/contents/changelog.rst Fri Feb 19 09:13:58 2016
(r856)
+++ branches/4.x/docs/contents/changelog.rst Fri Mar 18 08:22:21 2016
(r857)
@@ -1,6 +1,13 @@
ChangeLog
=========
+Version 4.2.2
+-------------
+- The get_relations() and get_tables() methods now also return system views
+ and tables if you set the optional "system" parameter to True.
+- Fixed a regression when using temporary tables with DB wrapper methods
+ (thanks to Patrick TJ McPhee for reporting).
+
Version 4.2.1 (2016-02-18)
--------------------------
- Fixed a small bug when setting the notice receiver.
Modified: branches/4.x/docs/contents/pg/db_wrapper.rst
==============================================================================
--- branches/4.x/docs/contents/pg/db_wrapper.rst Fri Feb 19 09:13:58
2016 (r856)
+++ branches/4.x/docs/contents/pg/db_wrapper.rst Fri Mar 18 08:22:21
2016 (r857)
@@ -86,31 +86,37 @@
get_relations -- get list of relations in connected database
------------------------------------------------------------
-.. method:: DB.get_relations(kinds)
+.. method:: DB.get_relations([kinds], [system])
Get the list of relations in connected database
:param str kinds: a string or sequence of type letters
+ :param bool system: whether system relations should be returned
:returns: all relations of the given kinds in the database
:rtype: list
-The type letters are ``r`` = ordinary table, ``i`` = index, ``S`` = sequence,
-``v`` = view, ``c`` = composite type, ``s`` = special, ``t`` = TOAST table.
-If `kinds` is None or an empty string, all relations are returned (this is
-also the default). Although you can do this with a simple select, it is
-added here for convenience.
+This method returns the list of relations in the connected database. Although
+you can do this with a simple select, it is added here for convenience. You
+can select which kinds of relations you are interested in by passing type
+letters in the `kinds` parameter. The type letters are ``r`` = ordinary table,
+``i`` = index, ``S`` = sequence, ``v`` = view, ``c`` = composite type,
+``s`` = special, ``t`` = TOAST table. If `kinds` is None or an empty string,
+all relations are returned (this is also the default). If `system` is set to
+`True`, then system tables and views (temporary tables, toast tables, catalog
+views and tables) will be returned as well, otherwise they will be ignored.
get_tables -- get list of tables in connected database
------------------------------------------------------
-.. method:: DB.get_tables()
+.. method:: DB.get_tables([system])
Get the list of tables in connected database
+ :param bool system: whether system tables should be returned
:returns: all tables in connected database
:rtype: list
-This is a shortcut for ``get_relations('r')`` that has been added for
+This is a shortcut for ``get_relations('r', system)`` that has been added for
convenience.
get_attnames -- get the attribute names of a table
Modified: branches/4.x/pg.py
==============================================================================
--- branches/4.x/pg.py Fri Feb 19 09:13:58 2016 (r856)
+++ branches/4.x/pg.py Fri Mar 18 08:22:21 2016 (r857)
@@ -735,12 +735,11 @@
q = ("SELECT s.nspname, r.relname, a.attname"
" FROM pg_class r"
" JOIN pg_namespace s ON s.oid = r.relnamespace"
- " AND s.nspname NOT SIMILAR"
- " TO 'pg/_%|information/_schema' ESCAPE '/'"
" JOIN pg_attribute a ON a.attrelid = r.oid"
" AND NOT a.attisdropped"
" JOIN pg_index i ON i.indrelid = r.oid"
- " AND i.indisprimary AND a.attnum " + any_indkey)
+ " AND i.indisprimary AND a.attnum %s"
+ " AND r.relkind IN ('r', 'v')" % any_indkey)
for r in self.db.query(q).getresult():
cl, pkey = _join_parts(r[:2]), r[2]
self._pkeys.setdefault(cl, []).append(pkey)
@@ -757,27 +756,35 @@
return [s[0] for s in
self.db.query('SELECT datname FROM pg_database').getresult()]
- def get_relations(self, kinds=None):
+ def get_relations(self, kinds=None, system=False):
"""Get list of relations in connected database of specified kinds.
If kinds is None or empty, all kinds of relations are returned.
Otherwise kinds can be a string or sequence of type letters
specifying which kind of relations you want to list.
+ Set the system flag if you want to get the system relations as well.
"""
- where = kinds and " AND r.relkind IN (%s)" % ','.join(
- ["'%s'" % k for k in kinds]) or ''
+ where = []
+ if kinds:
+ where.append("r.relkind IN (%s)" %
+ ','.join(["'%s'" % k for k in kinds]))
+ if not system:
+ where.append("s.nspname NOT SIMILAR"
+ " TO 'pg/_%|information/_schema' ESCAPE '/'")
+ where = where and " WHERE %s" % ' AND '.join(where) or ''
q = ("SELECT s.nspname, r.relname"
" FROM pg_class r"
- " JOIN pg_namespace s ON s.oid = r.relnamespace"
- " WHERE s.nspname NOT SIMILAR"
- " TO 'pg/_%%|information/_schema' ESCAPE '/' %s"
+ " JOIN pg_namespace s ON s.oid = r.relnamespace%s"
" ORDER BY 1, 2") % where
return [_join_parts(r) for r in self.db.query(q).getresult()]
- def get_tables(self):
- """Return list of tables in connected database."""
- return self.get_relations('r')
+ def get_tables(self, system=False):
+ """Return list of tables in connected database.
+
+ Set the system flag if you want to get the system tables as well.
+ """
+ return self.get_relations('r', system)
def get_attnames(self, cl, newattnames=None):
"""Given the name of a table, digs out the set of attribute names.
@@ -801,8 +808,6 @@
# May as well cache them:
if qcl in self._attnames:
return self._attnames[qcl]
- if qcl not in self.get_relations('rv'):
- raise _prg_error('Class %s does not exist' % qcl)
q = ("SELECT a.attname, t.typname%s"
" FROM pg_class r"
@@ -810,10 +815,20 @@
" JOIN pg_attribute a ON a.attrelid = r.oid"
" JOIN pg_type t ON t.oid = a.atttypid"
" WHERE s.nspname = $1 AND r.relname = $2"
+ " AND r.relkind IN ('r', 'v')"
" AND (a.attnum > 0 OR a.attname = 'oid')"
" AND NOT a.attisdropped") % (
self._regtypes and '::regtype' or '',)
q = self.db.query(q, cl).getresult()
+ if not q:
+ r = ("SELECT r.relnamespace"
+ " FROM pg_class r"
+ " JOIN pg_namespace s ON s.oid = r.relnamespace"
+ " WHERE s.nspname =$1 AND r.relname = $2"
+ " AND r.relkind IN ('r', 'v') LIMIT 1")
+ r = self.db.query(r, cl).getresult()
+ if not r:
+ raise _prg_error('Class %s does not exist' % qcl)
if self._regtypes:
t = dict(q)
Modified: branches/4.x/tests/test_classic_dbwrapper.py
==============================================================================
--- branches/4.x/tests/test_classic_dbwrapper.py Fri Feb 19 09:13:58
2016 (r856)
+++ branches/4.x/tests/test_classic_dbwrapper.py Fri Mar 18 08:22:21
2016 (r857)
@@ -765,6 +765,18 @@
result2 = get_tables()
self.assertEqual(result2, result1)
+ def testGetSystemTables(self):
+ get_tables = self.db.get_tables
+ result = get_tables()
+ self.assertNotIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+ result = get_tables(system=False)
+ self.assertNotIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+ result = get_tables(system=True)
+ self.assertIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+
def testGetRelations(self):
get_relations = self.db.get_relations
result = get_relations()
@@ -812,6 +824,18 @@
self.assertEqual(attributes, result)
self.db.query('drop table "%s"' % table)
+ def testGetSystemRelations(self):
+ get_relations = self.db.get_relations
+ result = get_relations()
+ self.assertNotIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+ result = get_relations(system=False)
+ self.assertNotIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+ result = get_relations(system=True)
+ self.assertIn('pg_catalog.pg_class', result)
+ self.assertIn('information_schema.tables', result)
+
def testHasTablePrivilege(self):
can = self.db.has_table_privilege
self.assertEqual(can('test'), True)
@@ -1231,6 +1255,25 @@
query("drop table test_table_2")
query("drop table test_table")
+ def testTempCrud(self):
+ query = self.db.query
+ table = 'test_temp_table'
+ query("drop table if exists %s" % table)
+ query("create temporary table %s"
+ " (n int primary key, t varchar)" % table)
+ self.db.insert(table, dict(n=1, t='one'))
+ self.db.insert(table, dict(n=2, t='too'))
+ self.db.insert(table, dict(n=3, t='three'))
+ r = self.db.get(table, 2)
+ self.assertEqual(r['t'], 'too')
+ self.db.update(table, dict(n=2, t='two'))
+ r = self.db.get(table, 2)
+ self.assertEqual(r['t'], 'two')
+ self.db.delete(table, r)
+ r = query('select n, t from %s order by 1' % table).getresult()
+ self.assertEqual(r, [(1, 'one'), (3, 'three')])
+ query("drop table %s" % table)
+
def testTruncateRestart(self):
truncate = self.db.truncate
self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
Modified: trunk/docs/contents/changelog.rst
==============================================================================
--- trunk/docs/contents/changelog.rst Fri Feb 19 09:13:58 2016 (r856)
+++ trunk/docs/contents/changelog.rst Fri Mar 18 08:22:21 2016 (r857)
@@ -132,6 +132,13 @@
- The tty parameter and attribute of database connections has been
removed since it is not supported any more since PostgreSQL 7.4.
+Version 4.2.2
+-------------
+- The get_relations() and get_tables() methods now also return system views
+ and tables if you set the optional "system" parameter to True.
+- Fixed a regression when using temporary tables with DB wrapper methods
+ (thanks to Patrick TJ McPhee for reporting).
+
Version 4.2.1 (2016-02-18)
--------------------------
- Fixed a small bug when setting the notice receiver.
Modified: trunk/docs/contents/pg/db_wrapper.rst
==============================================================================
--- trunk/docs/contents/pg/db_wrapper.rst Fri Feb 19 09:13:58 2016
(r856)
+++ trunk/docs/contents/pg/db_wrapper.rst Fri Mar 18 08:22:21 2016
(r857)
@@ -84,31 +84,37 @@
get_relations -- get list of relations in connected database
------------------------------------------------------------
-.. method:: DB.get_relations(kinds)
+.. method:: DB.get_relations([kinds], [system])
Get the list of relations in connected database
:param str kinds: a string or sequence of type letters
+ :param bool system: whether system relations should be returned
:returns: all relations of the given kinds in the database
:rtype: list
-The type letters are ``r`` = ordinary table, ``i`` = index, ``S`` = sequence,
-``v`` = view, ``c`` = composite type, ``s`` = special, ``t`` = TOAST table.
-If `kinds` is None or an empty string, all relations are returned (this is
-also the default). Although you can do this with a simple select, it is
-added here for convenience.
+This method returns the list of relations in the connected database. Although
+you can do this with a simple select, it is added here for convenience. You
+can select which kinds of relations you are interested in by passing type
+letters in the `kinds` parameter. The type letters are ``r`` = ordinary table,
+``i`` = index, ``S`` = sequence, ``v`` = view, ``c`` = composite type,
+``s`` = special, ``t`` = TOAST table. If `kinds` is None or an empty string,
+all relations are returned (this is also the default). If `system` is set to
+`True`, then system tables and views (temporary tables, toast tables, catalog
+views and tables) will be returned as well, otherwise they will be ignored.
get_tables -- get list of tables in connected database
------------------------------------------------------
-.. method:: DB.get_tables()
+.. method:: DB.get_tables([system])
Get the list of tables in connected database
+ :param bool system: whether system tables should be returned
:returns: all tables in connected database
:rtype: list
-This is a shortcut for ``get_relations('r')`` that has been added for
+This is a shortcut for ``get_relations('r', system)`` that has been added for
convenience.
get_attnames -- get the attribute names of a table
Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Fri Feb 19 09:13:58 2016 (r856)
+++ trunk/pg.py Fri Mar 18 08:22:21 2016 (r857)
@@ -1751,26 +1751,35 @@
return [s[0] for s in
self.db.query('SELECT datname FROM pg_database').getresult()]
- def get_relations(self, kinds=None):
+ def get_relations(self, kinds=None, system=False):
"""Get list of relations in connected database of specified kinds.
If kinds is None or empty, all kinds of relations are returned.
Otherwise kinds can be a string or sequence of type letters
specifying which kind of relations you want to list.
+
+ Set the system flag if you want to get the system relations as well.
"""
- where = " AND r.relkind IN (%s)" % ','.join(
- ["'%s'" % k for k in kinds]) if kinds else ''
+ where = []
+ if kinds:
+ where.append("r.relkind IN (%s)" %
+ ','.join("'%s'" % k for k in kinds))
+ if not system:
+ where.append("s.nspname NOT SIMILAR"
+ " TO 'pg/_%|information/_schema' ESCAPE '/'")
+ where = " WHERE %s" % ' AND '.join(where) if where else ''
q = ("SELECT quote_ident(s.nspname)||'.'||quote_ident(r.relname)"
" FROM pg_class r"
- " JOIN pg_namespace s ON s.oid = r.relnamespace"
- " WHERE s.nspname NOT SIMILAR"
- " TO 'pg/_%%|information/_schema' ESCAPE '/' %s"
+ " JOIN pg_namespace s ON s.oid = r.relnamespace%s"
" ORDER BY s.nspname, r.relname") % where
return [r[0] for r in self.db.query(q).getresult()]
- def get_tables(self):
- """Return list of tables in connected database."""
- return self.get_relations('r')
+ def get_tables(self, system=False):
+ """Return list of tables in connected database.
+
+ Set the system flag if you want to get the system tables as well.
+ """
+ return self.get_relations('r', system)
def get_attnames(self, table, with_oid=True, flush=False):
"""Given the name of a table, dig out the set of attribute names.
Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py Fri Feb 19 09:13:58 2016
(r856)
+++ trunk/tests/test_classic_dbwrapper.py Fri Mar 18 08:22:21 2016
(r857)
@@ -989,6 +989,18 @@
after_tables = get_tables()
self.assertEqual(after_tables, before_tables)
+ def testGetSystemTables(self):
+ get_tables = self.db.get_tables
+ result = get_tables()
+ self.assertNotIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+ result = get_tables(system=False)
+ self.assertNotIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+ result = get_tables(system=True)
+ self.assertIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+
def testGetRelations(self):
get_relations = self.db.get_relations
result = get_relations()
@@ -1007,6 +1019,18 @@
self.assertNotIn('public.test', result)
self.assertNotIn('public.test_view', result)
+ def testGetSystemRelations(self):
+ get_relations = self.db.get_relations
+ result = get_relations()
+ self.assertNotIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+ result = get_relations(system=False)
+ self.assertNotIn('pg_catalog.pg_class', result)
+ self.assertNotIn('information_schema.tables', result)
+ result = get_relations(system=True)
+ self.assertIn('pg_catalog.pg_class', result)
+ self.assertIn('information_schema.tables', result)
+
def testGetAttnames(self):
get_attnames = self.db.get_attnames
self.assertRaises(pg.ProgrammingError,
@@ -2308,6 +2332,21 @@
q = "select n from test_parent natural join test_child limit 2"
self.assertEqual(query(q).getresult(), [(1,)])
+ def testTempCrud(self):
+ table = 'test_temp_table'
+ self.createTable(table, "n int primary key, t varchar", temporary=True)
+ self.db.insert(table, dict(n=1, t='one'))
+ self.db.insert(table, dict(n=2, t='too'))
+ self.db.insert(table, dict(n=3, t='three'))
+ r = self.db.get(table, 2)
+ self.assertEqual(r['t'], 'too')
+ self.db.update(table, dict(n=2, t='two'))
+ r = self.db.get(table, 2)
+ self.assertEqual(r['t'], 'two')
+ self.db.delete(table, r)
+ r = self.db.query('select n, t from %s order by 1' % table).getresult()
+ self.assertEqual(r, [(1, 'one'), (3, 'three')])
+
def testTruncate(self):
truncate = self.db.truncate
self.assertRaises(TypeError, truncate, None)
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql