Author: cito
Date: Tue Jan 12 16:29:07 2016
New Revision: 729

Log:
Simplify caching and handling of class names

The caches now use the class names as keys as they are passed in.
We do not automatically calculate the qualified name any more,
since this causes too much overhead. Also, we fill the pkey cache
not pro-actively with tables from all possible schemes any more.

Most of the internal auxiliary functions for handling class names
could be discarded by making good use of quote_ident and reglass.

Modified:
   trunk/docs/contents/changelog.rst
   trunk/docs/contents/pg/db_wrapper.rst
   trunk/pg.py
   trunk/tests/test_classic.py
   trunk/tests/test_classic_dbwrapper.py
   trunk/tests/test_classic_functions.py

Modified: trunk/docs/contents/changelog.rst
==============================================================================
--- trunk/docs/contents/changelog.rst   Tue Jan 12 16:21:46 2016        (r728)
+++ trunk/docs/contents/changelog.rst   Tue Jan 12 16:29:07 2016        (r729)
@@ -27,6 +27,17 @@
   colnames and coltypes attributes, which are not part of DB-API 2 though.
 - The tty parameter and attribute of database connections has been
   removed since it is not supported any more since PostgreSQL 7.4.
+- The table name that is affixed to the name of the OID column returned
+  by the get() method of the classic interface will not automatically
+  be fully qualified any more. This reduces overhead from the interface,
+  but it means you must always write the table name in the same way when
+  you call the methods using it and you are using tables with OIDs.
+  Note that OIDs are considered deprecated anyway, and they are not created
+  by default any more in PostgreSQL 8.1 and later.
+- Simplified the internal caching and mechanisms for automatic quoting
+  of class names in the classic interface, these things should now both
+  perform better and use less memory.
+
 
 Version 4.2
 -----------

Modified: trunk/docs/contents/pg/db_wrapper.rst
==============================================================================
--- trunk/docs/contents/pg/db_wrapper.rst       Tue Jan 12 16:21:46 2016        
(r728)
+++ trunk/docs/contents/pg/db_wrapper.rst       Tue Jan 12 16:29:07 2016        
(r729)
@@ -63,10 +63,11 @@
     :param str table: name of table
     :returns: Name of the field which is the primary key of the table
     :rtype: str
+    :raises KeyError: the table does not have a primary key
 
 This method returns the primary key of a table. For composite primary
-keys, the return value will be a frozenset. Note that this raises an
-exception if the table does not have a primary key.
+keys, the return value will be a frozenset. Note that this raises a
+KeyError if the table does not have a primary key.
 
 get_databases -- get list of databases in the system
 ----------------------------------------------------
@@ -161,7 +162,7 @@
 For a composite key, *keyname* can also be a sequence of key names.
 The OID is also put into the dictionary if the table has one, but in
 order to allow the caller to work with multiple tables, it is munged
-as ``oid(schema.table)``.
+as ``oid(table)``.
 
 insert -- insert a row into a database table
 --------------------------------------------

Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Tue Jan 12 16:21:46 2016        (r728)
+++ trunk/pg.py Tue Jan 12 16:29:07 2016        (r729)
@@ -49,68 +49,54 @@
 
 # Auxiliary functions that are independent from a DB connection:
 
-def _is_quoted(s):
-    """Check whether this string is a quoted identifier."""
-    s = s.replace('_', 'a')
-    return not s.isalnum() or s[:1].isdigit() or s != s.lower()
-
-
-def _is_unquoted(s):
-    """Check whether this string is an unquoted identifier."""
-    s = s.replace('_', 'a')
-    return s.isalnum() and not s[:1].isdigit()
-
-
-def _split_first_part(s):
-    """Split the first part of a dot separated string."""
-    s = s.lstrip()
-    if s[:1] == '"':
-        p = []
-        s = s.split('"', 3)[1:]
-        p.append(s[0])
-        while len(s) == 3 and s[1] == '':
-            p.append('"')
-            s = s[2].split('"', 2)
-            p.append(s[0])
-        p = [''.join(p)]
-        s = '"'.join(s[1:]).lstrip()
-        if s:
-            if s[:0] == '.':
-                p.append(s[1:])
-            else:
-                s = _split_first_part(s)
-                p[0] += s[0]
-                if len(s) > 1:
-                    p.append(s[1])
-    else:
-        p = s.split('.', 1)
-        s = p[0].rstrip()
-        if _is_unquoted(s):
-            s = s.lower()
-        p[0] = s
-    return p
-
-
-def _split_parts(s):
-    """Split all parts of a dot separated string."""
-    q = []
-    while s:
-        s = _split_first_part(s)
-        q.append(s[0])
-        if len(s) < 2:
-            break
-        s = s[1]
-    return q
-
-
-def _join_parts(s):
-    """Join all parts of a dot separated string."""
-    return '.'.join(['"%s"' % p if _is_quoted(p) else p for p in s])
+def _quote_class_name(cl):
+    """Quote a class name.
+
+    Class names are always quoted unless they contain a dot.
+    In this ambiguous case quotes must be added manually.
+
+    """
+    if '.' not in cl:
+        cl = '"%s"' % cl
+    return cl
+
+
+def _quote_class_param(cl, param):
+    """Quote parameter representing a class name.
 
+    The parameter is automatically quoted unless the class name contains a dot.
+    In this ambiguous case quotes must be added manually.
 
-def _oid_key(qcl):
+    """
+    if isinstance(param, int):
+        param = "$%d" % param
+    if '.' not in cl:
+        param = 'quote_ident(%s)' % (param,)
+    return param
+
+
+def _oid_key(cl):
     """Build oid key from qualified class name."""
-    return 'oid(%s)' % qcl
+    return 'oid(%s)' % cl
+
+
+def _simpletype(typ):
+    """Determine a simplified name a pg_type name."""
+    if typ.startswith('bool'):
+        return 'bool'
+    if typ.startswith(('abstime', 'date', 'interval', 'timestamp')):
+        return 'date'
+    if typ.startswith(('cid', 'oid', 'int', 'xid')):
+        return 'int'
+    if typ.startswith('float'):
+        return 'float'
+    if typ.startswith('numeric'):
+        return 'num'
+    if typ.startswith('money'):
+        return 'money'
+    if typ.startswith('bytea'):
+        return 'bytea'
+    return 'text'
 
 
 def _namedresult(q):
@@ -413,50 +399,6 @@
             quote_func = self._quote_funcs['text']
         return quote_func(self, d)
 
-    def _split_schema(self, cl):
-        """Return schema and name of object separately.
-
-        This auxiliary function splits off the namespace (schema)
-        belonging to the class with the name cl. If the class name
-        is not qualified, the function is able to determine the schema
-        of the class, taking into account the current search path.
-
-        """
-        s = _split_parts(cl)
-        if len(s) > 1:  # name already qualified?
-            # should be database.schema.table or schema.table
-            if len(s) > 3:
-                raise _prg_error('Too many dots in class name %s' % cl)
-            schema, cl = s[-2:]
-        else:
-            cl = s[0]
-            # determine search path
-            q = 'SELECT current_schemas(TRUE)'
-            schemas = self.db.query(q).getresult()[0][0][1:-1].split(',')
-            if schemas:  # non-empty path
-                # search schema for this object in the current search path
-                # (we could also use unnest with ordinality here to spare
-                # one query, but this is only possible since PostgreSQL 9.4)
-                q = ' UNION '.join(
-                    ["SELECT %d::integer AS n, '%s'::name AS nspname"
-                        % s for s in enumerate(schemas)])
-                q = ("SELECT nspname FROM pg_class r"
-                    " JOIN pg_namespace s ON r.relnamespace = s.oid"
-                    " JOIN (%s) AS p USING (nspname)"
-                    " WHERE r.relname = $1 ORDER BY n LIMIT 1" % q)
-                schema = self.db.query(q, (cl,)).getresult()
-                if schema:  # schema found
-                    schema = schema[0][0]
-                else:  # object not found in current search path
-                    schema = 'public'
-            else:  # empty path
-                schema = 'public'
-        return schema, cl
-
-    def _add_schema(self, cl):
-        """Ensure that the class name is prefixed with a schema name."""
-        return _join_parts(self._split_schema(cl))
-
     # Public methods
 
     # escape_string and escape_bytea exist as methods,
@@ -558,56 +500,39 @@
         self._do_debug(qstr)
         return self.db.query(qstr, args)
 
-    def pkey(self, cl, newpkey=None):
+    def pkey(self, cl, flush=False):
         """This method gets or sets the primary key of a class.
 
         Composite primary keys are represented as frozensets. Note that
-        this raises an exception if the table does not have a primary key.
+        this raises a KeyError if the table does not have a primary key.
 
-        If newpkey is set and is not a dictionary then set that
-        value as the primary key of the class.  If it is a dictionary
-        then replace the internal cache for primary keys with a copy of it.
-
-        """
-        add_schema = self._add_schema
-
-        # First see if the caller is supplying a dictionary
-        if isinstance(newpkey, dict):
-            # make sure that all classes have a namespace
-            self._pkeys = dict((add_schema(cl), pkey)
-                for cl, pkey in newpkey.items())
-            return self._pkeys
-
-        qcl = add_schema(cl)  # build fully qualified class name
-        # Check if the caller is supplying a new primary key for the class
-        if newpkey:
-            self._pkeys[qcl] = newpkey
-            return newpkey
-
-        # Get all the primary keys at once
-        if qcl not in self._pkeys:
-            # if not found, check again in case it was added after we started
-            q = ("SELECT s.nspname, r.relname, a.attname"
-                " FROM pg_class r"
-                " JOIN pg_namespace s ON s.oid = r.relnamespace"
-                " AND s.nspname NOT SIMILAR"
-                " TO 'pg/_%|information/_schema' ESCAPE '/'"
-                " JOIN pg_attribute a ON a.attrelid = r.oid"
+        If flush is set then the internal cache for primary keys will
+        be flushed. This may be necessary after the database schema or
+        the search path has been changed.
+
+        """
+        pkeys = self._pkeys
+        if flush:
+            pkeys.clear()
+            self._do_debug('pkey cache has been flushed')
+        try:  # cache lookup
+            pkey = pkeys[cl]
+        except KeyError:  # cache miss, check the database
+            q = ("SELECT a.attname FROM pg_index i"
+                " JOIN pg_attribute a ON a.attrelid = i.indrelid"
+                " AND a.attnum = ANY(i.indkey)"
                 " AND NOT a.attisdropped"
-                " JOIN pg_index i ON i.indrelid = r.oid"
-                " AND i.indisprimary AND a.attnum = ANY (i.indkey)"
-                " ORDER BY 1,2")
-            rows = self.db.query(q).getresult()
-            pkeys = {}
-            for cl, group in groupby(rows, lambda row: row[:2]):
-                cl = _join_parts(cl)
-                pkey = [row[2] for row in group]
-                pkeys[cl] = frozenset(pkey) if len(pkey) > 1 else pkey[0]
-            self._do_debug(pkeys)
-            self._pkeys = pkeys
-
-        # will raise an exception if primary key doesn't exist
-        return self._pkeys[qcl]
+                " WHERE i.indrelid=%s::regclass"
+                " AND i.indisprimary" % _quote_class_param(cl, 1))
+            pkey = self.db.query(q, (cl,)).getresult()
+            if not pkey:
+                raise KeyError('Class %s has no primary key' % cl)
+            if len(pkey) > 1:
+                pkey = frozenset([k[0] for k in pkey])
+            else:
+                pkey = pkey[0][0]
+            pkeys[cl] = pkey  # cache it
+        return pkey
 
     def get_databases(self):
         """Get list of databases in the system."""
@@ -624,23 +549,24 @@
         """
         where = " AND r.relkind IN (%s)" % ','.join(
             ["'%s'" % k for k in kinds]) if kinds else ''
-        q = ("SELECT s.nspname, r.relname"
+        q = ("SELECT quote_ident(s.nspname)||'.'||quote_ident(r.relname)"
             " FROM pg_class r"
             " JOIN pg_namespace s ON s.oid = r.relnamespace"
             " WHERE s.nspname NOT SIMILAR"
             " TO 'pg/_%%|information/_schema' ESCAPE '/' %s"
-            " ORDER BY 1, 2") % where
-        return [_join_parts(r) for r in self.db.query(q).getresult()]
+            " ORDER BY s.nspname, r.relname") % where
+        return [r[0] for r in self.db.query(q).getresult()]
 
     def get_tables(self):
         """Return list of tables in connected database."""
         return self.get_relations('r')
 
-    def get_attnames(self, cl, newattnames=None):
+    def get_attnames(self, cl, flush=False):
         """Given the name of a table, digs out the set of attribute names.
 
         Returns a dictionary of attribute names (the names are the keys,
         the values are the names of the attributes' types).
+
         If the optional newattnames exists, it must be a dictionary and
         will become the new attribute names dictionary.
 
@@ -648,63 +574,31 @@
         You can get the regular types after calling use_regtypes(True).
 
         """
-        if isinstance(newattnames, dict):
-            self._attnames = newattnames
-            return
-        elif newattnames:
-            raise _prg_error('If supplied, newattnames must be a dictionary')
-        cl = self._split_schema(cl)  # split into schema and class
-        qcl = _join_parts(cl)  # build fully qualified name
-        # May as well cache them:
-        if qcl in self._attnames:
-            return self._attnames[qcl]
-        if qcl not in self.get_relations('rv'):
-            raise _prg_error('Class %s does not exist' % qcl)
-
-        q = ("SELECT a.attname, t.typname%s"
-            " FROM pg_class r"
-            " JOIN pg_namespace s ON r.relnamespace = s.oid"
-            " JOIN pg_attribute a ON a.attrelid = r.oid"
-            " JOIN pg_type t ON t.oid = a.atttypid"
-            " WHERE s.nspname = $1 AND r.relname = $2"
-            " AND (a.attnum > 0 OR a.attname = 'oid')"
-            " AND NOT a.attisdropped") % (
-                '::regtype' if self._regtypes else '',)
-        q = self.db.query(q, cl).getresult()
-
-        if self._regtypes:
-            t = dict(q)
-        else:
-            t = {}
-            for att, typ in q:
-                if typ.startswith('bool'):
-                    typ = 'bool'
-                elif typ.startswith('abstime'):
-                    typ = 'date'
-                elif typ.startswith('date'):
-                    typ = 'date'
-                elif typ.startswith('interval'):
-                    typ = 'date'
-                elif typ.startswith('timestamp'):
-                    typ = 'date'
-                elif typ.startswith('oid'):
-                    typ = 'int'
-                elif typ.startswith('int'):
-                    typ = 'int'
-                elif typ.startswith('float'):
-                    typ = 'float'
-                elif typ.startswith('numeric'):
-                    typ = 'num'
-                elif typ.startswith('money'):
-                    typ = 'money'
-                elif typ.startswith('bytea'):
-                    typ = 'bytea'
-                else:
-                    typ = 'text'
-                t[att] = typ
-
-        self._attnames[qcl] = t  # cache it
-        return self._attnames[qcl]
+        attnames = self._attnames
+        if flush:
+            attnames.clear()
+            self._do_debug('pkey cache has been flushed')
+
+        try:  # cache lookup
+            names = attnames[cl]
+        except KeyError:  # cache miss, check the database
+            q = ("SELECT a.attname, t.typname%s"
+                " FROM pg_attribute a"
+                " JOIN pg_type t ON t.oid = a.atttypid"
+                " WHERE a.attrelid = %s::regclass"
+                " AND (a.attnum > 0 OR a.attname = 'oid')"
+                " AND NOT a.attisdropped") % (
+                    '::regtype' if self._regtypes else '',
+                    _quote_class_param(cl, 1))
+            names = self.db.query(q, (cl,)).getresult()
+            if not names:
+                raise KeyError('Class %s does not exist' % cl)
+            if self._regtypes:
+                names = dict(names)
+            else:
+                names = dict((name, _simpletype(typ)) for name, typ in names)
+            attnames[cl] = names  # cache it
+        return names
 
     def use_regtypes(self, regtypes=None):
         """Use regular type names instead of simplified type names."""
@@ -719,15 +613,15 @@
 
     def has_table_privilege(self, cl, privilege='select'):
         """Check whether current user has specified table privilege."""
-        qcl = self._add_schema(cl)
         privilege = privilege.lower()
-        try:
-            return self._privileges[(qcl, privilege)]
-        except KeyError:
-            q = "SELECT has_table_privilege($1, $2)"
-            q = self.db.query(q, (qcl, privilege))
+        try:  # ask cache
+            return self._privileges[(cl, privilege)]
+        except KeyError:  # cache miss, ask the database
+            q = "SELECT has_table_privilege(%s, $2)" % (
+                _quote_class_param(cl, 1),)
+            q = self.db.query(q, (cl, privilege))
             ret = q.getresult()[0][0] == self._make_bool(True)
-            self._privileges[(qcl, privilege)] = ret
+            self._privileges[(cl, privilege)] = ret  # cache it
             return ret
 
     def get(self, cl, arg, keyname=None):
@@ -741,23 +635,22 @@
         For a composite key, keyname can also be a sequence of key names.
         The OID is also put into the dictionary if the table has one, but
         in order to allow the caller to work with multiple tables, it is
-        munged as oid(schema.table).
+        munged as "oid(cl)".
 
         """
         if cl.endswith('*'):  # scan descendant tables?
             cl = cl[:-1].rstrip()  # need parent table name
         # build qualified class name
-        qcl = self._add_schema(cl)
         # To allow users to work with multiple tables,
-        # we munge the name of the "oid" the key
-        qoid = _oid_key(qcl)
+        # we munge the name of the "oid" key
+        qoid = _oid_key(cl)
         if not keyname:
             # use the primary key by default
             try:
-                keyname = self.pkey(qcl)
+                keyname = self.pkey(cl)
             except KeyError:
-                raise _prg_error('Class %s has no primary key' % qcl)
-        attnames = self.get_attnames(qcl)
+                raise _prg_error('Class %s has no primary key' % cl)
+        attnames = self.get_attnames(cl)
         # We want the oid for later updates if that isn't the key
         if keyname == 'oid':
             if isinstance(arg, dict):
@@ -777,11 +670,12 @@
             what = ', '.join(attnames)
             where = ' AND '.join(['%s = %s'
                 % (k, self._quote(arg[k], attnames[k])) for k in keyname])
-        q = 'SELECT %s FROM %s WHERE %s LIMIT 1' % (what, qcl, where)
+        q = 'SELECT %s FROM %s WHERE %s LIMIT 1' % (
+            what, _quote_class_name(cl), where)
         self._do_debug(q)
         res = self.db.query(q).dictresult()
         if not res:
-            raise _db_error('No such record in %s where %s' % (qcl, where))
+            raise _db_error('No such record in %s where %s' % (cl, where))
         for n, value in res[0].items():
             if n == 'oid':
                 n = qoid
@@ -807,24 +701,24 @@
         although PostgreSQL does.
 
         """
-        qcl = self._add_schema(cl)
-        qoid = _oid_key(qcl)
+        qoid = _oid_key(cl)
         if d is None:
             d = {}
         d.update(kw)
-        attnames = self.get_attnames(qcl)
+        attnames = self.get_attnames(cl)
         names, values = [], []
         for n in attnames:
             if n != 'oid' and n in d:
                 names.append('"%s"' % n)
                 values.append(self._quote(d[n], attnames[n]))
         names, values = ', '.join(names), ', '.join(values)
-        selectable = self.has_table_privilege(qcl)
+        selectable = self.has_table_privilege(cl)
         if selectable:
             ret = ' RETURNING %s*' % ('oid, ' if 'oid' in attnames else '')
         else:
             ret = ''
-        q = 'INSERT INTO %s (%s) VALUES (%s)%s' % (qcl, names, values, ret)
+        q = 'INSERT INTO %s (%s) VALUES (%s)%s' % (
+            _quote_class_name(cl), names, values, ret)
         self._do_debug(q)
         res = self.db.query(q)
         if ret:
@@ -838,13 +732,13 @@
         elif isinstance(res, int):
             d[qoid] = res
             if selectable:
-                self.get(qcl, d, 'oid')
+                self.get(cl, d, 'oid')
         elif selectable:
             if qoid in d:
-                self.get(qcl, d, 'oid')
+                self.get(cl, d, 'oid')
             else:
                 try:
-                    self.get(qcl, d)
+                    self.get(cl, d)
                 except ProgrammingError:
                     pass  # table has no primary key
         return d
@@ -862,23 +756,22 @@
         # Update always works on the oid which get returns if available,
         # otherwise use the primary key.  Fail if neither.
         # Note that we only accept oid key from named args for safety
-        qcl = self._add_schema(cl)
-        qoid = _oid_key(qcl)
+        qoid = _oid_key(cl)
         if 'oid' in kw:
             kw[qoid] = kw['oid']
             del kw['oid']
         if d is None:
             d = {}
         d.update(kw)
-        attnames = self.get_attnames(qcl)
+        attnames = self.get_attnames(cl)
         if qoid in d:
             where = 'oid = %s' % d[qoid]
             keyname = ()
         else:
             try:
-                keyname = self.pkey(qcl)
+                keyname = self.pkey(cl)
             except KeyError:
-                raise _prg_error('Class %s has no primary key' % qcl)
+                raise _prg_error('Class %s has no primary key' % cl)
             if isinstance(keyname, basestring):
                 keyname = (keyname,)
             try:
@@ -893,12 +786,13 @@
         if not values:
             return d
         values = ', '.join(values)
-        selectable = self.has_table_privilege(qcl)
+        selectable = self.has_table_privilege(cl)
         if selectable:
             ret = ' RETURNING %s*' % ('oid, ' if 'oid' in attnames else '')
         else:
             ret = ''
-        q = 'UPDATE %s SET %s WHERE %s%s' % (qcl, values, where, ret)
+        q = 'UPDATE %s SET %s WHERE %s%s' % (
+            _quote_class_name(cl), values, where, ret)
         self._do_debug(q)
         res = self.db.query(q)
         if ret:
@@ -912,9 +806,9 @@
         else:
             if selectable:
                 if qoid in d:
-                    self.get(qcl, d, 'oid')
+                    self.get(cl, d, 'oid')
                 else:
-                    self.get(qcl, d)
+                    self.get(cl, d)
         return d
 
     def clear(self, cl, a=None):
@@ -927,10 +821,9 @@
 
         """
         # At some point we will need a way to get defaults from a table.
-        qcl = self._add_schema(cl)
         if a is None:
             a = {}  # empty if argument is not present
-        attnames = self.get_attnames(qcl)
+        attnames = self.get_attnames(cl)
         for n, t in attnames.items():
             if n == 'oid':
                 continue
@@ -957,8 +850,7 @@
         # One day we will be testing that the record to be deleted
         # isn't referenced somewhere (or else PostgreSQL will).
         # Note that we only accept oid key from named args for safety
-        qcl = self._add_schema(cl)
-        qoid = _oid_key(qcl)
+        qoid = _oid_key(cl)
         if 'oid' in kw:
             kw[qoid] = kw['oid']
             del kw['oid']
@@ -969,18 +861,18 @@
             where = 'oid = %s' % d[qoid]
         else:
             try:
-                keyname = self.pkey(qcl)
+                keyname = self.pkey(cl)
             except KeyError:
-                raise _prg_error('Class %s has no primary key' % qcl)
+                raise _prg_error('Class %s has no primary key' % cl)
             if isinstance(keyname, basestring):
                 keyname = (keyname,)
-            attnames = self.get_attnames(qcl)
+            attnames = self.get_attnames(cl)
             try:
                 where = ' AND '.join(['%s = %s'
                     % (k, self._quote(d[k], attnames[k])) for k in keyname])
             except KeyError:
                 raise _prg_error('Delete needs primary key or oid.')
-        q = 'DELETE FROM %s WHERE %s' % (qcl, where)
+        q = 'DELETE FROM %s WHERE %s' % (_quote_class_name(cl), where)
         self._do_debug(q)
         return int(self.db.query(q))
 

Modified: trunk/tests/test_classic.py
==============================================================================
--- trunk/tests/test_classic.py Tue Jan 12 16:21:46 2016        (r728)
+++ trunk/tests/test_classic.py Tue Jan 12 16:29:07 2016        (r729)
@@ -75,7 +75,7 @@
             db.query("DELETE FROM _test_schema")
         try:
             db.query("CREATE VIEW _test_vschema AS "
-                "SELECT _test, 'abc'::text AS _test2  FROM _test_schema")
+                "SELECT _test, 'abc'::text AS _test2 FROM _test_schema")
         except Error:
             pass
 
@@ -110,12 +110,8 @@
         self.assertEqual(db.pkey('_test_schema'), '_test')
         self.assertEqual(db.pkey('public._test_schema'), '_test')
         self.assertEqual(db.pkey('_test1._test_schema'), '_test1')
-
-        self.assertEqual(db.pkey('_test_schema',
-                {'test1': 'a', 'test2.test3': 'b'}),
-                {'public.test1': 'a', 'test2.test3': 'b'})
-        self.assertEqual(db.pkey('test1'), 'a')
-        self.assertEqual(db.pkey('public.test1'), 'a')
+        self.assertEqual(db.pkey('_test2._test_schema'), '_test2')
+        self.assertRaises(KeyError, db.pkey, '_test_vschema')
 
     def test_get(self):
         db = opendb()

Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py       Tue Jan 12 16:21:46 2016        
(r728)
+++ trunk/tests/test_classic_dbwrapper.py       Tue Jan 12 16:29:07 2016        
(r729)
@@ -594,13 +594,15 @@
             self.assertIsInstance(r, frozenset)
             self.assertEqual(r, frozenset([
                 'a_very_long_column_name', 'with space', '42']))
-            self.assertEqual(pkey('%s0' % t, 'none'), 'none')
-            self.assertEqual(pkey('%s0' % t), 'none')
-            pkey(None, {'%s0' % t: 'a', 'public."%s1"' % t: 'b'})
+            # a newly added primary key will be detected
+            query('alter table "%s0" add primary key (a)' % t)
             self.assertEqual(pkey('%s0' % t), 'a')
+            # a changed primary key will not be detected,
+            # indicating that the internal cache is operating
+            query('alter table "%s1" rename column b to x' % t)
             self.assertEqual(pkey('%s1' % t), 'b')
-            pkey(None, {})
-            self.assertRaises(KeyError, pkey, '%s0' % t)
+            # we get the changed primary key when the cache is flushed
+            self.assertEqual(pkey('%s1' % t, flush=True), 'x')
             for n in range(7):
                 query('drop table "%s%d"' % (t, n))
 
@@ -716,10 +718,7 @@
                     % (table, n + 1, t))
             self.assertRaises(pg.ProgrammingError, get, table, 2)
             r = get(table, 2, 'n')
-            oid_table = table
-            if ' ' in table:
-                oid_table = '"%s"' % oid_table
-            oid_table = 'oid(public.%s)' % oid_table
+            oid_table = 'oid(%s)' % table
             self.assertIn(oid_table, r)
             oid = r[oid_table]
             self.assertIsInstance(oid, int)
@@ -797,10 +796,7 @@
                 " d numeric, f4 real, f8 double precision, m money,"
                 " v4 varchar(4), c4 char(4), t text,"
                 " b boolean, ts timestamp) with oids" % table)
-            oid_table = table
-            if ' ' in table:
-                oid_table = '"%s"' % oid_table
-            oid_table = 'oid(public.%s)' % oid_table
+            oid_table = 'oid(%s)' % table
             tests = [dict(i2=None, i4=None, i8=None),
                 (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
                 (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
@@ -1323,17 +1319,17 @@
         self.assertEqual(get("t", 1, 'n')['d'], 1)
         self.assertEqual(get("s4.t4", 1, 'n')['d'], 4)
 
-    def testMangling(self):
+    def testMunging(self):
         get = self.db.get
         query = self.db.query
         r = get("t", 1, 'n')
-        self.assertIn('oid(public.t)', r)
+        self.assertIn('oid(t)', r)
         query("set search_path to s2")
         r = get("t2", 1, 'n')
-        self.assertIn('oid(s2.t2)', r)
+        self.assertIn('oid(t2)', r)
         query("set search_path to s3")
         r = get("t", 1, 'n')
-        self.assertIn('oid(s3.t)', r)
+        self.assertIn('oid(t)', r)
 
 
 if __name__ == '__main__':

Modified: trunk/tests/test_classic_functions.py
==============================================================================
--- trunk/tests/test_classic_functions.py       Tue Jan 12 16:21:46 2016        
(r728)
+++ trunk/tests/test_classic_functions.py       Tue Jan 12 16:29:07 2016        
(r729)
@@ -32,148 +32,6 @@
     unicode = str
 
 
-class TestAuxiliaryFunctions(unittest.TestCase):
-    """Test the auxiliary functions external to the connection class."""
-
-    def testIsQuoted(self):
-        f = pg._is_quoted
-        self.assertTrue(f('A'))
-        self.assertTrue(f('0'))
-        self.assertTrue(f('#'))
-        self.assertTrue(f('*'))
-        self.assertTrue(f('.'))
-        self.assertTrue(f(' '))
-        self.assertTrue(f('a b'))
-        self.assertTrue(f('a+b'))
-        self.assertTrue(f('a*b'))
-        self.assertTrue(f('a.b'))
-        self.assertTrue(f('0ab'))
-        self.assertTrue(f('aBc'))
-        self.assertTrue(f('ABC'))
-        self.assertTrue(f('"a"'))
-        self.assertTrue(not f('a'))
-        self.assertTrue(not f('a0'))
-        self.assertTrue(not f('_'))
-        self.assertTrue(not f('_a'))
-        self.assertTrue(not f('_0'))
-        self.assertTrue(not f('_a_0_'))
-        self.assertTrue(not f('ab'))
-        self.assertTrue(not f('ab0'))
-        self.assertTrue(not f('abc'))
-        self.assertTrue(not f('abc'))
-        if 'ä'.isalpha():
-            self.assertTrue(not f('ä'))
-            self.assertTrue(f('Ä'))
-            self.assertTrue(not f('käse'))
-            self.assertTrue(f('Käse'))
-            self.assertTrue(not f('emmentaler_käse'))
-            self.assertTrue(f('emmentaler käse'))
-            self.assertTrue(f('EmmentalerKäse'))
-            self.assertTrue(f('Emmentaler Käse'))
-
-    def testIsUnquoted(self):
-        f = pg._is_unquoted
-        self.assertTrue(f('A'))
-        self.assertTrue(not f('0'))
-        self.assertTrue(not f('#'))
-        self.assertTrue(not f('*'))
-        self.assertTrue(not f('.'))
-        self.assertTrue(not f(' '))
-        self.assertTrue(not f('a b'))
-        self.assertTrue(not f('a+b'))
-        self.assertTrue(not f('a*b'))
-        self.assertTrue(not f('a.b'))
-        self.assertTrue(not f('0ab'))
-        self.assertTrue(f('aBc'))
-        self.assertTrue(f('ABC'))
-        self.assertTrue(not f('"a"'))
-        self.assertTrue(f('a0'))
-        self.assertTrue(f('_'))
-        self.assertTrue(f('_a'))
-        self.assertTrue(f('_0'))
-        self.assertTrue(f('_a_0_'))
-        self.assertTrue(f('ab'))
-        self.assertTrue(f('ab0'))
-        self.assertTrue(f('abc'))
-        if 'ä'.isalpha():
-            self.assertTrue(f('ä'))
-            self.assertTrue(f('Ä'))
-            self.assertTrue(f('käse'))
-            self.assertTrue(f('Käse'))
-            self.assertTrue(f('emmentaler_käse'))
-            self.assertTrue(not f('emmentaler käse'))
-            self.assertTrue(f('EmmentalerKäse'))
-            self.assertTrue(not f('Emmentaler Käse'))
-
-    def testSplitFirstPart(self):
-        f = pg._split_first_part
-        self.assertEqual(f('a.b'), ['a', 'b'])
-        self.assertEqual(f('a.b.c'), ['a', 'b.c'])
-        self.assertEqual(f('"a.b".c'), ['a.b', 'c'])
-        self.assertEqual(f('a."b.c"'), ['a', '"b.c"'])
-        self.assertEqual(f('A.b.c'), ['a', 'b.c'])
-        self.assertEqual(f('Ab.c'), ['ab', 'c'])
-        self.assertEqual(f('aB.c'), ['ab', 'c'])
-        self.assertEqual(f('AB.c'), ['ab', 'c'])
-        self.assertEqual(f('A b.c'), ['A b', 'c'])
-        self.assertEqual(f('a B.c'), ['a B', 'c'])
-        self.assertEqual(f('"A".b.c'), ['A', 'b.c'])
-        self.assertEqual(f('"A""B".c'), ['A"B', 'c'])
-        self.assertEqual(f('a.b.c.d.e.f.g'), ['a', 'b.c.d.e.f.g'])
-        self.assertEqual(f('"a.b.c.d.e.f".g'), ['a.b.c.d.e.f', 'g'])
-        self.assertEqual(f('a.B.c.D.e.F.g'), ['a', 'B.c.D.e.F.g'])
-        self.assertEqual(f('A.b.C.d.E.f.G'), ['a', 'b.C.d.E.f.G'])
-
-    def testSplitParts(self):
-        f = pg._split_parts
-        self.assertEqual(f('a.b'), ['a', 'b'])
-        self.assertEqual(f('a.b.c'), ['a', 'b', 'c'])
-        self.assertEqual(f('"a.b".c'), ['a.b', 'c'])
-        self.assertEqual(f('a."b.c"'), ['a', 'b.c'])
-        self.assertEqual(f('A.b.c'), ['a', 'b', 'c'])
-        self.assertEqual(f('Ab.c'), ['ab', 'c'])
-        self.assertEqual(f('aB.c'), ['ab', 'c'])
-        self.assertEqual(f('AB.c'), ['ab', 'c'])
-        self.assertEqual(f('A b.c'), ['A b', 'c'])
-        self.assertEqual(f('a B.c'), ['a B', 'c'])
-        self.assertEqual(f('"A".b.c'), ['A', 'b', 'c'])
-        self.assertEqual(f('"A""B".c'), ['A"B', 'c'])
-        self.assertEqual(f('a.b.c.d.e.f.g'),
-            ['a', 'b', 'c', 'd', 'e', 'f', 'g'])
-        self.assertEqual(f('"a.b.c.d.e.f".g'),
-            ['a.b.c.d.e.f', 'g'])
-        self.assertEqual(f('a.B.c.D.e.F.g'),
-            ['a', 'b', 'c', 'd', 'e', 'f', 'g'])
-        self.assertEqual(f('A.b.C.d.E.f.G'),
-            ['a', 'b', 'c', 'd', 'e', 'f', 'g'])
-
-    def testJoinParts(self):
-        f = pg._join_parts
-        self.assertEqual(f(('a',)), 'a')
-        self.assertEqual(f(('a', 'b')), 'a.b')
-        self.assertEqual(f(('a', 'b', 'c')), 'a.b.c')
-        self.assertEqual(f(('a', 'b', 'c', 'd', 'e', 'f', 'g')),
-            'a.b.c.d.e.f.g')
-        self.assertEqual(f(('A', 'b')), '"A".b')
-        self.assertEqual(f(('a', 'B')), 'a."B"')
-        self.assertEqual(f(('a b', 'c')), '"a b".c')
-        self.assertEqual(f(('a', 'b c')), 'a."b c"')
-        self.assertEqual(f(('a_b', 'c')), 'a_b.c')
-        self.assertEqual(f(('a', 'b_c')), 'a.b_c')
-        self.assertEqual(f(('0', 'a')), '"0".a')
-        self.assertEqual(f(('0_', 'a')), '"0_".a')
-        self.assertEqual(f(('_0', 'a')), '_0.a')
-        self.assertEqual(f(('_a', 'b')), '_a.b')
-        self.assertEqual(f(('a', 'B', '0', 'c0', 'C0',
-            'd e', 'f_g', 'h.i', 'jklm', 'nopq')),
-            'a."B"."0".c0."C0"."d e".f_g."h.i".jklm.nopq')
-
-    def testOidKey(self):
-        f = pg._oid_key
-        self.assertEqual(f('a'), 'oid(a)')
-        self.assertEqual(f('a.b'), 'oid(a.b)')
-
-
 class TestHasConnect(unittest.TestCase):
     """Test existence of basic pg module functions."""
 
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to