Author: cito
Date: Thu Jan 14 20:16:23 2016
New Revision: 745
Log:
Add methods get/set_parameter to DB wrapper class
These methods can be used to get/set/reset run-time parameters,
even several at once.
Since this is pretty useful and will not break anything, I have
also back ported these additions to the 4.x branch.
Everything is well documented and tested, of course.
Modified:
branches/4.x/docs/contents/changelog.rst
branches/4.x/docs/contents/pg/db_wrapper.rst
branches/4.x/pg.py
branches/4.x/tests/test_classic_dbwrapper.py
branches/4.x/tests/test_tutorial.py
trunk/docs/contents/changelog.rst
trunk/docs/contents/pg/db_wrapper.rst
trunk/pg.py
trunk/tests/test_classic_dbwrapper.py
Modified: branches/4.x/docs/contents/changelog.rst
==============================================================================
--- branches/4.x/docs/contents/changelog.rst Thu Jan 14 15:26:30 2016
(r744)
+++ branches/4.x/docs/contents/changelog.rst Thu Jan 14 20:16:23 2016
(r745)
@@ -3,10 +3,12 @@
Version 4.2
-----------
-- Set a better default for the user option "escaping-funcs".
- The supported Python versions are 2.4 to 2.7.
- PostgreSQL is supported in all versions from 8.3 to 9.5.
+- Set a better default for the user option "escaping-funcs".
- Force build to compile with no errors.
+- New methods get_parameter() and set_parameter() in the classic interface
+ which can be used to get or set run-time parameters.
- Fix decimal point handling.
- Add option to return boolean values as bool objects.
- Add option to return money values as string.
Modified: branches/4.x/docs/contents/pg/db_wrapper.rst
==============================================================================
--- branches/4.x/docs/contents/pg/db_wrapper.rst Thu Jan 14 15:26:30
2016 (r744)
+++ branches/4.x/docs/contents/pg/db_wrapper.rst Thu Jan 14 20:16:23
2016 (r745)
@@ -121,11 +121,78 @@
Get the attribute names of a table
:param str table: name of table
- :returns: A dictionary -- the keys are the attribute names,
- the values are the type names of the attributes.
+ :returns: a dictionary mapping attribute names to type names
Given the name of a table, digs out the set of attribute names.
+Returns a dictionary of attribute names (the names are the keys,
+the values are the names of the attributes' types).
+
+By default, only a limited number of simple types will be returned.
+You can get the regular type names instead by enabling them with the
+:meth:`DB.use_regtypes` method.
+
+get/set_parameter -- get or set run-time parameters
+----------------------------------------------------
+
+.. method:: DB.get_parameter(parameter)
+
+ Get the value of run-time parameters
+
+ :param parameter: the run-time parameter(s) to get
+ :type parameter: str, tuple, list or dict
+ :returns: the current value(s) of the run-time parameter(s)
+ :rtype: str, list or dict
+ :raises TypeError: Invalid parameter type(s)
+ :raises ProgrammingError: Invalid parameter name(s)
+
+If the parameter is a string, the return value will also be a string
+that is the current setting of the run-time parameter with that name.
+
+You can get several parameters at once by passing a list or tuple of
+parameter names. The return value will then be a corresponding list
+of parameter settings. If you pass a dict as parameter instead, its
+values will be set to the parameter settings corresponding to its keys.
+
+By passing the special name `'all'` as the parameter, you can get a dict
+of all existing configuration parameters.
+
+.. versionadded:: 4.2
+
+.. method:: DB.set_parameter(parameter, [value], [local])
+
+ Set the value of run-time parameters
+
+ :param parameter: the run-time parameter(s) to set
+ :type parameter: str, tuple, list or dict
+ :param value: the value to set
+ :type value: str or None
+ :raises TypeError: Invalid parameter type(s)
+ :raises ValueError: Invalid value argument(s)
+ :raises ProgrammingError: Invalid parameter name(s) or values
+
+If the parameter and the value are strings, the run-time parameter
+will be set to that value. If no value or *None* is passed as a value,
+then the run-time parameter will be restored to its default value.
+
+You can set several parameters at once by passing a list or tuple
+of parameter names, with a single value that all parameters should
+be set to or with a corresponding list or tuple of values.
+
+You can also pass a dict as parameters. In this case, you should
+not pass a value, since the values will be taken from the dict.
+
+By passing the special name `'all'` as the parameter, you can reset
+all existing settable run-time parameters to their default values.
+
+If you set *local* to `True`, then the command takes effect for only the
+current transaction. After :meth:`DB.commit` or :meth:`DB.rollback`,
+the session-level setting takes effect again. Setting *local* to `True`
+will appear to have no effect if it is executed outside a transaction,
+since the transaction will end immediately.
+
+.. versionadded:: 4.2
+
has_table_privilege -- check table privilege
--------------------------------------------
Modified: branches/4.x/pg.py
==============================================================================
--- branches/4.x/pg.py Thu Jan 14 15:26:30 2016 (r744)
+++ branches/4.x/pg.py Thu Jan 14 20:16:23 2016 (r745)
@@ -517,6 +517,125 @@
"""Destroy a previously defined savepoint."""
return self.query('RELEASE ' + name)
+ def get_parameter(self, parameter):
+ """Get the value of a run-time parameter.
+
+ If the parameter is a string, the return value will also be a string
+ that is the current setting of the run-time parameter with that name.
+
+ You can get several parameters at once by passing a list or tuple of
+ parameter names. The return value will then be a corresponding list
+ of parameter settings. If you pass a dict as parameter instead, its
+ values will be set to the parameter settings corresponding to its keys.
+
+ By passing the special name 'all' as the parameter, you can get a dict
+ of all existing configuration parameters.
+ """
+ if isinstance(parameter, basestring):
+ parameter = [parameter]
+ values = None
+ elif isinstance(parameter, (list, tuple)):
+ values = []
+ elif isinstance(parameter, dict):
+ values = parameter
+ else:
+ raise TypeError('The parameter must be a dict, list or string')
+ if not parameter:
+ raise TypeError('No parameter has been specified')
+ if isinstance(values, dict):
+ params = {}
+ else:
+ params = []
+ for key in parameter:
+ if isinstance(key, basestring):
+ param = key.strip().lower()
+ else:
+ param = None
+ if not param:
+ raise TypeError('Invalid parameter')
+ if param == 'all':
+ q = 'SHOW ALL'
+ values = self.db.query(q).getresult()
+ values = dict(value[:2] for value in values)
+ break
+ if isinstance(values, dict):
+ params[param] = key
+ else:
+ params.append(param)
+ else:
+ for param in params:
+ q = 'SHOW %s' % (param,)
+ value = self.db.query(q).getresult()[0][0]
+ if values is None:
+ values = value
+ elif isinstance(values, list):
+ values.append(value)
+ else:
+ values[params[param]] = value
+ return values
+
+ def set_parameter(self, parameter, value=None, local=False):
+ """Set the value of a run-time parameter.
+
+ If the parameter and the value are strings, the run-time parameter
+ will be set to that value. If no value or None is passed as a value,
+ then the run-time parameter will be restored to its default value.
+
+ You can set several parameters at once by passing a list or tuple
+ of parameter names, with a single value that all parameters should
+ be set to or with a corresponding list or tuple of values.
+
+ You can also pass a dict as parameters. In this case, you should
+ not pass a value, since the values will be taken from the dict.
+
+ By passing the special name 'all' as the parameter, you can reset
+ all existing settable run-time parameters to their default values.
+
+ If you set local to True, then the command takes effect for only the
+ current transaction. After commit() or rollback(), the session-level
+ setting takes effect again. Setting local to True will appear to
+ have no effect if it is executed outside a transaction, since the
+ transaction will end immediately.
+ """
+ if isinstance(parameter, basestring):
+ parameter = {parameter: value}
+ elif isinstance(parameter, (list, tuple)):
+ if isinstance(value, (list, tuple)):
+ parameter = dict(zip(parameter, value))
+ else:
+ parameter = dict.fromkeys(parameter, value)
+ elif isinstance(parameter, dict):
+ if value is not None:
+ raise ValueError(
+ 'A value must not be set when parameter is a dictionary')
+ else:
+ raise TypeError('The parameter must be a dict, list or string')
+ if not parameter:
+ raise TypeError('No parameter has been specified')
+ params = {}
+ for key, value in parameter.items():
+ if isinstance(key, basestring):
+ param = key.strip().lower()
+ else:
+ param = None
+ if not param:
+ raise TypeError('Invalid parameter')
+ if param == 'all':
+ if value is not None:
+ raise ValueError(
+ "A value must ot be set when parameter is 'all'")
+ params = {'all': None}
+ break
+ params[param] = value
+ local = local and ' LOCAL' or ''
+ for param, value in params.items():
+ if value is None:
+ q = 'RESET%s %s' % (local, param)
+ else:
+ q = 'SET%s %s TO %s' % (local, param, value)
+ self._do_debug(q)
+ self.db.query(q)
+
def query(self, qstr, *args):
"""Executes a SQL command string.
Modified: branches/4.x/tests/test_classic_dbwrapper.py
==============================================================================
--- branches/4.x/tests/test_classic_dbwrapper.py Thu Jan 14 15:26:30
2016 (r744)
+++ branches/4.x/tests/test_classic_dbwrapper.py Thu Jan 14 20:16:23
2016 (r745)
@@ -72,61 +72,31 @@
def testAllDBAttributes(self):
attributes = [
'begin',
- 'cancel',
- 'clear',
- 'close',
- 'commit',
- 'db',
- 'dbname',
- 'debug',
- 'delete',
- 'end',
- 'endcopy',
- 'error',
- 'escape_bytea',
- 'escape_identifier',
- 'escape_literal',
- 'escape_string',
+ 'cancel', 'clear', 'close', 'commit',
+ 'db', 'dbname', 'debug', 'delete',
+ 'end', 'endcopy', 'error',
+ 'escape_bytea', 'escape_identifier',
+ 'escape_literal', 'escape_string',
'fileno',
- 'get',
- 'get_attnames',
- 'get_databases',
- 'get_notice_receiver',
- 'get_relations',
- 'get_tables',
- 'getline',
- 'getlo',
- 'getnotify',
- 'has_table_privilege',
- 'host',
- 'insert',
- 'inserttable',
- 'locreate',
- 'loimport',
+ 'get', 'get_attnames', 'get_databases',
+ 'get_notice_receiver', 'get_parameter',
+ 'get_relations', 'get_tables',
+ 'getline', 'getlo', 'getnotify',
+ 'has_table_privilege', 'host',
+ 'insert', 'inserttable',
+ 'locreate', 'loimport',
'notification_handler',
'options',
- 'parameter',
- 'pkey',
- 'port',
- 'protocol_version',
- 'putline',
+ 'parameter', 'pkey', 'port',
+ 'protocol_version', 'putline',
'query',
- 'release',
- 'reopen',
- 'reset',
- 'rollback',
- 'savepoint',
- 'server_version',
- 'set_notice_receiver',
- 'source',
- 'start',
- 'status',
- 'transaction',
- 'tty',
- 'unescape_bytea',
- 'update',
- 'use_regtypes',
- 'user',
+ 'release', 'reopen', 'reset', 'rollback',
+ 'savepoint', 'server_version',
+ 'set_notice_receiver', 'set_parameter',
+ 'source', 'start', 'status',
+ 'transaction', 'tty',
+ 'unescape_bytea', 'update',
+ 'use_regtypes', 'user',
]
if self.db.server_version < 90000: # PostgreSQL < 9.0
attributes.remove('escape_identifier')
@@ -288,8 +258,8 @@
db.query("drop table if exists test cascade")
db.query("create table test ("
"i2 smallint, i4 integer, i8 bigint,"
- "d numeric, f4 real, f8 double precision, m money, "
- "v4 varchar(4), c4 char(4), t text)")
+ " d numeric, f4 real, f8 double precision, m money,"
+ " v4 varchar(4), c4 char(4), t text)")
db.query("create or replace view test_view as"
" select i4, v4 from test")
db.close()
@@ -441,6 +411,150 @@
self.assertEqual(f('ab\\c', 'text'), "'ab\\\\c'")
self.assertEqual(f("a\\b'c", 'text'), "'a\\\\b''c'")
+ def testGetParameter(self):
+ f = self.db.get_parameter
+ self.assertRaises(TypeError, f)
+ self.assertRaises(TypeError, f, None)
+ self.assertRaises(TypeError, f, 42)
+ self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
+ r = f('standard_conforming_strings')
+ self.assertEqual(r, 'on')
+ r = f('lc_monetary')
+ self.assertEqual(r, 'C')
+ r = f('datestyle')
+ self.assertEqual(r, 'ISO, YMD')
+ r = f('bytea_output')
+ self.assertEqual(r, 'hex')
+ r = f(('bytea_output', 'lc_monetary'))
+ self.assertIsInstance(r, list)
+ self.assertEqual(r, ['hex', 'C'])
+ r = f(['standard_conforming_strings', 'datestyle', 'bytea_output'])
+ self.assertEqual(r, ['on', 'ISO, YMD', 'hex'])
+ s = dict.fromkeys(('bytea_output', 'lc_monetary'))
+ r = f(s)
+ self.assertIs(r, s)
+ self.assertEqual(r, {'bytea_output': 'hex', 'lc_monetary': 'C'})
+ s = dict.fromkeys(('Bytea_Output', 'LC_Monetary'))
+ r = f(s)
+ self.assertIs(r, s)
+ self.assertEqual(r, {'Bytea_Output': 'hex', 'LC_Monetary': 'C'})
+
+ def testGetParameterServerVersion(self):
+ r = self.db.get_parameter('server_version_num')
+ self.assertIsInstance(r, str)
+ s = self.db.server_version
+ self.assertIsInstance(s, int)
+ self.assertEqual(r, str(s))
+
+ def testGetParameterAll(self):
+ f = self.db.get_parameter
+ r = f('all')
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r['standard_conforming_strings'], 'on')
+ self.assertEqual(r['lc_monetary'], 'C')
+ self.assertEqual(r['DateStyle'], 'ISO, YMD')
+ self.assertEqual(r['bytea_output'], 'hex')
+
+ def testSetParameter(self):
+ f = self.db.set_parameter
+ g = self.db.get_parameter
+ self.assertRaises(TypeError, f)
+ self.assertRaises(TypeError, f, None)
+ self.assertRaises(TypeError, f, 42)
+ self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
+ f('standard_conforming_strings', 'off')
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ f('datestyle', 'ISO, DMY')
+ self.assertEqual(g('datestyle'), 'ISO, DMY')
+ f(('standard_conforming_strings', 'datestyle'), ('on', 'ISO, YMD'))
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+ self.assertEqual(g('datestyle'), 'ISO, YMD')
+ f(['standard_conforming_strings', 'datestyle'], ['off', 'ISO, DMY'])
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ self.assertEqual(g('datestyle'), 'ISO, DMY')
+ f({'standard_conforming_strings': 'on', 'datestyle': 'ISO, YMD'})
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+ self.assertEqual(g('datestyle'), 'ISO, YMD')
+ f(('default_with_oids', 'standard_conforming_strings'), 'off')
+ self.assertEqual(g('default_with_oids'), 'off')
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ f(['default_with_oids', 'standard_conforming_strings'], 'on')
+ self.assertEqual(g('default_with_oids'), 'on')
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+
+ def testResetParameter(self):
+ db = DB()
+ f = db.set_parameter
+ g = db.get_parameter
+ r = g('default_with_oids')
+ self.assertIn(r, ('on', 'off'))
+ dwi, not_dwi = r, r == 'on' and 'off' or 'on'
+ r = g('standard_conforming_strings')
+ self.assertIn(r, ('on', 'off'))
+ scs, not_scs = r, r == 'on' and 'off' or 'on'
+ f('default_with_oids', not_dwi)
+ f('standard_conforming_strings', not_scs)
+ self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('standard_conforming_strings'), not_scs)
+ f('default_with_oids')
+ f('standard_conforming_strings', None)
+ self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('standard_conforming_strings'), scs)
+ f('default_with_oids', not_dwi)
+ f('standard_conforming_strings', not_scs)
+ self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('standard_conforming_strings'), not_scs)
+ f(('default_with_oids', 'standard_conforming_strings'))
+ self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('standard_conforming_strings'), scs)
+ f('default_with_oids', not_dwi)
+ f('standard_conforming_strings', not_scs)
+ self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('standard_conforming_strings'), not_scs)
+ f(['default_with_oids', 'standard_conforming_strings'], None)
+ self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('standard_conforming_strings'), scs)
+
+ def testResetParameterAll(self):
+ db = DB()
+ f = db.set_parameter
+ self.assertRaises(ValueError, f, 'all', 0)
+ self.assertRaises(ValueError, f, 'all', 'off')
+ g = db.get_parameter
+ r = g('default_with_oids')
+ self.assertIn(r, ('on', 'off'))
+ dwi, not_dwi = r, r == 'on' and 'off' or 'on'
+ r = g('standard_conforming_strings')
+ self.assertIn(r, ('on', 'off'))
+ scs, not_scs = r, r == 'on' and 'off' or 'on'
+ f('default_with_oids', not_dwi)
+ f('standard_conforming_strings', not_scs)
+ self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('standard_conforming_strings'), not_scs)
+ f('all')
+ self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('standard_conforming_strings'), scs)
+
+ def testSetParameterLocal(self):
+ f = self.db.set_parameter
+ g = self.db.get_parameter
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+ self.db.begin()
+ f('standard_conforming_strings', 'off', local=True)
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ self.db.end()
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+
+ def testSetParameterSession(self):
+ f = self.db.set_parameter
+ g = self.db.get_parameter
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+ self.db.begin()
+ f('standard_conforming_strings', 'off', local=False)
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ self.db.end()
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+
def testQuery(self):
query = self.db.query
query("drop table if exists test_table")
@@ -529,9 +643,9 @@
query("create table pkeytest2 ("
"c smallint, d smallint primary key)")
query("create table pkeytest3 ("
- "e smallint, f smallint, g smallint, "
- "h smallint, i smallint, "
- "primary key (f,h))")
+ "e smallint, f smallint, g smallint,"
+ " h smallint, i smallint,"
+ " primary key (f,h))")
pkey = self.db.pkey
self.assertRaises(KeyError, pkey, 'pkeytest0')
self.assertEqual(pkey('pkeytest1'), 'b')
@@ -609,7 +723,7 @@
self.assertNotIn('public.test', result)
self.assertNotIn('public.test_view', result)
- def testAttnames(self):
+ def testGetAttnames(self):
self.assertRaises(pg.ProgrammingError,
self.db.get_attnames, 'does_not_exist')
self.assertRaises(pg.ProgrammingError,
@@ -617,12 +731,12 @@
for table in ('attnames_test_table', 'test table for attnames'):
self.db.query('drop table if exists "%s"' % table)
self.db.query('create table "%s" ('
- 'a smallint, b integer, c bigint, '
- 'e numeric, f float, f2 double precision, m money, '
- 'x smallint, y smallint, z smallint, '
- 'Normal_NaMe smallint, "Special Name" smallint, '
- 't text, u char(2), v varchar(2), '
- 'primary key (y, u)) with oids' % table)
+ ' a smallint, b integer, c bigint,'
+ ' e numeric, f float, f2 double precision, m money,'
+ ' x smallint, y smallint, z smallint,'
+ ' Normal_NaMe smallint, "Special Name" smallint,'
+ ' t text, u char(2), v varchar(2),'
+ ' primary key (y, u)) with oids' % table)
attributes = self.db.get_attnames(table)
result = {'a': 'int', 'c': 'int', 'b': 'int',
'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
@@ -747,13 +861,13 @@
firstname="D'Arcy", nickname='Darcey', grade='A+'))
try:
get('test_students', "D' Arcy")
- except pg.DatabaseError as error:
+ except pg.DatabaseError, error:
self.assertEqual(str(error),
'No such record in public.test_students where firstname = '
"'D'' Arcy'")
try:
get('test_students', "Robert'); TRUNCATE TABLE test_students;--")
- except pg.DatabaseError as error:
+ except pg.DatabaseError, error:
self.assertEqual(str(error),
'No such record in public.test_students where firstname = '
"'Robert''); TRUNCATE TABLE test_students;--'")
Modified: branches/4.x/tests/test_tutorial.py
==============================================================================
--- branches/4.x/tests/test_tutorial.py Thu Jan 14 15:26:30 2016 (r744)
+++ branches/4.x/tests/test_tutorial.py Thu Jan 14 20:16:23 2016 (r745)
@@ -64,7 +64,7 @@
more_fruits = 'cherimaya durian eggfruit fig grapefruit'.split()
if namedtuple:
data = list(enumerate(more_fruits, start=3))
- else: # Pyton < 2.6
+ else: # Python < 2.6
data = [(n + 3, name) for n, name in enumerate(more_fruits)]
db.inserttable('fruits', data)
q = db.query('select * from fruits')
@@ -87,12 +87,11 @@
self.assertEqual(r[6], {'id': 7, 'name': 'grapefruit'})
try:
rows = r = q.namedresult()
- except TypeError: # Python < 2.6
- self.assertIsNone(namedtuple)
- else:
self.assertIsInstance(r, list)
self.assertIsInstance(r[0], tuple)
self.assertEqual(rows[3].name, 'durian')
+ except (AttributeError, TypeError): # Python < 2.6
+ self.assertIsNone(namedtuple)
r = db.update('fruits', banana, name=banana['name'].capitalize())
self.assertIsInstance(r, dict)
self.assertEqual(r, {'id': 2, 'name': 'Banana'})
Modified: trunk/docs/contents/changelog.rst
==============================================================================
--- trunk/docs/contents/changelog.rst Thu Jan 14 15:26:30 2016 (r744)
+++ trunk/docs/contents/changelog.rst Thu Jan 14 20:16:23 2016 (r745)
@@ -42,10 +42,12 @@
Version 4.2
-----------
-- Set a better default for the user option "escaping-funcs".
- The supported Python versions are 2.4 to 2.7.
- PostgreSQL is supported in all versions from 8.3 to 9.5.
+- Set a better default for the user option "escaping-funcs".
- Force build to compile with no errors.
+- New methods get_parameter() and set_parameter() in the classic interface
+ which can be used to get or set run-time parameters.
- Fix decimal point handling.
- Add option to return boolean values as bool objects.
- Add option to return money values as string.
Modified: trunk/docs/contents/pg/db_wrapper.rst
==============================================================================
--- trunk/docs/contents/pg/db_wrapper.rst Thu Jan 14 15:26:30 2016
(r744)
+++ trunk/docs/contents/pg/db_wrapper.rst Thu Jan 14 20:16:23 2016
(r745)
@@ -120,8 +120,7 @@
Get the attribute names of a table
:param str table: name of table
- :returns: A dictionary -- the keys are the attribute names,
- the values are the type names of the attributes.
+ :returns: a dictionary mapping attribute names to type names
Given the name of a table, digs out the set of attribute names.
@@ -135,7 +134,6 @@
You can get the regular types after enabling this by calling the
:meth:`DB.use_regtypes` method.
-
has_table_privilege -- check table privilege
--------------------------------------------
@@ -152,6 +150,67 @@
.. versionadded:: 4.0
+get/set_parameter -- get or set run-time parameters
+----------------------------------------------------
+
+.. method:: DB.get_parameter(parameter)
+
+ Get the value of run-time parameters
+
+ :param parameter: the run-time parameter(s) to get
+ :type parameter: str, tuple, list or dict
+ :returns: the current value(s) of the run-time parameter(s)
+ :rtype: str, list or dict
+ :raises TypeError: Invalid parameter type(s)
+ :raises ProgrammingError: Invalid parameter name(s)
+
+If the parameter is a string, the return value will also be a string
+that is the current setting of the run-time parameter with that name.
+
+You can get several parameters at once by passing a list or tuple of
+parameter names. The return value will then be a corresponding list
+of parameter settings. If you pass a dict as parameter instead, its
+values will be set to the parameter settings corresponding to its keys.
+
+By passing the special name `'all'` as the parameter, you can get a dict
+of all existing configuration parameters.
+
+.. versionadded:: 4.2
+
+.. method:: DB.set_parameter(parameter, [value], [local])
+
+ Set the value of run-time parameters
+
+ :param parameter: the run-time parameter(s) to set
+ :type parameter: str, tuple, list or dict
+ :param value: the value to set
+ :type value: str or None
+ :raises TypeError: Invalid parameter type(s)
+ :raises ValueError: Invalid value argument(s)
+ :raises ProgrammingError: Invalid parameter name(s) or values
+
+If the parameter and the value are strings, the run-time parameter
+will be set to that value. If no value or *None* is passed as a value,
+then the run-time parameter will be restored to its default value.
+
+You can set several parameters at once by passing a list or tuple
+of parameter names, with a single value that all parameters should
+be set to or with a corresponding list or tuple of values.
+
+You can also pass a dict as parameters. In this case, you should
+not pass a value, since the values will be taken from the dict.
+
+By passing the special name `'all'` as the parameter, you can reset
+all existing settable run-time parameters to their default values.
+
+If you set *local* to `True`, then the command takes effect for only the
+current transaction. After :meth:`DB.commit` or :meth:`DB.rollback`,
+the session-level setting takes effect again. Setting *local* to `True`
+will appear to have no effect if it is executed outside a transaction,
+since the transaction will end immediately.
+
+.. versionadded:: 4.2
+
begin/commit/rollback/savepoint/release -- transaction handling
---------------------------------------------------------------
Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Thu Jan 14 15:26:30 2016 (r744)
+++ trunk/pg.py Thu Jan 14 20:16:23 2016 (r745)
@@ -462,6 +462,118 @@
"""Destroy a previously defined savepoint."""
return self.query('RELEASE ' + name)
+ def get_parameter(self, parameter):
+ """Get the value of a run-time parameter.
+
+ If the parameter is a string, the return value will also be a string
+ that is the current setting of the run-time parameter with that name.
+
+ You can get several parameters at once by passing a list or tuple of
+ parameter names. The return value will then be a corresponding list
+ of parameter settings. If you pass a dict as parameter instead, its
+ values will be set to the parameter settings corresponding to its keys.
+
+ By passing the special name 'all' as the parameter, you can get a dict
+ of all existing configuration parameters.
+ """
+ if isinstance(parameter, basestring):
+ parameter = [parameter]
+ values = None
+ elif isinstance(parameter, (list, tuple)):
+ values = []
+ elif isinstance(parameter, dict):
+ values = parameter
+ else:
+ raise TypeError('The parameter must be a dict, list or string')
+ if not parameter:
+ raise TypeError('No parameter has been specified')
+ params = {} if isinstance(values, dict) else []
+ for key in parameter:
+ param = key.strip().lower() if isinstance(
+ key, basestring) else None
+ if not param:
+ raise TypeError('Invalid parameter')
+ if param == 'all':
+ q = 'SHOW ALL'
+ values = self.db.query(q).getresult()
+ values = dict(value[:2] for value in values)
+ break
+ if isinstance(values, dict):
+ params[param] = key
+ else:
+ params.append(param)
+ else:
+ for param in params:
+ q = 'SHOW %s' % (param,)
+ value = self.db.query(q).getresult()[0][0]
+ if values is None:
+ values = value
+ elif isinstance(values, list):
+ values.append(value)
+ else:
+ values[params[param]] = value
+ return values
+
+ def set_parameter(self, parameter, value=None, local=False):
+ """Set the value of a run-time parameter.
+
+ If the parameter and the value are strings, the run-time parameter
+ will be set to that value. If no value or None is passed as a value,
+ then the run-time parameter will be restored to its default value.
+
+ You can set several parameters at once by passing a list or tuple
+ of parameter names, with a single value that all parameters should
+ be set to or with a corresponding list or tuple of values.
+
+ You can also pass a dict as parameters. In this case, you should
+ not pass a value, since the values will be taken from the dict.
+
+ By passing the special name 'all' as the parameter, you can reset
+ all existing settable run-time parameters to their default values.
+
+ If you set local to True, then the command takes effect for only the
+ current transaction. After commit() or rollback(), the session-level
+ setting takes effect again. Setting local to True will appear to
+ have no effect if it is executed outside a transaction, since the
+ transaction will end immediately.
+ """
+ if isinstance(parameter, basestring):
+ parameter = {parameter: value}
+ elif isinstance(parameter, (list, tuple)):
+ if isinstance(value, (list, tuple)):
+ parameter = dict(zip(parameter, value))
+ else:
+ parameter = dict.fromkeys(parameter, value)
+ elif isinstance(parameter, dict):
+ if value is not None:
+ raise ValueError(
+ 'A value must not be set when parameter is a dictionary')
+ else:
+ raise TypeError('The parameter must be a dict, list or string')
+ if not parameter:
+ raise TypeError('No parameter has been specified')
+ params = {}
+ for key, value in parameter.items():
+ param = key.strip().lower() if isinstance(
+ key, basestring) else None
+ if not param:
+ raise TypeError('Invalid parameter')
+ if param == 'all':
+ if value is not None:
+ raise ValueError(
+ "A value must ot be set when parameter is 'all'")
+ params = {'all': None}
+ break
+ params[param] = value
+ local = ' LOCAL' if local else ''
+ for param, value in params.items():
+ if value is None:
+ q = 'RESET%s %s' % (local, param)
+ else:
+ q = 'SET%s %s TO %s' % (local, param, value)
+ self._do_debug(q)
+ self.db.query(q)
+
def query(self, qstr, *args):
"""Execute a SQL command string.
Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py Thu Jan 14 15:26:30 2016
(r744)
+++ trunk/tests/test_classic_dbwrapper.py Thu Jan 14 20:16:23 2016
(r745)
@@ -85,61 +85,31 @@
def testAllDBAttributes(self):
attributes = [
'begin',
- 'cancel',
- 'clear',
- 'close',
- 'commit',
- 'db',
- 'dbname',
- 'debug',
- 'delete',
- 'end',
- 'endcopy',
- 'error',
- 'escape_bytea',
- 'escape_identifier',
- 'escape_literal',
- 'escape_string',
+ 'cancel', 'clear', 'close', 'commit',
+ 'db', 'dbname', 'debug', 'delete',
+ 'end', 'endcopy', 'error',
+ 'escape_bytea', 'escape_identifier',
+ 'escape_literal', 'escape_string',
'fileno',
- 'get',
- 'get_attnames',
- 'get_databases',
- 'get_notice_receiver',
- 'get_relations',
- 'get_tables',
- 'getline',
- 'getlo',
- 'getnotify',
- 'has_table_privilege',
- 'host',
- 'insert',
- 'inserttable',
- 'locreate',
- 'loimport',
+ 'get', 'get_attnames', 'get_databases',
+ 'get_notice_receiver', 'get_parameter',
+ 'get_relations', 'get_tables',
+ 'getline', 'getlo', 'getnotify',
+ 'has_table_privilege', 'host',
+ 'insert', 'inserttable',
+ 'locreate', 'loimport',
'notification_handler',
'options',
- 'parameter',
- 'pkey',
- 'port',
- 'protocol_version',
- 'putline',
+ 'parameter', 'pkey', 'port',
+ 'protocol_version', 'putline',
'query',
- 'release',
- 'reopen',
- 'reset',
- 'rollback',
- 'savepoint',
- 'server_version',
- 'set_notice_receiver',
- 'source',
- 'start',
- 'status',
+ 'release', 'reopen', 'reset', 'rollback',
+ 'savepoint', 'server_version',
+ 'set_notice_receiver', 'set_parameter',
+ 'source', 'start', 'status',
'transaction',
- 'unescape_bytea',
- 'update',
- 'upsert',
- 'use_regtypes',
- 'user',
+ 'unescape_bytea', 'update', 'upsert',
+ 'use_regtypes', 'user',
]
db_attributes = [a for a in dir(self.db)
if not a.startswith('_')]
@@ -287,8 +257,8 @@
db.query("drop table if exists test cascade")
db.query("create table test ("
"i2 smallint, i4 integer, i8 bigint,"
- "d numeric, f4 real, f8 double precision, m money, "
- "v4 varchar(4), c4 char(4), t text)")
+ " d numeric, f4 real, f8 double precision, m money,"
+ " v4 varchar(4), c4 char(4), t text)")
db.query("create or replace view test_view as"
" select i4, v4 from test")
db.close()
@@ -412,90 +382,149 @@
b'\\x746861742773206be47365')
self.assertEqual(f(r'\\x4f007073ff21'), b'\\x4f007073ff21')
- def testGetAttnames(self):
- get_attnames = self.db.get_attnames
- query = self.db.query
- query("drop table if exists test_table")
- self.addCleanup(query, "drop table test_table")
- query("create table test_table("
- " n int, alpha smallint, beta bool,"
- " gamma char(5), tau text, v varchar(3))")
- r = get_attnames("test_table")
- self.assertIsInstance(r, dict)
- self.assertEquals(r, dict(
- n='int', alpha='int', beta='bool',
- gamma='text', tau='text', v='text'))
+ def testGetParameter(self):
+ f = self.db.get_parameter
+ self.assertRaises(TypeError, f)
+ self.assertRaises(TypeError, f, None)
+ self.assertRaises(TypeError, f, 42)
+ self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
+ r = f('standard_conforming_strings')
+ self.assertEqual(r, 'on')
+ r = f('lc_monetary')
+ self.assertEqual(r, 'C')
+ r = f('datestyle')
+ self.assertEqual(r, 'ISO, YMD')
+ r = f('bytea_output')
+ self.assertEqual(r, 'hex')
+ r = f(('bytea_output', 'lc_monetary'))
+ self.assertIsInstance(r, list)
+ self.assertEqual(r, ['hex', 'C'])
+ r = f(['standard_conforming_strings', 'datestyle', 'bytea_output'])
+ self.assertEqual(r, ['on', 'ISO, YMD', 'hex'])
+ s = dict.fromkeys(('bytea_output', 'lc_monetary'))
+ r = f(s)
+ self.assertIs(r, s)
+ self.assertEqual(r, {'bytea_output': 'hex', 'lc_monetary': 'C'})
+ s = dict.fromkeys(('Bytea_Output', 'LC_Monetary'))
+ r = f(s)
+ self.assertIs(r, s)
+ self.assertEqual(r, {'Bytea_Output': 'hex', 'LC_Monetary': 'C'})
- def testGetAttnamesWithQuotes(self):
- get_attnames = self.db.get_attnames
- query = self.db.query
- table = 'test table for get_attnames()'
- query('drop table if exists "%s"' % table)
- self.addCleanup(query, 'drop table "%s"' % table)
- query('create table "%s"('
- '"Prime!" smallint,'
- '"much space" integer, "Questions?" text)' % table)
- r = get_attnames(table)
+ def testGetParameterServerVersion(self):
+ r = self.db.get_parameter('server_version_num')
+ self.assertIsInstance(r, str)
+ s = self.db.server_version
+ self.assertIsInstance(s, int)
+ self.assertEqual(r, str(s))
+
+ def testGetParameterAll(self):
+ f = self.db.get_parameter
+ r = f('all')
self.assertIsInstance(r, dict)
- self.assertEquals(r, {
- 'Prime!': 'int', 'much space': 'int', 'Questions?': 'text'})
+ self.assertEqual(r['standard_conforming_strings'], 'on')
+ self.assertEqual(r['lc_monetary'], 'C')
+ self.assertEqual(r['DateStyle'], 'ISO, YMD')
+ self.assertEqual(r['bytea_output'], 'hex')
+
+ def testSetParameter(self):
+ f = self.db.set_parameter
+ g = self.db.get_parameter
+ self.assertRaises(TypeError, f)
+ self.assertRaises(TypeError, f, None)
+ self.assertRaises(TypeError, f, 42)
+ self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
+ f('standard_conforming_strings', 'off')
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ f('datestyle', 'ISO, DMY')
+ self.assertEqual(g('datestyle'), 'ISO, DMY')
+ f(('standard_conforming_strings', 'datestyle'), ('on', 'ISO, YMD'))
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+ self.assertEqual(g('datestyle'), 'ISO, YMD')
+ f(['standard_conforming_strings', 'datestyle'], ['off', 'ISO, DMY'])
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ self.assertEqual(g('datestyle'), 'ISO, DMY')
+ f({'standard_conforming_strings': 'on', 'datestyle': 'ISO, YMD'})
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+ self.assertEqual(g('datestyle'), 'ISO, YMD')
+ f(('default_with_oids', 'standard_conforming_strings'), 'off')
+ self.assertEqual(g('default_with_oids'), 'off')
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ f(['default_with_oids', 'standard_conforming_strings'], 'on')
+ self.assertEqual(g('default_with_oids'), 'on')
+ self.assertEqual(g('standard_conforming_strings'), 'on')
- def testGetAttnamesWithRegtypes(self):
- get_attnames = self.db.get_attnames
- query = self.db.query
- query("drop table if exists test_table")
- self.addCleanup(query, "drop table test_table")
- query("create table test_table("
- " n int, alpha smallint, beta bool,"
- " gamma char(5), tau text, v varchar(3))")
- self.db.use_regtypes(True)
- try:
- r = get_attnames("test_table")
- self.assertIsInstance(r, dict)
- finally:
- self.db.use_regtypes(False)
- self.assertEquals(r, dict(
- n='integer', alpha='smallint', beta='boolean',
- gamma='character', tau='text', v='character varying'))
+ def testResetParameter(self):
+ db = DB()
+ f = db.set_parameter
+ g = db.get_parameter
+ r = g('default_with_oids')
+ self.assertIn(r, ('on', 'off'))
+ dwi, not_dwi = r, 'off' if r == 'on' else 'on'
+ r = g('standard_conforming_strings')
+ self.assertIn(r, ('on', 'off'))
+ scs, not_scs = r, 'off' if r == 'on' else 'on'
+ f('default_with_oids', not_dwi)
+ f('standard_conforming_strings', not_scs)
+ self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('standard_conforming_strings'), not_scs)
+ f('default_with_oids')
+ f('standard_conforming_strings', None)
+ self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('standard_conforming_strings'), scs)
+ f('default_with_oids', not_dwi)
+ f('standard_conforming_strings', not_scs)
+ self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('standard_conforming_strings'), not_scs)
+ f(('default_with_oids', 'standard_conforming_strings'))
+ self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('standard_conforming_strings'), scs)
+ f('default_with_oids', not_dwi)
+ f('standard_conforming_strings', not_scs)
+ self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('standard_conforming_strings'), not_scs)
+ f(['default_with_oids', 'standard_conforming_strings'], None)
+ self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('standard_conforming_strings'), scs)
- def testGetAttnamesIsCached(self):
- get_attnames = self.db.get_attnames
- query = self.db.query
- query("drop table if exists test_table")
- self.addCleanup(query, "drop table if exists test_table")
- query("create table test_table(col int)")
- r = get_attnames("test_table")
- self.assertIsInstance(r, dict)
- self.assertEquals(r, dict(col='int'))
- query("drop table test_table")
- query("create table test_table(col text)")
- r = get_attnames("test_table")
- self.assertEquals(r, dict(col='int'))
- r = get_attnames("test_table", flush=True)
- self.assertEquals(r, dict(col='text'))
- query("drop table test_table")
- r = get_attnames("test_table")
- self.assertEquals(r, dict(col='text'))
- self.assertRaises(pg.ProgrammingError,
- get_attnames, "test_table", flush=True)
+ def testResetParameterAll(self):
+ db = DB()
+ f = db.set_parameter
+ self.assertRaises(ValueError, f, 'all', 0)
+ self.assertRaises(ValueError, f, 'all', 'off')
+ g = db.get_parameter
+ r = g('default_with_oids')
+ self.assertIn(r, ('on', 'off'))
+ dwi, not_dwi = r, 'off' if r == 'on' else 'on'
+ r = g('standard_conforming_strings')
+ self.assertIn(r, ('on', 'off'))
+ scs, not_scs = r, 'off' if r == 'on' else 'on'
+ f('default_with_oids', not_dwi)
+ f('standard_conforming_strings', not_scs)
+ self.assertEqual(g('default_with_oids'), not_dwi)
+ self.assertEqual(g('standard_conforming_strings'), not_scs)
+ f('all')
+ self.assertEqual(g('default_with_oids'), dwi)
+ self.assertEqual(g('standard_conforming_strings'), scs)
+
+ def testSetParameterLocal(self):
+ f = self.db.set_parameter
+ g = self.db.get_parameter
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+ self.db.begin()
+ f('standard_conforming_strings', 'off', local=True)
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ self.db.end()
+ self.assertEqual(g('standard_conforming_strings'), 'on')
- def testGetAttnamesIsOrdered(self):
- get_attnames = self.db.get_attnames
- query = self.db.query
- query("drop table if exists test_table")
- self.addCleanup(query, "drop table test_table")
- query("create table test_table("
- " n int, alpha smallint, v varchar(3),"
- " gamma char(5), tau text, beta bool)")
- r = get_attnames("test_table")
- self.assertIsInstance(r, OrderedDict)
- self.assertEquals(r, OrderedDict([
- ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
- ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
- if OrderedDict is dict:
- self.skipTest('OrderedDict is not supported')
- r = ' '.join(list(r.keys()))
- self.assertEquals(r, 'n alpha v gamma tau beta')
+ def testSetParameterSession(self):
+ f = self.db.set_parameter
+ g = self.db.get_parameter
+ self.assertEqual(g('standard_conforming_strings'), 'on')
+ self.db.begin()
+ f('standard_conforming_strings', 'off', local=False)
+ self.assertEqual(g('standard_conforming_strings'), 'off')
+ self.db.end()
+ self.assertEqual(g('standard_conforming_strings'), 'off')
def testQuery(self):
query = self.db.query
@@ -588,19 +617,19 @@
query('create table "%s2" ('
"c smallint, d smallint primary key)" % t)
query('create table "%s3" ('
- "e smallint, f smallint, g smallint, "
- "h smallint, i smallint, "
- "primary key (f, h))" % t)
+ "e smallint, f smallint, g smallint,"
+ " h smallint, i smallint,"
+ " primary key (f, h))" % t)
query('create table "%s4" ('
"more_than_one_letter varchar primary key)" % t)
query('create table "%s5" ('
'"with space" date primary key)' % t)
query('create table "%s6" ('
- 'a_very_long_column_name varchar, '
- '"with space" date, '
- '"42" int, '
- "primary key (a_very_long_column_name, "
- '"with space", "42"))' % t)
+ 'a_very_long_column_name varchar,'
+ ' "with space" date,'
+ ' "42" int,'
+ " primary key (a_very_long_column_name,"
+ ' "with space", "42"))' % t)
self.assertRaises(KeyError, pkey, '%s0' % t)
self.assertEqual(pkey('%s1' % t), 'b')
self.assertEqual(pkey('%s2' % t), 'd')
@@ -686,28 +715,111 @@
self.assertNotIn('public.test', result)
self.assertNotIn('public.test_view', result)
- def testAttnames(self):
+ def testGetAttnames(self):
+ get_attnames = self.db.get_attnames
self.assertRaises(pg.ProgrammingError,
self.db.get_attnames, 'does_not_exist')
self.assertRaises(pg.ProgrammingError,
self.db.get_attnames, 'has.too.many.dots')
- for table in ('attnames_test_table', 'test table for attnames'):
- self.db.query('drop table if exists "%s"' % table)
- self.addCleanup(self.db.query, 'drop table "%s"' % table)
- self.db.query('create table "%s" ('
- 'a smallint, b integer, c bigint, '
- 'e numeric, f float, f2 double precision, m money, '
- 'x smallint, y smallint, z smallint, '
- 'Normal_NaMe smallint, "Special Name" smallint, '
- 't text, u char(2), v varchar(2), '
- 'primary key (y, u)) with oids' % table)
- attributes = self.db.get_attnames(table)
- result = {'a': 'int', 'c': 'int', 'b': 'int',
- 'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
- 'normal_name': 'int', 'Special Name': 'int',
- 'u': 'text', 't': 'text', 'v': 'text',
- 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'}
- self.assertEqual(attributes, result)
+ query = self.db.query
+ query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
+ query("create table test_table("
+ " n int, alpha smallint, beta bool,"
+ " gamma char(5), tau text, v varchar(3))")
+ r = get_attnames('test_table')
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r, dict(
+ n='int', alpha='int', beta='bool',
+ gamma='text', tau='text', v='text'))
+
+ def testGetAttnamesWithQuotes(self):
+ get_attnames = self.db.get_attnames
+ query = self.db.query
+ table = 'test table for get_attnames()'
+ query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
+ query('create table "%s"('
+ '"Prime!" smallint,'
+ ' "much space" integer, "Questions?" text)' % table)
+ r = get_attnames(table)
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r, {
+ 'Prime!': 'int', 'much space': 'int', 'Questions?': 'text'})
+ table = 'yet another test table for get_attnames()'
+ query('drop table if exists "%s"' % table)
+ self.addCleanup(query, 'drop table "%s"' % table)
+ self.db.query('create table "%s" ('
+ 'a smallint, b integer, c bigint,'
+ ' e numeric, f float, f2 double precision, m money,'
+ ' x smallint, y smallint, z smallint,'
+ ' Normal_NaMe smallint, "Special Name" smallint,'
+ ' t text, u char(2), v varchar(2),'
+ ' primary key (y, u)) with oids' % table)
+ r = get_attnames(table)
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r, {'a': 'int', 'c': 'int', 'b': 'int',
+ 'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
+ 'normal_name': 'int', 'Special Name': 'int',
+ 'u': 'text', 't': 'text', 'v': 'text',
+ 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
+
+ def testGetAttnamesWithRegtypes(self):
+ get_attnames = self.db.get_attnames
+ query = self.db.query
+ query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
+ query("create table test_table("
+ " n int, alpha smallint, beta bool,"
+ " gamma char(5), tau text, v varchar(3))")
+ self.db.use_regtypes(True)
+ try:
+ r = get_attnames("test_table")
+ self.assertIsInstance(r, dict)
+ finally:
+ self.db.use_regtypes(False)
+ self.assertEqual(r, dict(
+ n='integer', alpha='smallint', beta='boolean',
+ gamma='character', tau='text', v='character varying'))
+
+ def testGetAttnamesIsCached(self):
+ get_attnames = self.db.get_attnames
+ query = self.db.query
+ query("drop table if exists test_table")
+ self.addCleanup(query, "drop table if exists test_table")
+ query("create table test_table(col int)")
+ r = get_attnames("test_table")
+ self.assertIsInstance(r, dict)
+ self.assertEqual(r, dict(col='int'))
+ query("drop table test_table")
+ query("create table test_table(col text)")
+ r = get_attnames("test_table")
+ self.assertEqual(r, dict(col='int'))
+ r = get_attnames("test_table", flush=True)
+ self.assertEqual(r, dict(col='text'))
+ query("drop table test_table")
+ r = get_attnames("test_table")
+ self.assertEqual(r, dict(col='text'))
+ self.assertRaises(pg.ProgrammingError,
+ get_attnames, "test_table", flush=True)
+
+ def testGetAttnamesIsOrdered(self):
+ get_attnames = self.db.get_attnames
+ query = self.db.query
+ query("drop table if exists test_table")
+ self.addCleanup(query, "drop table test_table")
+ query("create table test_table("
+ " n int, alpha smallint, v varchar(3),"
+ " gamma char(5), tau text, beta bool)")
+ r = get_attnames("test_table")
+ self.assertIsInstance(r, OrderedDict)
+ self.assertEqual(r, OrderedDict([
+ ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
+ ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
+ if OrderedDict is dict:
+ self.skipTest('OrderedDict is not supported')
+ r = ' '.join(list(r.keys()))
+ self.assertEqual(r, 'n alpha v gamma tau beta')
def testHasTablePrivilege(self):
can = self.db.has_table_privilege
@@ -800,7 +912,7 @@
self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
- '"much space" integer, "Questions?" text)' % table)
+ ' "much space" integer, "Questions?" text)' % table)
query('insert into "%s"'
" values(17, 1001, 'No!')" % table)
r = get(table, 17)
@@ -968,7 +1080,7 @@
self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
- '"much space" integer, "Questions?" text)' % table)
+ ' "much space" integer, "Questions?" text)' % table)
r = {'Prime!': 11, 'much space': 2002, 'Questions?': 'What?'}
r = insert(table, r)
self.assertIsInstance(r, dict)
@@ -1059,7 +1171,7 @@
self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
- '"much space" integer, "Questions?" text)' % table)
+ ' "much space" integer, "Questions?" text)' % table)
query('insert into "%s"'
" values(13, 3003, 'Why!')" % table)
r = {'Prime!': 13, 'much space': 7007, 'Questions?': 'When?'}
@@ -1221,7 +1333,7 @@
self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
- '"much space" integer, "Questions?" text)' % table)
+ ' "much space" integer, "Questions?" text)' % table)
s = {'Prime!': 31, 'much space': 9009, 'Questions?': 'Yes.'}
try:
r = upsert(table, s)
@@ -1274,7 +1386,7 @@
self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
- '"much space" integer, "Questions?" text)' % table)
+ ' "much space" integer, "Questions?" text)' % table)
r = clear(table)
self.assertIsInstance(r, dict)
self.assertEqual(r['Prime!'], 0)
@@ -1366,7 +1478,7 @@
self.addCleanup(query, 'drop table "%s"' % table)
query('create table "%s" ('
'"Prime!" smallint primary key,'
- '"much space" integer, "Questions?" text)' % table)
+ ' "much space" integer, "Questions?" text)' % table)
query('insert into "%s"'
" values(19, 5005, 'Yes!')" % table)
r = {'Prime!': 17}
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql