Author: cito
Date: Thu Jan 14 20:16:23 2016
New Revision: 745

Log:
Add methods get/set_parameter to DB wrapper class

These methods can be used to get/set/reset run-time parameters,
even several at once.

Since this is pretty useful and will not break anything, I have
also back ported these additions to the 4.x branch.

Everything is well documented and tested, of course.

Modified:
   branches/4.x/docs/contents/changelog.rst
   branches/4.x/docs/contents/pg/db_wrapper.rst
   branches/4.x/pg.py
   branches/4.x/tests/test_classic_dbwrapper.py
   branches/4.x/tests/test_tutorial.py
   trunk/docs/contents/changelog.rst
   trunk/docs/contents/pg/db_wrapper.rst
   trunk/pg.py
   trunk/tests/test_classic_dbwrapper.py

Modified: branches/4.x/docs/contents/changelog.rst
==============================================================================
--- branches/4.x/docs/contents/changelog.rst    Thu Jan 14 15:26:30 2016        
(r744)
+++ branches/4.x/docs/contents/changelog.rst    Thu Jan 14 20:16:23 2016        
(r745)
@@ -3,10 +3,12 @@
 
 Version 4.2
 -----------
-- Set a better default for the user option "escaping-funcs".
 - The supported Python versions are 2.4 to 2.7.
 - PostgreSQL is supported in all versions from 8.3 to 9.5.
+- Set a better default for the user option "escaping-funcs".
 - Force build to compile with no errors.
+- New methods get_parameters() and set_parameters() in the classic interface
+  which can be used to get or set run-time parameters.
 - Fix decimal point handling.
 - Add option to return boolean values as bool objects.
 - Add option to return money values as string.

Modified: branches/4.x/docs/contents/pg/db_wrapper.rst
==============================================================================
--- branches/4.x/docs/contents/pg/db_wrapper.rst        Thu Jan 14 15:26:30 
2016        (r744)
+++ branches/4.x/docs/contents/pg/db_wrapper.rst        Thu Jan 14 20:16:23 
2016        (r745)
@@ -121,11 +121,78 @@
     Get the attribute names of a table
 
     :param str table: name of table
-    :returns: A dictionary -- the keys are the attribute names,
-     the values are the type names of the attributes.
+    :returns: a dictionary mapping attribute names to type names
 
 Given the name of a table, digs out the set of attribute names.
 
+Returns a dictionary of attribute names (the names are the keys,
+the values are the names of the attributes' types).
+
+By default, only a limited number of simple types will be returned.
+You can get the regular types after enabling this by calling the
+:meth:`DB.use_regtypes` method.
+
+get/set_parameter -- get or set  run-time parameters
+----------------------------------------------------
+
+.. method:: DB.get_parameter(parameter)
+
+    Get the value of run-time parameters
+
+    :param parameter: the run-time parameter(s) to get
+    :type param: str, tuple, list or dict
+    :returns: the current value(s) of the run-time parameter(s)
+    :rtype: str, list or dict
+    :raises TypeError: Invalid parameter type(s)
+    :raises ProgrammingError: Invalid parameter name(s)
+
+If the parameter is a string, the return value will also be a string
+that is the current setting of the run-time parameter with that name.
+
+You can get several parameters at once by passing a list or tuple of
+parameter names.  The return value will then be a corresponding list
+of parameter settings.  If you pass a dict as parameter instead, its
+values will be set to the parameter settings corresponding to its keys.
+
+By passing the special name `'all'` as the parameter, you can get a dict
+of all existing configuration parameters.
+
+.. versionadded:: 4.2
+
+.. method:: DB.set_parameter(self, parameter, [value], [local])
+
+    Set the value of run-time parameters
+
+    :param parameter: the run-time parameter(s) to set
+    :type param: string, tuple, list or dict
+    :param value: the value to set
+    :type param: str or None
+    :raises TypeError: Invalid parameter type(s)
+    :raises ValueError: Invalid value argument(s)
+    :raises ProgrammingError: Invalid parameter name(s) or values
+
+If the parameter and the value are strings, the run-time parameter
+will be set to that value.  If no value or *None* is passed as a value,
+then the run-time parameter will be restored to its default value.
+
+You can set several parameters at once by passing a list or tuple
+of parameter names, with a single value that all parameters should
+be set to or with a corresponding list or tuple of values.
+
+You can also pass a dict as parameters.  In this case, you should
+not pass a value, since the values will be taken from the dict.
+
+By passing the special name `'all'` as the parameter, you can reset
+all existing settable run-time parameters to their default values.
+
+If you set *local* to `True`, then the command takes effect for only the
+current transaction.  After :meth:`DB.commit` or :meth:`DB.rollback`,
+the session-level setting takes effect again.  Setting *local* to `True`
+will appear to have no effect if it is executed outside a transaction,
+since the transaction will end immediately.
+
+.. versionadded:: 4.2
+
 has_table_privilege -- check table privilege
 --------------------------------------------
 

Modified: branches/4.x/pg.py
==============================================================================
--- branches/4.x/pg.py  Thu Jan 14 15:26:30 2016        (r744)
+++ branches/4.x/pg.py  Thu Jan 14 20:16:23 2016        (r745)
@@ -517,6 +517,125 @@
         """Destroy a previously defined savepoint."""
         return self.query('RELEASE ' + name)
 
+    def get_parameter(self, parameter):
+        """Get the value of a run-time parameter.
+
+        If the parameter is a string, the return value will also be a string
+        that is the current setting of the run-time parameter with that name.
+
+        You can get several parameters at once by passing a list or tuple of
+        parameter names.  The return value will then be a corresponding list
+        of parameter settings.  If you pass a dict as parameter instead, its
+        values will be set to the parameter settings corresponding to its keys.
+
+        By passing the special name 'all' as the parameter, you can get a dict
+        of all existing configuration parameters.
+        """
+        if isinstance(parameter, basestring):
+            parameter = [parameter]
+            values = None
+        elif isinstance(parameter, (list, tuple)):
+            values = []
+        elif isinstance(parameter, dict):
+            values = parameter
+        else:
+            raise TypeError('The parameter must be a dict, list or string')
+        if not parameter:
+            raise TypeError('No parameter has been specified')
+        if isinstance(values, dict):
+            params = {}
+        else:
+            params = []
+        for key in parameter:
+            if isinstance(key, basestring):
+                param = key.strip().lower()
+            else:
+                param = None
+            if not param:
+                raise TypeError('Invalid parameter')
+            if param == 'all':
+                q = 'SHOW ALL'
+                values = self.db.query(q).getresult()
+                values = dict(value[:2] for value in values)
+                break
+            if isinstance(values, dict):
+                params[param] = key
+            else:
+                params.append(param)
+        else:
+            for param in params:
+                q = 'SHOW %s' % (param,)
+                value = self.db.query(q).getresult()[0][0]
+                if values is None:
+                    values = value
+                elif isinstance(values, list):
+                    values.append(value)
+                else:
+                    values[params[param]] = value
+        return values
+
+    def set_parameter(self, parameter, value=None, local=False):
+        """Set the value of a run-time parameter.
+
+        If the parameter and the value are strings, the run-time parameter
+        will be set to that value.  If no value or None is passed as a value,
+        then the run-time parameter will be restored to its default value.
+
+        You can set several parameters at once by passing a list or tuple
+        of parameter names, with a single value that all parameters should
+        be set to or with a corresponding list or tuple of values.
+
+        You can also pass a dict as parameters.  In this case, you should
+        not pass a value, since the values will be taken from the dict.
+
+        By passing the special name 'all' as the parameter, you can reset
+        all existing settable run-time parameters to their default values.
+
+        If you set local to True, then the command takes effect for only the
+        current transaction.  After commit() or rollback(), the session-level
+        setting takes effect again.  Setting local to True will appear to
+        have no effect if it is executed outside a transaction, since the
+        transaction will end immediately.
+        """
+        if isinstance(parameter, basestring):
+            parameter = {parameter: value}
+        elif isinstance(parameter, (list, tuple)):
+            if isinstance(value, (list, tuple)):
+                parameter = dict(zip(parameter, value))
+            else:
+                parameter = dict.fromkeys(parameter, value)
+        elif isinstance(parameter, dict):
+            if value is not None:
+                raise ValueError(
+                    'A value must not be set when parameter is a dictionary')
+        else:
+            raise TypeError('The parameter must be a dict, list or string')
+        if not parameter:
+            raise TypeError('No parameter has been specified')
+        params = {}
+        for key, value in parameter.items():
+            if isinstance(key, basestring):
+                param = key.strip().lower()
+            else:
+                param = None
+            if not param:
+                raise TypeError('Invalid parameter')
+            if param == 'all':
+                if value is not None:
+                    raise ValueError(
+                        "A value must ot be set when parameter is 'all'")
+                params = {'all': None}
+                break
+            params[param] = value
+        local = local and ' LOCAL' or ''
+        for param, value in params.items():
+            if value is None:
+                q = 'RESET%s %s' % (local, param)
+            else:
+                q = 'SET%s %s TO %s' % (local, param, value)
+            self._do_debug(q)
+            self.db.query(q)
+
     def query(self, qstr, *args):
         """Executes a SQL command string.
 

Modified: branches/4.x/tests/test_classic_dbwrapper.py
==============================================================================
--- branches/4.x/tests/test_classic_dbwrapper.py        Thu Jan 14 15:26:30 
2016        (r744)
+++ branches/4.x/tests/test_classic_dbwrapper.py        Thu Jan 14 20:16:23 
2016        (r745)
@@ -72,61 +72,31 @@
     def testAllDBAttributes(self):
         attributes = [
             'begin',
-            'cancel',
-            'clear',
-            'close',
-            'commit',
-            'db',
-            'dbname',
-            'debug',
-            'delete',
-            'end',
-            'endcopy',
-            'error',
-            'escape_bytea',
-            'escape_identifier',
-            'escape_literal',
-            'escape_string',
+            'cancel', 'clear', 'close', 'commit',
+            'db', 'dbname', 'debug', 'delete',
+            'end', 'endcopy', 'error',
+            'escape_bytea', 'escape_identifier',
+            'escape_literal', 'escape_string',
             'fileno',
-            'get',
-            'get_attnames',
-            'get_databases',
-            'get_notice_receiver',
-            'get_relations',
-            'get_tables',
-            'getline',
-            'getlo',
-            'getnotify',
-            'has_table_privilege',
-            'host',
-            'insert',
-            'inserttable',
-            'locreate',
-            'loimport',
+            'get', 'get_attnames', 'get_databases',
+            'get_notice_receiver', 'get_parameter',
+            'get_relations', 'get_tables',
+            'getline', 'getlo', 'getnotify',
+            'has_table_privilege', 'host',
+            'insert', 'inserttable',
+            'locreate', 'loimport',
             'notification_handler',
             'options',
-            'parameter',
-            'pkey',
-            'port',
-            'protocol_version',
-            'putline',
+            'parameter', 'pkey', 'port',
+            'protocol_version', 'putline',
             'query',
-            'release',
-            'reopen',
-            'reset',
-            'rollback',
-            'savepoint',
-            'server_version',
-            'set_notice_receiver',
-            'source',
-            'start',
-            'status',
-            'transaction',
-            'tty',
-            'unescape_bytea',
-            'update',
-            'use_regtypes',
-            'user',
+            'release', 'reopen', 'reset', 'rollback',
+            'savepoint', 'server_version',
+            'set_notice_receiver', 'set_parameter',
+            'source', 'start', 'status',
+            'transaction', 'tty',
+            'unescape_bytea', 'update',
+            'use_regtypes', 'user',
         ]
         if self.db.server_version < 90000:  # PostgreSQL < 9.0
             attributes.remove('escape_identifier')
@@ -288,8 +258,8 @@
         db.query("drop table if exists test cascade")
         db.query("create table test ("
             "i2 smallint, i4 integer, i8 bigint,"
-            "d numeric, f4 real, f8 double precision, m money, "
-            "v4 varchar(4), c4 char(4), t text)")
+            " d numeric, f4 real, f8 double precision, m money,"
+            " v4 varchar(4), c4 char(4), t text)")
         db.query("create or replace view test_view as"
             " select i4, v4 from test")
         db.close()
@@ -441,6 +411,150 @@
         self.assertEqual(f('ab\\c', 'text'), "'ab\\\\c'")
         self.assertEqual(f("a\\b'c", 'text'), "'a\\\\b''c'")
 
+    def testGetParameter(self):
+        f = self.db.get_parameter
+        self.assertRaises(TypeError, f)
+        self.assertRaises(TypeError, f, None)
+        self.assertRaises(TypeError, f, 42)
+        self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
+        r = f('standard_conforming_strings')
+        self.assertEqual(r, 'on')
+        r = f('lc_monetary')
+        self.assertEqual(r, 'C')
+        r = f('datestyle')
+        self.assertEqual(r, 'ISO, YMD')
+        r = f('bytea_output')
+        self.assertEqual(r, 'hex')
+        r = f(('bytea_output', 'lc_monetary'))
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, ['hex', 'C'])
+        r = f(['standard_conforming_strings', 'datestyle', 'bytea_output'])
+        self.assertEqual(r, ['on', 'ISO, YMD', 'hex'])
+        s = dict.fromkeys(('bytea_output', 'lc_monetary'))
+        r = f(s)
+        self.assertIs(r, s)
+        self.assertEqual(r, {'bytea_output': 'hex', 'lc_monetary': 'C'})
+        s = dict.fromkeys(('Bytea_Output', 'LC_Monetary'))
+        r = f(s)
+        self.assertIs(r, s)
+        self.assertEqual(r, {'Bytea_Output': 'hex', 'LC_Monetary': 'C'})
+
+    def testGetParameterServerVersion(self):
+        r = self.db.get_parameter('server_version_num')
+        self.assertIsInstance(r, str)
+        s = self.db.server_version
+        self.assertIsInstance(s, int)
+        self.assertEqual(r, str(s))
+
+    def testGetParameterAll(self):
+        f = self.db.get_parameter
+        r = f('all')
+        self.assertIsInstance(r, dict)
+        self.assertEqual(r['standard_conforming_strings'], 'on')
+        self.assertEqual(r['lc_monetary'], 'C')
+        self.assertEqual(r['DateStyle'], 'ISO, YMD')
+        self.assertEqual(r['bytea_output'], 'hex')
+
+    def testSetParameter(self):
+        f = self.db.set_parameter
+        g = self.db.get_parameter
+        self.assertRaises(TypeError, f)
+        self.assertRaises(TypeError, f, None)
+        self.assertRaises(TypeError, f, 42)
+        self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
+        f('standard_conforming_strings', 'off')
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        f('datestyle', 'ISO, DMY')
+        self.assertEqual(g('datestyle'), 'ISO, DMY')
+        f(('standard_conforming_strings', 'datestyle'), ('on', 'ISO, YMD'))
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+        self.assertEqual(g('datestyle'), 'ISO, YMD')
+        f(['standard_conforming_strings', 'datestyle'], ['off', 'ISO, DMY'])
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        self.assertEqual(g('datestyle'), 'ISO, DMY')
+        f({'standard_conforming_strings': 'on', 'datestyle': 'ISO, YMD'})
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+        self.assertEqual(g('datestyle'), 'ISO, YMD')
+        f(('default_with_oids', 'standard_conforming_strings'), 'off')
+        self.assertEqual(g('default_with_oids'), 'off')
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        f(['default_with_oids', 'standard_conforming_strings'], 'on')
+        self.assertEqual(g('default_with_oids'), 'on')
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+
+    def testResetParameter(self):
+        db = DB()
+        f = db.set_parameter
+        g = db.get_parameter
+        r = g('default_with_oids')
+        self.assertIn(r, ('on', 'off'))
+        dwi, not_dwi = r, r == 'on' and 'off' or 'on'
+        r = g('standard_conforming_strings')
+        self.assertIn(r, ('on', 'off'))
+        scs, not_scs = r, r == 'on' and 'off' or 'on'
+        f('default_with_oids', not_dwi)
+        f('standard_conforming_strings', not_scs)
+        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('standard_conforming_strings'), not_scs)
+        f('default_with_oids')
+        f('standard_conforming_strings', None)
+        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('standard_conforming_strings'), scs)
+        f('default_with_oids', not_dwi)
+        f('standard_conforming_strings', not_scs)
+        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('standard_conforming_strings'), not_scs)
+        f(('default_with_oids', 'standard_conforming_strings'))
+        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('standard_conforming_strings'), scs)
+        f('default_with_oids', not_dwi)
+        f('standard_conforming_strings', not_scs)
+        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('standard_conforming_strings'), not_scs)
+        f(['default_with_oids', 'standard_conforming_strings'], None)
+        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('standard_conforming_strings'), scs)
+
+    def testResetParameterAll(self):
+        db = DB()
+        f = db.set_parameter
+        self.assertRaises(ValueError, f, 'all', 0)
+        self.assertRaises(ValueError, f, 'all', 'off')
+        g = db.get_parameter
+        r = g('default_with_oids')
+        self.assertIn(r, ('on', 'off'))
+        dwi, not_dwi = r, r == 'on' and 'off' or 'on'
+        r = g('standard_conforming_strings')
+        self.assertIn(r, ('on', 'off'))
+        scs, not_scs = r, r == 'on' and 'off' or 'on'
+        f('default_with_oids', not_dwi)
+        f('standard_conforming_strings', not_scs)
+        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('standard_conforming_strings'), not_scs)
+        f('all')
+        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('standard_conforming_strings'), scs)
+
+    def testSetParameterLocal(self):
+        f = self.db.set_parameter
+        g = self.db.get_parameter
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+        self.db.begin()
+        f('standard_conforming_strings', 'off', local=True)
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        self.db.end()
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+
+    def testSetParameterSession(self):
+        f = self.db.set_parameter
+        g = self.db.get_parameter
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+        self.db.begin()
+        f('standard_conforming_strings', 'off', local=False)
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        self.db.end()
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+
     def testQuery(self):
         query = self.db.query
         query("drop table if exists test_table")
@@ -529,9 +643,9 @@
         query("create table pkeytest2 ("
             "c smallint, d smallint primary key)")
         query("create table pkeytest3 ("
-            "e smallint, f smallint, g smallint, "
-            "h smallint, i smallint, "
-            "primary key (f,h))")
+            "e smallint, f smallint, g smallint,"
+            " h smallint, i smallint,"
+            " primary key (f,h))")
         pkey = self.db.pkey
         self.assertRaises(KeyError, pkey, 'pkeytest0')
         self.assertEqual(pkey('pkeytest1'), 'b')
@@ -609,7 +723,7 @@
         self.assertNotIn('public.test', result)
         self.assertNotIn('public.test_view', result)
 
-    def testAttnames(self):
+    def testGetAttnames(self):
         self.assertRaises(pg.ProgrammingError,
             self.db.get_attnames, 'does_not_exist')
         self.assertRaises(pg.ProgrammingError,
@@ -617,12 +731,12 @@
         for table in ('attnames_test_table', 'test table for attnames'):
             self.db.query('drop table if exists "%s"' % table)
             self.db.query('create table "%s" ('
-                'a smallint, b integer, c bigint, '
-                'e numeric, f float, f2 double precision, m money, '
-                'x smallint, y smallint, z smallint, '
-                'Normal_NaMe smallint, "Special Name" smallint, '
-                't text, u char(2), v varchar(2), '
-                'primary key (y, u)) with oids' % table)
+                ' a smallint, b integer, c bigint,'
+                ' e numeric, f float, f2 double precision, m money,'
+                ' x smallint, y smallint, z smallint,'
+                ' Normal_NaMe smallint, "Special Name" smallint,'
+                ' t text, u char(2), v varchar(2),'
+                ' primary key (y, u)) with oids' % table)
             attributes = self.db.get_attnames(table)
             result = {'a': 'int', 'c': 'int', 'b': 'int',
                 'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
@@ -747,13 +861,13 @@
             firstname="D'Arcy", nickname='Darcey', grade='A+'))
         try:
             get('test_students', "D' Arcy")
-        except pg.DatabaseError as error:
+        except pg.DatabaseError, error:
             self.assertEqual(str(error),
                 'No such record in public.test_students where firstname = '
                 "'D'' Arcy'")
         try:
             get('test_students', "Robert'); TRUNCATE TABLE test_students;--")
-        except pg.DatabaseError as error:
+        except pg.DatabaseError, error:
             self.assertEqual(str(error),
                 'No such record in public.test_students where firstname = '
                 "'Robert''); TRUNCATE TABLE test_students;--'")

Modified: branches/4.x/tests/test_tutorial.py
==============================================================================
--- branches/4.x/tests/test_tutorial.py Thu Jan 14 15:26:30 2016        (r744)
+++ branches/4.x/tests/test_tutorial.py Thu Jan 14 20:16:23 2016        (r745)
@@ -64,7 +64,7 @@
         more_fruits = 'cherimaya durian eggfruit fig grapefruit'.split()
         if namedtuple:
             data = list(enumerate(more_fruits, start=3))
-        else:  # Pyton < 2.6
+        else:  # Python < 2.6
             data = [(n + 3, name) for n, name in enumerate(more_fruits)]
         db.inserttable('fruits', data)
         q = db.query('select * from fruits')
@@ -87,12 +87,11 @@
         self.assertEqual(r[6], {'id': 7, 'name': 'grapefruit'})
         try:
             rows = r = q.namedresult()
-        except TypeError:  # Python < 2.6
-            self.assertIsNone(namedtuple)
-        else:
             self.assertIsInstance(r, list)
             self.assertIsInstance(r[0], tuple)
             self.assertEqual(rows[3].name, 'durian')
+        except (AttributeError, TypeError):  # Python < 2.6
+            self.assertIsNone(namedtuple)
         r = db.update('fruits', banana, name=banana['name'].capitalize())
         self.assertIsInstance(r, dict)
         self.assertEqual(r, {'id': 2, 'name': 'Banana'})

Modified: trunk/docs/contents/changelog.rst
==============================================================================
--- trunk/docs/contents/changelog.rst   Thu Jan 14 15:26:30 2016        (r744)
+++ trunk/docs/contents/changelog.rst   Thu Jan 14 20:16:23 2016        (r745)
@@ -42,10 +42,12 @@
 
 Version 4.2
 -----------
-- Set a better default for the user option "escaping-funcs".
 - The supported Python versions are 2.4 to 2.7.
 - PostgreSQL is supported in all versions from 8.3 to 9.5.
+- Set a better default for the user option "escaping-funcs".
 - Force build to compile with no errors.
+- New methods get_parameters() and set_parameters() in the classic interface
+  which can be used to get or set run-time parameters.
 - Fix decimal point handling.
 - Add option to return boolean values as bool objects.
 - Add option to return money values as string.

Modified: trunk/docs/contents/pg/db_wrapper.rst
==============================================================================
--- trunk/docs/contents/pg/db_wrapper.rst       Thu Jan 14 15:26:30 2016        
(r744)
+++ trunk/docs/contents/pg/db_wrapper.rst       Thu Jan 14 20:16:23 2016        
(r745)
@@ -120,8 +120,7 @@
     Get the attribute names of a table
 
     :param str table: name of table
-    :returns: A dictionary -- the keys are the attribute names,
-     the values are the type names of the attributes.
+    :returns: a dictionary mapping attribute names to type names
 
 Given the name of a table, digs out the set of attribute names.
 
@@ -135,7 +134,6 @@
 You can get the regular types after enabling this by calling the
 :meth:`DB.use_regtypes` method.
 
-
 has_table_privilege -- check table privilege
 --------------------------------------------
 
@@ -152,6 +150,67 @@
 
 .. versionadded:: 4.0
 
+get/set_parameter -- get or set  run-time parameters
+----------------------------------------------------
+
+.. method:: DB.get_parameter(parameter)
+
+    Get the value of run-time parameters
+
+    :param parameter: the run-time parameter(s) to get
+    :type param: str, tuple, list or dict
+    :returns: the current value(s) of the run-time parameter(s)
+    :rtype: str, list or dict
+    :raises TypeError: Invalid parameter type(s)
+    :raises ProgrammingError: Invalid parameter name(s)
+
+If the parameter is a string, the return value will also be a string
+that is the current setting of the run-time parameter with that name.
+
+You can get several parameters at once by passing a list or tuple of
+parameter names.  The return value will then be a corresponding list
+of parameter settings.  If you pass a dict as parameter instead, its
+values will be set to the parameter settings corresponding to its keys.
+
+By passing the special name `'all'` as the parameter, you can get a dict
+of all existing configuration parameters.
+
+.. versionadded:: 4.2
+
+.. method:: DB.set_parameter(self, parameter, [value], [local])
+
+    Set the value of run-time parameters
+
+    :param parameter: the run-time parameter(s) to set
+    :type param: string, tuple, list or dict
+    :param value: the value to set
+    :type param: str or None
+    :raises TypeError: Invalid parameter type(s)
+    :raises ValueError: Invalid value argument(s)
+    :raises ProgrammingError: Invalid parameter name(s) or values
+
+If the parameter and the value are strings, the run-time parameter
+will be set to that value.  If no value or *None* is passed as a value,
+then the run-time parameter will be restored to its default value.
+
+You can set several parameters at once by passing a list or tuple
+of parameter names, with a single value that all parameters should
+be set to or with a corresponding list or tuple of values.
+
+You can also pass a dict as parameters.  In this case, you should
+not pass a value, since the values will be taken from the dict.
+
+By passing the special name `'all'` as the parameter, you can reset
+all existing settable run-time parameters to their default values.
+
+If you set *local* to `True`, then the command takes effect for only the
+current transaction.  After :meth:`DB.commit` or :meth:`DB.rollback`,
+the session-level setting takes effect again.  Setting *local* to `True`
+will appear to have no effect if it is executed outside a transaction,
+since the transaction will end immediately.
+
+.. versionadded:: 4.2
+
 begin/commit/rollback/savepoint/release -- transaction handling
 ---------------------------------------------------------------
 

Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Thu Jan 14 15:26:30 2016        (r744)
+++ trunk/pg.py Thu Jan 14 20:16:23 2016        (r745)
@@ -462,6 +462,118 @@
         """Destroy a previously defined savepoint."""
         return self.query('RELEASE ' + name)
 
+    def get_parameter(self, parameter):
+        """Get the value of a run-time parameter.
+
+        If the parameter is a string, the return value will also be a string
+        that is the current setting of the run-time parameter with that name.
+
+        You can get several parameters at once by passing a list or tuple of
+        parameter names.  The return value will then be a corresponding list
+        of parameter settings.  If you pass a dict as parameter instead, its
+        values will be set to the parameter settings corresponding to its keys.
+
+        By passing the special name 'all' as the parameter, you can get a dict
+        of all existing configuration parameters.
+        """
+        if isinstance(parameter, basestring):
+            parameter = [parameter]
+            values = None
+        elif isinstance(parameter, (list, tuple)):
+            values = []
+        elif isinstance(parameter, dict):
+            values = parameter
+        else:
+            raise TypeError('The parameter must be a dict, list or string')
+        if not parameter:
+            raise TypeError('No parameter has been specified')
+        params = {} if isinstance(values, dict) else []
+        for key in parameter:
+            param = key.strip().lower() if isinstance(
+                key, basestring) else None
+            if not param:
+                raise TypeError('Invalid parameter')
+            if param == 'all':
+                q = 'SHOW ALL'
+                values = self.db.query(q).getresult()
+                values = dict(value[:2] for value in values)
+                break
+            if isinstance(values, dict):
+                params[param] = key
+            else:
+                params.append(param)
+        else:
+            for param in params:
+                q = 'SHOW %s' % (param,)
+                value = self.db.query(q).getresult()[0][0]
+                if values is None:
+                    values = value
+                elif isinstance(values, list):
+                    values.append(value)
+                else:
+                    values[params[param]] = value
+        return values
+
+    def set_parameter(self, parameter, value=None, local=False):
+        """Set the value of a run-time parameter.
+
+        If the parameter and the value are strings, the run-time parameter
+        will be set to that value.  If no value or None is passed as a value,
+        then the run-time parameter will be restored to its default value.
+
+        You can set several parameters at once by passing a list or tuple
+        of parameter names, with a single value that all parameters should
+        be set to or with a corresponding list or tuple of values.
+
+        You can also pass a dict as parameters.  In this case, you should
+        not pass a value, since the values will be taken from the dict.
+
+        By passing the special name 'all' as the parameter, you can reset
+        all existing settable run-time parameters to their default values.
+
+        If you set local to True, then the command takes effect for only the
+        current transaction.  After commit() or rollback(), the session-level
+        setting takes effect again.  Setting local to True will appear to
+        have no effect if it is executed outside a transaction, since the
+        transaction will end immediately.
+        """
+        if isinstance(parameter, basestring):
+            parameter = {parameter: value}
+        elif isinstance(parameter, (list, tuple)):
+            if isinstance(value, (list, tuple)):
+                parameter = dict(zip(parameter, value))
+            else:
+                parameter = dict.fromkeys(parameter, value)
+        elif isinstance(parameter, dict):
+            if value is not None:
+                raise ValueError(
+                    'A value must not be set when parameter is a dictionary')
+        else:
+            raise TypeError('The parameter must be a dict, list or string')
+        if not parameter:
+            raise TypeError('No parameter has been specified')
+        params = {}
+        for key, value in parameter.items():
+            param = key.strip().lower() if isinstance(
+                key, basestring) else None
+            if not param:
+                raise TypeError('Invalid parameter')
+            if param == 'all':
+                if value is not None:
+                    raise ValueError(
+                        "A value must ot be set when parameter is 'all'")
+                params = {'all': None}
+                break
+            params[param] = value
+        local = ' LOCAL' if local else ''
+        for param, value in params.items():
+            if value is None:
+                q = 'RESET%s %s' % (local, param)
+            else:
+                q = 'SET%s %s TO %s' % (local, param, value)
+            self._do_debug(q)
+            self.db.query(q)
+
     def query(self, qstr, *args):
         """Execute a SQL command string.
 

Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py       Thu Jan 14 15:26:30 2016        
(r744)
+++ trunk/tests/test_classic_dbwrapper.py       Thu Jan 14 20:16:23 2016        
(r745)
@@ -85,61 +85,31 @@
     def testAllDBAttributes(self):
         attributes = [
             'begin',
-            'cancel',
-            'clear',
-            'close',
-            'commit',
-            'db',
-            'dbname',
-            'debug',
-            'delete',
-            'end',
-            'endcopy',
-            'error',
-            'escape_bytea',
-            'escape_identifier',
-            'escape_literal',
-            'escape_string',
+            'cancel', 'clear', 'close', 'commit',
+            'db', 'dbname', 'debug', 'delete',
+            'end', 'endcopy', 'error',
+            'escape_bytea', 'escape_identifier',
+            'escape_literal', 'escape_string',
             'fileno',
-            'get',
-            'get_attnames',
-            'get_databases',
-            'get_notice_receiver',
-            'get_relations',
-            'get_tables',
-            'getline',
-            'getlo',
-            'getnotify',
-            'has_table_privilege',
-            'host',
-            'insert',
-            'inserttable',
-            'locreate',
-            'loimport',
+            'get', 'get_attnames', 'get_databases',
+            'get_notice_receiver', 'get_parameter',
+            'get_relations', 'get_tables',
+            'getline', 'getlo', 'getnotify',
+            'has_table_privilege', 'host',
+            'insert', 'inserttable',
+            'locreate', 'loimport',
             'notification_handler',
             'options',
-            'parameter',
-            'pkey',
-            'port',
-            'protocol_version',
-            'putline',
+            'parameter', 'pkey', 'port',
+            'protocol_version', 'putline',
             'query',
-            'release',
-            'reopen',
-            'reset',
-            'rollback',
-            'savepoint',
-            'server_version',
-            'set_notice_receiver',
-            'source',
-            'start',
-            'status',
+            'release', 'reopen', 'reset', 'rollback',
+            'savepoint', 'server_version',
+            'set_notice_receiver', 'set_parameter',
+            'source', 'start', 'status',
             'transaction',
-            'unescape_bytea',
-            'update',
-            'upsert',
-            'use_regtypes',
-            'user',
+            'unescape_bytea', 'update', 'upsert',
+            'use_regtypes', 'user',
         ]
         db_attributes = [a for a in dir(self.db)
             if not a.startswith('_')]
@@ -287,8 +257,8 @@
         db.query("drop table if exists test cascade")
         db.query("create table test ("
             "i2 smallint, i4 integer, i8 bigint,"
-            "d numeric, f4 real, f8 double precision, m money, "
-            "v4 varchar(4), c4 char(4), t text)")
+            " d numeric, f4 real, f8 double precision, m money,"
+            " v4 varchar(4), c4 char(4), t text)")
         db.query("create or replace view test_view as"
             " select i4, v4 from test")
         db.close()
@@ -412,90 +382,149 @@
             b'\\x746861742773206be47365')
         self.assertEqual(f(r'\\x4f007073ff21'), b'\\x4f007073ff21')
 
-    def testGetAttnames(self):
-        get_attnames = self.db.get_attnames
-        query = self.db.query
-        query("drop table if exists test_table")
-        self.addCleanup(query, "drop table test_table")
-        query("create table test_table("
-            " n int, alpha smallint, beta bool,"
-            " gamma char(5), tau text, v varchar(3))")
-        r = get_attnames("test_table")
-        self.assertIsInstance(r, dict)
-        self.assertEquals(r, dict(
-            n='int', alpha='int', beta='bool',
-            gamma='text', tau='text', v='text'))
+    def testGetParameter(self):
+        f = self.db.get_parameter
+        self.assertRaises(TypeError, f)
+        self.assertRaises(TypeError, f, None)
+        self.assertRaises(TypeError, f, 42)
+        self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
+        r = f('standard_conforming_strings')
+        self.assertEqual(r, 'on')
+        r = f('lc_monetary')
+        self.assertEqual(r, 'C')
+        r = f('datestyle')
+        self.assertEqual(r, 'ISO, YMD')
+        r = f('bytea_output')
+        self.assertEqual(r, 'hex')
+        r = f(('bytea_output', 'lc_monetary'))
+        self.assertIsInstance(r, list)
+        self.assertEqual(r, ['hex', 'C'])
+        r = f(['standard_conforming_strings', 'datestyle', 'bytea_output'])
+        self.assertEqual(r, ['on', 'ISO, YMD', 'hex'])
+        s = dict.fromkeys(('bytea_output', 'lc_monetary'))
+        r = f(s)
+        self.assertIs(r, s)
+        self.assertEqual(r, {'bytea_output': 'hex', 'lc_monetary': 'C'})
+        s = dict.fromkeys(('Bytea_Output', 'LC_Monetary'))
+        r = f(s)
+        self.assertIs(r, s)
+        self.assertEqual(r, {'Bytea_Output': 'hex', 'LC_Monetary': 'C'})
 
-    def testGetAttnamesWithQuotes(self):
-        get_attnames = self.db.get_attnames
-        query = self.db.query
-        table = 'test table for get_attnames()'
-        query('drop table if exists "%s"' % table)
-        self.addCleanup(query, 'drop table "%s"' % table)
-        query('create table "%s"('
-            '"Prime!" smallint,'
-            '"much space" integer, "Questions?" text)' % table)
-        r = get_attnames(table)
+    def testGetParameterServerVersion(self):
+        r = self.db.get_parameter('server_version_num')
+        self.assertIsInstance(r, str)
+        s = self.db.server_version
+        self.assertIsInstance(s, int)
+        self.assertEqual(r, str(s))
+
+    def testGetParameterAll(self):
+        f = self.db.get_parameter
+        r = f('all')
         self.assertIsInstance(r, dict)
-        self.assertEquals(r, {
-            'Prime!': 'int', 'much space': 'int', 'Questions?': 'text'})
+        self.assertEqual(r['standard_conforming_strings'], 'on')
+        self.assertEqual(r['lc_monetary'], 'C')
+        self.assertEqual(r['DateStyle'], 'ISO, YMD')
+        self.assertEqual(r['bytea_output'], 'hex')
+
+    def testSetParameter(self):
+        f = self.db.set_parameter
+        g = self.db.get_parameter
+        self.assertRaises(TypeError, f)
+        self.assertRaises(TypeError, f, None)
+        self.assertRaises(TypeError, f, 42)
+        self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
+        f('standard_conforming_strings', 'off')
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        f('datestyle', 'ISO, DMY')
+        self.assertEqual(g('datestyle'), 'ISO, DMY')
+        f(('standard_conforming_strings', 'datestyle'), ('on', 'ISO, YMD'))
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+        self.assertEqual(g('datestyle'), 'ISO, YMD')
+        f(['standard_conforming_strings', 'datestyle'], ['off', 'ISO, DMY'])
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        self.assertEqual(g('datestyle'), 'ISO, DMY')
+        f({'standard_conforming_strings': 'on', 'datestyle': 'ISO, YMD'})
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+        self.assertEqual(g('datestyle'), 'ISO, YMD')
+        f(('default_with_oids', 'standard_conforming_strings'), 'off')
+        self.assertEqual(g('default_with_oids'), 'off')
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        f(['default_with_oids', 'standard_conforming_strings'], 'on')
+        self.assertEqual(g('default_with_oids'), 'on')
+        self.assertEqual(g('standard_conforming_strings'), 'on')
 
-    def testGetAttnamesWithRegtypes(self):
-        get_attnames = self.db.get_attnames
-        query = self.db.query
-        query("drop table if exists test_table")
-        self.addCleanup(query, "drop table test_table")
-        query("create table test_table("
-            " n int, alpha smallint, beta bool,"
-            " gamma char(5), tau text, v varchar(3))")
-        self.db.use_regtypes(True)
-        try:
-            r = get_attnames("test_table")
-            self.assertIsInstance(r, dict)
-        finally:
-            self.db.use_regtypes(False)
-        self.assertEquals(r, dict(
-            n='integer', alpha='smallint', beta='boolean',
-            gamma='character', tau='text', v='character varying'))
+    def testResetParameter(self):
+        db = DB()
+        f = db.set_parameter
+        g = db.get_parameter
+        r = g('default_with_oids')
+        self.assertIn(r, ('on', 'off'))
+        dwi, not_dwi = r, 'off' if r == 'on' else 'on'
+        r = g('standard_conforming_strings')
+        self.assertIn(r, ('on', 'off'))
+        scs, not_scs = r, 'off' if r == 'on' else 'on'
+        f('default_with_oids', not_dwi)
+        f('standard_conforming_strings', not_scs)
+        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('standard_conforming_strings'), not_scs)
+        f('default_with_oids')
+        f('standard_conforming_strings', None)
+        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('standard_conforming_strings'), scs)
+        f('default_with_oids', not_dwi)
+        f('standard_conforming_strings', not_scs)
+        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('standard_conforming_strings'), not_scs)
+        f(('default_with_oids', 'standard_conforming_strings'))
+        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('standard_conforming_strings'), scs)
+        f('default_with_oids', not_dwi)
+        f('standard_conforming_strings', not_scs)
+        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('standard_conforming_strings'), not_scs)
+        f(['default_with_oids', 'standard_conforming_strings'], None)
+        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('standard_conforming_strings'), scs)
 
-    def testGetAttnamesIsCached(self):
-        get_attnames = self.db.get_attnames
-        query = self.db.query
-        query("drop table if exists test_table")
-        self.addCleanup(query, "drop table if exists test_table")
-        query("create table test_table(col int)")
-        r = get_attnames("test_table")
-        self.assertIsInstance(r, dict)
-        self.assertEquals(r, dict(col='int'))
-        query("drop table test_table")
-        query("create table test_table(col text)")
-        r = get_attnames("test_table")
-        self.assertEquals(r, dict(col='int'))
-        r = get_attnames("test_table", flush=True)
-        self.assertEquals(r, dict(col='text'))
-        query("drop table test_table")
-        r = get_attnames("test_table")
-        self.assertEquals(r, dict(col='text'))
-        self.assertRaises(pg.ProgrammingError,
-            get_attnames, "test_table", flush=True)
+    def testResetParameterAll(self):
+        db = DB()
+        f = db.set_parameter
+        self.assertRaises(ValueError, f, 'all', 0)
+        self.assertRaises(ValueError, f, 'all', 'off')
+        g = db.get_parameter
+        r = g('default_with_oids')
+        self.assertIn(r, ('on', 'off'))
+        dwi, not_dwi = r, 'off' if r == 'on' else 'on'
+        r = g('standard_conforming_strings')
+        self.assertIn(r, ('on', 'off'))
+        scs, not_scs = r, 'off' if r == 'on' else 'on'
+        f('default_with_oids', not_dwi)
+        f('standard_conforming_strings', not_scs)
+        self.assertEqual(g('default_with_oids'), not_dwi)
+        self.assertEqual(g('standard_conforming_strings'), not_scs)
+        f('all')
+        self.assertEqual(g('default_with_oids'), dwi)
+        self.assertEqual(g('standard_conforming_strings'), scs)
+
+    def testSetParameterLocal(self):
+        f = self.db.set_parameter
+        g = self.db.get_parameter
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+        self.db.begin()
+        f('standard_conforming_strings', 'off', local=True)
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        self.db.end()
+        self.assertEqual(g('standard_conforming_strings'), 'on')
 
-    def testGetAttnamesIsOrdered(self):
-        get_attnames = self.db.get_attnames
-        query = self.db.query
-        query("drop table if exists test_table")
-        self.addCleanup(query, "drop table test_table")
-        query("create table test_table("
-            " n int, alpha smallint, v varchar(3),"
-            " gamma char(5), tau text, beta bool)")
-        r = get_attnames("test_table")
-        self.assertIsInstance(r, OrderedDict)
-        self.assertEquals(r, OrderedDict([
-            ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
-            ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
-        if OrderedDict is dict:
-            self.skipTest('OrderedDict is not supported')
-        r = ' '.join(list(r.keys()))
-        self.assertEquals(r, 'n alpha v gamma tau beta')
+    def testSetParameterSession(self):
+        f = self.db.set_parameter
+        g = self.db.get_parameter
+        self.assertEqual(g('standard_conforming_strings'), 'on')
+        self.db.begin()
+        f('standard_conforming_strings', 'off', local=False)
+        self.assertEqual(g('standard_conforming_strings'), 'off')
+        self.db.end()
+        self.assertEqual(g('standard_conforming_strings'), 'off')
 
     def testQuery(self):
         query = self.db.query
@@ -588,19 +617,19 @@
             query('create table "%s2" ('
                 "c smallint, d smallint primary key)" % t)
             query('create table "%s3" ('
-                "e smallint, f smallint, g smallint, "
-                "h smallint, i smallint, "
-                "primary key (f, h))" % t)
+                "e smallint, f smallint, g smallint,"
+                " h smallint, i smallint,"
+                " primary key (f, h))" % t)
             query('create table "%s4" ('
                 "more_than_one_letter varchar primary key)" % t)
             query('create table "%s5" ('
                 '"with space" date primary key)' % t)
             query('create table "%s6" ('
-                'a_very_long_column_name varchar, '
-                '"with space" date, '
-                '"42" int, '
-                "primary key (a_very_long_column_name, "
-                '"with space", "42"))' % t)
+                'a_very_long_column_name varchar,'
+                ' "with space" date,'
+                ' "42" int,'
+                " primary key (a_very_long_column_name,"
+                ' "with space", "42"))' % t)
             self.assertRaises(KeyError, pkey, '%s0' % t)
             self.assertEqual(pkey('%s1' % t), 'b')
             self.assertEqual(pkey('%s2' % t), 'd')
@@ -686,28 +715,111 @@
         self.assertNotIn('public.test', result)
         self.assertNotIn('public.test_view', result)
 
-    def testAttnames(self):
+    def testGetAttnames(self):
+        get_attnames = self.db.get_attnames
         self.assertRaises(pg.ProgrammingError,
             self.db.get_attnames, 'does_not_exist')
         self.assertRaises(pg.ProgrammingError,
             self.db.get_attnames, 'has.too.many.dots')
-        for table in ('attnames_test_table', 'test table for attnames'):
-            self.db.query('drop table if exists "%s"' % table)
-            self.addCleanup(self.db.query, 'drop table "%s"' % table)
-            self.db.query('create table "%s" ('
-                'a smallint, b integer, c bigint, '
-                'e numeric, f float, f2 double precision, m money, '
-                'x smallint, y smallint, z smallint, '
-                'Normal_NaMe smallint, "Special Name" smallint, '
-                't text, u char(2), v varchar(2), '
-                'primary key (y, u)) with oids' % table)
-            attributes = self.db.get_attnames(table)
-            result = {'a': 'int', 'c': 'int', 'b': 'int',
-                'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
-                'normal_name': 'int', 'Special Name': 'int',
-                'u': 'text', 't': 'text', 'v': 'text',
-                'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'}
-            self.assertEqual(attributes, result)
+        query = self.db.query
+        query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
+        query("create table test_table("
+            " n int, alpha smallint, beta bool,"
+            " gamma char(5), tau text, v varchar(3))")
+        r = get_attnames('test_table')
+        self.assertIsInstance(r, dict)
+        self.assertEqual(r, dict(
+            n='int', alpha='int', beta='bool',
+            gamma='text', tau='text', v='text'))
+
+    def testGetAttnamesWithQuotes(self):
+        get_attnames = self.db.get_attnames
+        query = self.db.query
+        table = 'test table for get_attnames()'
+        query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
+        query('create table "%s"('
+            '"Prime!" smallint,'
+            ' "much space" integer, "Questions?" text)' % table)
+        r = get_attnames(table)
+        self.assertIsInstance(r, dict)
+        self.assertEqual(r, {
+            'Prime!': 'int', 'much space': 'int', 'Questions?': 'text'})
+        table = 'yet another test table for get_attnames()'
+        query('drop table if exists "%s"' % table)
+        self.addCleanup(query, 'drop table "%s"' % table)
+        self.db.query('create table "%s" ('
+            'a smallint, b integer, c bigint,'
+            ' e numeric, f float, f2 double precision, m money,'
+            ' x smallint, y smallint, z smallint,'
+            ' Normal_NaMe smallint, "Special Name" smallint,'
+            ' t text, u char(2), v varchar(2),'
+            ' primary key (y, u)) with oids' % table)
+        r = get_attnames(table)
+        self.assertIsInstance(r, dict)
+        self.assertEqual(r, {'a': 'int', 'c': 'int', 'b': 'int',
+            'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
+            'normal_name': 'int', 'Special Name': 'int',
+            'u': 'text', 't': 'text', 'v': 'text',
+            'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
+
+    def testGetAttnamesWithRegtypes(self):
+        get_attnames = self.db.get_attnames
+        query = self.db.query
+        query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
+        query("create table test_table("
+            " n int, alpha smallint, beta bool,"
+            " gamma char(5), tau text, v varchar(3))")
+        self.db.use_regtypes(True)
+        try:
+            r = get_attnames("test_table")
+            self.assertIsInstance(r, dict)
+        finally:
+            self.db.use_regtypes(False)
+        self.assertEqual(r, dict(
+            n='integer', alpha='smallint', beta='boolean',
+            gamma='character', tau='text', v='character varying'))
+
+    def testGetAttnamesIsCached(self):
+        get_attnames = self.db.get_attnames
+        query = self.db.query
+        query("drop table if exists test_table")
+        self.addCleanup(query, "drop table if exists test_table")
+        query("create table test_table(col int)")
+        r = get_attnames("test_table")
+        self.assertIsInstance(r, dict)
+        self.assertEqual(r, dict(col='int'))
+        query("drop table test_table")
+        query("create table test_table(col text)")
+        r = get_attnames("test_table")
+        self.assertEqual(r, dict(col='int'))
+        r = get_attnames("test_table", flush=True)
+        self.assertEqual(r, dict(col='text'))
+        query("drop table test_table")
+        r = get_attnames("test_table")
+        self.assertEqual(r, dict(col='text'))
+        self.assertRaises(pg.ProgrammingError,
+            get_attnames, "test_table", flush=True)
+
+    def testGetAttnamesIsOrdered(self):
+        get_attnames = self.db.get_attnames
+        query = self.db.query
+        query("drop table if exists test_table")
+        self.addCleanup(query, "drop table test_table")
+        query("create table test_table("
+            " n int, alpha smallint, v varchar(3),"
+            " gamma char(5), tau text, beta bool)")
+        r = get_attnames("test_table")
+        self.assertIsInstance(r, OrderedDict)
+        self.assertEqual(r, OrderedDict([
+            ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
+            ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
+        if OrderedDict is dict:
+            self.skipTest('OrderedDict is not supported')
+        r = ' '.join(list(r.keys()))
+        self.assertEqual(r, 'n alpha v gamma tau beta')
 
     def testHasTablePrivilege(self):
         can = self.db.has_table_privilege
@@ -800,7 +912,7 @@
         self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
-            '"much space" integer, "Questions?" text)' % table)
+            ' "much space" integer, "Questions?" text)' % table)
         query('insert into "%s"'
               " values(17, 1001, 'No!')" % table)
         r = get(table, 17)
@@ -968,7 +1080,7 @@
         self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
-            '"much space" integer, "Questions?" text)' % table)
+            ' "much space" integer, "Questions?" text)' % table)
         r = {'Prime!': 11, 'much space': 2002, 'Questions?': 'What?'}
         r = insert(table, r)
         self.assertIsInstance(r, dict)
@@ -1059,7 +1171,7 @@
         self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
-            '"much space" integer, "Questions?" text)' % table)
+            ' "much space" integer, "Questions?" text)' % table)
         query('insert into "%s"'
               " values(13, 3003, 'Why!')" % table)
         r = {'Prime!': 13, 'much space': 7007, 'Questions?': 'When?'}
@@ -1221,7 +1333,7 @@
         self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
-            '"much space" integer, "Questions?" text)' % table)
+            ' "much space" integer, "Questions?" text)' % table)
         s = {'Prime!': 31, 'much space': 9009, 'Questions?': 'Yes.'}
         try:
             r = upsert(table, s)
@@ -1274,7 +1386,7 @@
         self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
-            '"much space" integer, "Questions?" text)' % table)
+            ' "much space" integer, "Questions?" text)' % table)
         r = clear(table)
         self.assertIsInstance(r, dict)
         self.assertEqual(r['Prime!'], 0)
@@ -1366,7 +1478,7 @@
         self.addCleanup(query, 'drop table "%s"' % table)
         query('create table "%s" ('
             '"Prime!" smallint primary key,'
-            '"much space" integer, "Questions?" text)' % table)
+            ' "much space" integer, "Questions?" text)' % table)
         query('insert into "%s"'
               " values(19, 5005, 'Yes!')" % table)
         r = {'Prime!': 17}
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to