Update of /usr/cvs/Public/pygresql/module
In directory druid.net:/tmp/cvs-serv14172/module
Modified Files:
TEST_PyGreSQL_classic.py pg.py test_pg.py
Log Message:
Support for composite primary keys, code clean-up in get(), insert(), update(),
clear() and delete() methods.
To see the diffs for this commit:
http://www.druid.net/pygresql/viewcvs.cgi/cvs/pygresql/module/TEST_PyGreSQL_classic.py.diff?r1=1.14&r2=1.15
Index: TEST_PyGreSQL_classic.py
===================================================================
RCS file: /usr/cvs/Public/pygresql/module/TEST_PyGreSQL_classic.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- TEST_PyGreSQL_classic.py 21 Nov 2008 21:17:53 -0000 1.14
+++ TEST_PyGreSQL_classic.py 5 Dec 2008 02:05:28 -0000 1.15
@@ -92,7 +92,7 @@
db.get('_test_schema', 1234)
db.get('_test_schema', 1234, keyname = '_test')
- self.failUnlessRaises(KeyError, db.get, '_test_vschema', 1234)
+ self.failUnlessRaises(ProgrammingError, db.get, '_test_vschema', 1234)
db.get('_test_vschema', 1234, keyname = '_test')
def test_insert(self):
http://www.druid.net/pygresql/viewcvs.cgi/cvs/pygresql/module/pg.py.diff?r1=1.73&r2=1.74
Index: pg.py
===================================================================
RCS file: /usr/cvs/Public/pygresql/module/pg.py,v
retrieving revision 1.73
retrieving revision 1.74
diff -u -r1.73 -r1.74
--- pg.py 4 Dec 2008 21:11:54 -0000 1.73
+++ pg.py 5 Dec 2008 02:05:28 -0000 1.74
@@ -5,7 +5,7 @@
# Written by D'Arcy J.M. Cain
# Improved by Christoph Zwerschke
#
-# $Id: pg.py,v 1.73 2008/12/04 21:11:54 cito Exp $
+# $Id: pg.py,v 1.74 2008/12/05 02:05:28 cito Exp $
#
"""PyGreSQL classic interface.
@@ -86,6 +86,10 @@
"""Join all parts of a dot separated string."""
return '.'.join([_is_quoted(p) and '"%s"' % p or p for p in s])
+def _oid_key(qcl):
+ """Build oid key from qualified class name."""
+ return 'oid(%s)' % qcl
+
# The PostGreSQL database connection interface:
@@ -230,9 +234,9 @@
["SELECT %d::integer AS n, '%s'::name AS nspname"
% s for s in enumerate(schemas)])
query = ("SELECT nspname FROM pg_class"
- " JOIN pg_namespace ON pg_class.relnamespace=pg_namespace.oid"
+ " JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid"
" JOIN (%s) AS p USING (nspname)"
- " WHERE pg_class.relname='%s'"
+ " WHERE pg_class.relname = '%s'"
" ORDER BY n LIMIT 1" % (query, cl))
schema = self.db.query(query).getresult()
if schema: # schema found
@@ -243,6 +247,10 @@
schema = 'public'
return schema, cl
+ def _add_schema(self, cl):
+ """Ensure that the class name is prefixed with a schema name."""
+ return _join_parts(self._split_schema(cl))
+
# Public methods
# escape_string and escape_bytea exist as methods,
@@ -302,6 +310,9 @@
def pkey(self, cl, newpkey=None):
"""This method gets or sets the primary key of a class.
+ A composite primary key is represented as a frozenset. Note that
+ this raises an exception if the table does not have a primary key.
+
If newpkey is set and is not a dictionary then set that
value as the primary key of the class. If it is a dictionary
then replace the _pkeys dictionary with a copy of it.
@@ -315,9 +326,8 @@
for cl, pkey in newpkey.iteritems()])
return self._pkeys
- # Build qualified name for the given class
- qcl = _join_parts(self._split_schema(cl))
- # Check if the caller is supplying a new primary key for that class
+ qcl = self._add_schema(cl) # build fully qualified class name
+ # Check if the caller is supplying a new primary key for the class
if newpkey:
self._pkeys[qcl] = newpkey
return newpkey
@@ -325,20 +335,28 @@
# Get all the primary keys at once
if qcl not in self._pkeys:
# if not found, check again in case it was added after we started
- self._pkeys = dict([
- (_join_parts(r[:2]), r[2]) for r in self.db.query(
- "SELECT pg_namespace.nspname, pg_class.relname"
- ", pg_attribute.attname FROM pg_class"
- " JOIN pg_namespace ON pg_namespace.oid=pg_class.relnamespace"
+ self._pkeys = {}
+ if self.server_version >= 80200:
+ # the ANY syntax works correctly only with PostgreSQL >= 8.2
+ any_indkey = "= ANY (pg_index.indkey)"
+ else:
+ any_indkey = "IN (%s)" % ', '.join(
+ ['pg_index.indkey[%d]' % i for i in range(16)])
+ for r in self.db.query(
+ "SELECT pg_namespace.nspname, pg_class.relname,"
+ " pg_attribute.attname FROM pg_class"
+ " JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace"
" AND pg_namespace.nspname NOT LIKE 'pg_%'"
- " JOIN pg_attribute ON pg_attribute.attrelid=pg_class.oid"
- " AND pg_attribute.attisdropped='f'"
- " JOIN pg_index ON pg_index.indrelid=pg_class.oid"
- " AND pg_index.indisprimary='t'"
- # note that this gets only the first attribute
- # of composite primary keys
- " AND pg_index.indkey[0]=pg_attribute.attnum"
- ).getresult()])
+ " JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid"
+ " AND pg_attribute.attisdropped = 'f'"
+ " JOIN pg_index ON pg_index.indrelid = pg_class.oid"
+ " AND pg_index.indisprimary = 't'"
+ " AND pg_attribute.attnum " + any_indkey).getresult():
+ cl, pkey = _join_parts(r[:2]), r[2]
+ self._pkeys.setdefault(cl, []).append(pkey)
+ # (only) for composite primary keys, the values will be frozensets
+ for cl, pkey in self._pkeys.iteritems():
+ self._pkeys[cl] = len(pkey) > 1 and frozenset(pkey) or pkey[0]
self._do_debug(self._pkeys)
# will raise an exception if primary key doesn't exist
@@ -362,7 +380,7 @@
return map(_join_parts, self.db.query(
"SELECT pg_namespace.nspname, pg_class.relname "
"FROM pg_class "
- "JOIN pg_namespace ON pg_namespace.oid=pg_class.relnamespace "
+ "JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace "
"WHERE %s pg_class.relname !~ '^Inv' AND "
"pg_class.relname !~ '^pg_' "
"ORDER BY 1, 2" % where).getresult())
@@ -386,22 +404,22 @@
elif newattnames:
raise ProgrammingError(
'If supplied, newattnames must be a dictionary')
- cl = self._split_schema(cl) # split into schema and cl
- qcl = _join_parts(cl) # build qualified name
+ cl = self._split_schema(cl) # split into schema and class
+ qcl = _join_parts(cl) # build fully qualified name
# May as well cache them:
if qcl in self._attnames:
return self._attnames[qcl]
if qcl not in self.get_relations('rv'):
raise ProgrammingError('Class %s does not exist' % qcl)
t = {}
- for att, typ in self.db.query("SELECT pg_attribute.attname"
- ", pg_type.typname FROM pg_class"
- " JOIN pg_namespace ON pg_class.relnamespace=pg_namespace.oid"
- " JOIN pg_attribute ON pg_attribute.attrelid=pg_class.oid"
- " JOIN pg_type ON pg_type.oid=pg_attribute.atttypid"
- " WHERE pg_namespace.nspname='%s' AND pg_class.relname='%s'"
- " AND (pg_attribute.attnum>0 or pg_attribute.attname='oid')"
- " AND pg_attribute.attisdropped='f'"
+ for att, typ in self.db.query("SELECT pg_attribute.attname,"
+ " pg_type.typname FROM pg_class"
+ " JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid"
+ " JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid"
+ " JOIN pg_type ON pg_type.oid = pg_attribute.atttypid"
+ " WHERE pg_namespace.nspname = '%s' AND pg_class.relname = '%s'"
+ " AND (pg_attribute.attnum > 0 or pg_attribute.attname = 'oid')"
+ " AND pg_attribute.attisdropped = 'f'"
% cl).getresult():
if typ.startswith('bool'):
t[att] = 'bool'
@@ -431,11 +449,12 @@
def get(self, cl, arg, keyname=None):
"""Get a tuple from a database table or view.
- This method is the basic mechanism to get a single row. It assumes
+ This method is the basic mechanism to get a single row. It requires
that the key specifies a unique row. If keyname is not specified
then the primary key for the table is used. If arg is a dictionary
then the value for the key is taken from it and it is modified to
include the new values, replacing existing values where necessary.
+ For a composite key, keyname can also be a sequence of key names.
The OID is also put into the dictionary if the table has one, but
in order to allow the caller to work with multiple tables, it is
munged as oid(schema.table).
@@ -443,34 +462,44 @@
"""
if cl.endswith('*'): # scan descendant tables?
cl = cl[:-1].rstrip() # need parent table name
- qcl = _join_parts(self._split_schema(cl)) # build qualified name
+ # build qualified class name
+ qcl = self._add_schema(cl)
# To allow users to work with multiple tables,
- # we munge the name when the key is "oid"
- qoid = 'oid(%s)' % qcl # build mangled name
- if keyname is None: # use the primary key by default
- keyname = self.pkey(qcl)
- if isinstance(arg, dict):
- keyvalue = arg[keyname == 'oid' and qoid or keyname]
- else:
- keyvalue, arg = arg, {}
+ # we munge the name of the "oid" key
+ qoid = _oid_key(qcl)
+ if not keyname:
+ # use the primary key by default
+ try:
+ keyname = self.pkey(qcl)
+ except KeyError:
+ raise ProgrammingError('Class %s has no primary key' % qcl)
# We want the oid for later updates if that isn't the key
if keyname == 'oid':
+ if isinstance(arg, dict):
+ if qoid not in arg:
+ raise ProgrammingError('%s not in arg' % qoid)
+ else:
+ arg = {qoid: arg}
+ where = 'oid = %s' % arg[qoid]
attnames = '*'
else:
attnames = self.get_attnames(qcl)
- keyvalue = self._quote(keyvalue, attnames[keyname])
- attnames = ','.join(attnames)
- q = 'SELECT %s FROM %s WHERE %s=%s LIMIT 1' % (
- attnames, qcl, keyname, keyvalue)
+ if isinstance(keyname, basestring):
+ keyname = (keyname,)
+ if not isinstance(arg, dict):
+ if len(keyname) > 1:
+ raise ProgrammingError('Composite key needs dict as arg')
+ arg = dict([(k, arg) for k in keyname])
+ where = ' AND '.join(['%s = %s'
+ % (k, self._quote(arg[k], attnames[k])) for k in keyname])
+ attnames = ', '.join(attnames)
+ q = 'SELECT %s FROM %s WHERE %s LIMIT 1' % (attnames, qcl, where)
self._do_debug(q)
res = self.db.query(q).dictresult()
if not res:
- raise DatabaseError('No such record in %s where %s=%s'
- % (qcl, keyname, keyvalue))
+ raise DatabaseError('No such record in %s where %s' % (qcl, where))
for att, value in res[0].iteritems():
- if att == 'oid':
- att = qoid
- arg[att] = value
+ arg[att == 'oid' and qoid or att] = value
return arg
def insert(self, cl, d=None, return_changes=True, **kw):
@@ -490,32 +519,25 @@
"""
if d is None:
- a = {}
- else:
- a = d
- a.update(kw)
-
- qcl = _join_parts(self._split_schema(cl)) # build qualified name
- qoid = 'oid(%s)' % qcl # build mangled name
- fnames = self.get_attnames(qcl)
- t = []
- n = []
-
- for f in fnames:
- if f != 'oid' and f in a:
- t.append(self._quote(a[f], fnames[f]))
- n.append('"%s"' % f)
-
- q = 'INSERT INTO %s (%s) VALUES (%s)' % \
- (qcl, ','.join(n), ','.join(t))
+ d = {}
+ d.update(kw)
+ qcl = self._add_schema(cl)
+ qoid = _oid_key(qcl)
+ attnames = self.get_attnames(qcl)
+ names, values = [], []
+ for n in attnames:
+ if n != 'oid' and n in d:
+ names.append('"%s"' % n)
+ values.append(self._quote(d[n], attnames[n]))
+ names, values = ', '.join(names), ', '.join(values)
+ q = 'INSERT INTO %s (%s) VALUES (%s)' % (qcl, names, values)
self._do_debug(q)
- a[qoid] = self.db.query(q)
-
+ d[qoid] = self.db.query(q)
# Reload the dictionary to catch things modified by engine.
# Note that get() changes 'oid' below to oid(schema.table).
- if return_changes: self.get(qcl, a, 'oid')
-
- return a
+ if return_changes:
+ self.get(qcl, d, 'oid')
+ return d
def update(self, cl, d=None, **kw):
"""Update an existing row in a database table.
@@ -528,44 +550,46 @@
"""
# Update always works on the oid which get returns if available,
# otherwise use the primary key. Fail if neither.
- qcl = _join_parts(self._split_schema(cl)) # build qualified name
- qoid = 'oid(%s)' % qcl # build mangled oid
-
# Note that we only accept oid key from named args for safety
if 'oid' in kw:
kw[qoid] = kw['oid']
del kw['oid']
-
if d is None:
- a = {}
- else:
- a = d
- a.update(kw)
-
- if qoid in a:
- where = "oid=%s" % a[qoid]
+ d = {}
+ d.update(kw)
+ qcl = self._add_schema(cl)
+ qoid = _oid_key(qcl)
+ attnames = self.get_attnames(qcl)
+ if qoid in d:
+ where = 'oid = %s' % d[qoid]
+ keyname = ()
else:
try:
- pk = self.pkey(qcl)
- except Exception:
- raise ProgrammingError(
- 'Update needs primary key or oid as %s' % qoid)
- where = "%s='%s'" % (pk, a[pk])
- v = []
- fnames = self.get_attnames(qcl)
- for ff in fnames:
- if ff != 'oid' and ff in a:
- v.append('%s=%s' % (ff, self._quote(a[ff], fnames[ff])))
- if v == []:
- return None
- q = 'UPDATE %s SET %s WHERE %s' % (qcl, ','.join(v), where)
+ keyname = self.pkey(qcl)
+ except KeyError:
+ raise ProgrammingError('Class %s has no primary key' % qcl)
+ if isinstance(keyname, basestring):
+ keyname = (keyname,)
+ try:
+ where = ' AND '.join(['%s = %s'
+ % (k, self._quote(d[k], attnames[k])) for k in keyname])
+ except KeyError:
+ raise ProgrammingError('Update needs primary key or oid.')
+ values = []
+ for n in attnames:
+ if n in d and n not in keyname:
+ values.append('%s = %s' % (n, self._quote(d[n], attnames[n])))
+ if not values:
+ return d
+ values = ', '.join(values)
+ q = 'UPDATE %s SET %s WHERE %s' % (qcl, values, where)
self._do_debug(q)
self.db.query(q)
# Reload the dictionary to catch things modified by engine:
- if qoid in a:
- return self.get(qcl, a, 'oid')
+ if qoid in d:
+ return self.get(qcl, d, 'oid')
else:
- return self.get(qcl, a)
+ return self.get(qcl, d)
def clear(self, cl, a=None):
"""
@@ -580,17 +604,17 @@
# At some point we will need a way to get defaults from a table.
if a is None:
a = {} # empty if argument is not present
- qcl = _join_parts(self._split_schema(cl)) # build qualified name
- fnames = self.get_attnames(qcl)
- for k, t in fnames.iteritems():
- if k == 'oid':
+ qcl = self._add_schema(cl)
+ attnames = self.get_attnames(qcl)
+ for n, t in attnames.iteritems():
+ if n == 'oid':
continue
if t in ('int', 'float', 'num', 'money'):
- a[k] = 0
+ a[n] = 0
elif t == 'bool':
- a[k] = 'f'
+ a[n] = 'f'
else:
- a[k] = ''
+ a[n] = ''
return a
def delete(self, cl, d=None, **kw):
@@ -603,21 +627,16 @@
# Like update, delete works on the oid.
# One day we will be testing that the record to be deleted
# isn't referenced somewhere (or else PostgreSQL will).
- qcl = _join_parts(self._split_schema(cl)) # build qualified name
- qoid = 'oid(%s)' % qcl # build mangled oid
-
# Note that we only accept oid key from named args for safety
if 'oid' in kw:
kw[qoid] = kw['oid']
del kw['oid']
-
if d is None:
- a = {}
- else:
- a = d
- a.update(kw)
-
- q = 'DELETE FROM %s WHERE oid=%s' % (qcl, a[qoid])
+ d = {}
+ d.update(kw)
+ qcl = self._add_schema(cl)
+ qoid = _oid_key(qcl)
+ q = 'DELETE FROM %s WHERE oid=%s' % (qcl, d[qoid])
self._do_debug(q)
self.db.query(q)
http://www.druid.net/pygresql/viewcvs.cgi/cvs/pygresql/module/test_pg.py.diff?r1=1.25&r2=1.26
Index: test_pg.py
===================================================================
RCS file: /usr/cvs/Public/pygresql/module/test_pg.py,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -r1.25 -r1.26
--- test_pg.py 4 Dec 2008 21:22:18 -0000 1.25
+++ test_pg.py 5 Dec 2008 02:05:28 -0000 1.26
@@ -4,7 +4,7 @@
#
# Written by Christoph Zwerschke
#
-# $Id: test_pg.py,v 1.25 2008/12/04 21:22:18 cito Exp $
+# $Id: test_pg.py,v 1.26 2008/12/05 02:05:28 cito Exp $
#
"""Test the classic PyGreSQL interface in the pg module.
@@ -41,8 +41,12 @@
german = 0
try:
+ frozenset
+except NameError: # Python < 2.4
+ from sets import ImmutableSet as frozenset
+try:
from decimal import Decimal
-except ImportError:
+except ImportError: # Python < 2.4
Decimal = float
@@ -202,6 +206,11 @@
'd e', 'f_g', 'h.i', 'jklm', 'nopq')),
'a."B"."0".c0."C0"."d e".f_g."h.i".jklm.nopq')
+ def testOidKey(self):
+ f = pg._oid_key
+ self.assertEqual(f('a'), 'oid(a)')
+ self.assertEqual(f('a.b'), 'oid(a.b)')
+
class TestHasConnect(unittest.TestCase):
"""Test existence of basic pg module functions."""
@@ -971,7 +980,7 @@
self.assertRaises(KeyError, self.db.pkey, 'pkeytest0')
self.assertEqual(self.db.pkey('pkeytest1'), 'b')
self.assertEqual(self.db.pkey('pkeytest2'), 'd')
- self.assertEqual(self.db.pkey('pkeytest3'), 'f')
+ self.assertEqual(self.db.pkey('pkeytest3'), frozenset('fh'))
self.assertEqual(self.db.pkey('pkeytest0', 'none'), 'none')
self.assertEqual(self.db.pkey('pkeytest0'), 'none')
self.db.pkey(None, {'t': 'a', 'n.t': 'b'})
@@ -1059,18 +1068,19 @@
for n, t in enumerate('xyz'):
self.db.query('insert into "%s" values('
"%d, '%s')" % (table, n+1, t))
- self.assertRaises(KeyError, self.db.get, table, 2)
+ self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
r = self.db.get(table, 2, 'n')
oid_table = table
if ' ' in table:
oid_table = '"%s"' % oid_table
oid_table = 'oid(public.%s)' % oid_table
self.assert_(oid_table in r)
- self.assert_(isinstance(r[oid_table], int))
- self.assertEqual(self.db.get(table + ' *', 2, 'n'), r)
- result = {'t': 'y', 'n': 2, oid_table: r[oid_table]}
+ oid = r[oid_table]
+ self.assert_(isinstance(oid, int))
+ result = {'t': 'y', 'n': 2, oid_table: oid}
self.assertEqual(r, result)
- self.assertEqual(self.db.get(table, r[oid_table], 'oid')['t'], 'y')
+ self.assertEqual(self.db.get(table + ' *', 2, 'n'), r)
+ self.assertEqual(self.db.get(table, oid, 'oid')['t'], 'y')
self.assertEqual(self.db.get(table, 1, 'n')['t'], 'x')
self.assertEqual(self.db.get(table, 3, 'n')['t'], 'z')
self.assertEqual(self.db.get(table, 2, 'n')['t'], 'y')
@@ -1090,6 +1100,31 @@
r['n'] = 2
self.assertEqual(self.db.get(table, r)['t'], 'y')
+ def testGetWithCompositeKey(self):
+ table = 'get_test_table_1'
+ smart_ddl(self.db, "drop table %s" % table)
+ smart_ddl(self.db, "create table %s ("
+ "n integer, t text, primary key (n))" % table)
+ for n, t in enumerate('abc'):
+ self.db.query("insert into %s values("
+ "%d, '%s')" % (table, n+1, t))
+ self.assertEqual(self.db.get(table, 2)['t'], 'b')
+ table = 'get_test_table_2'
+ smart_ddl(self.db, "drop table %s" % table)
+ smart_ddl(self.db, "create table %s ("
+ "n integer, m integer, t text, primary key (n, m))" % table)
+ for n in range(3):
+ for m in range(2):
+ t = chr(ord('a') + 2*n +m)
+ self.db.query("insert into %s values("
+ "%d, %d, '%s')" % (table, n+1, m+1, t))
+ self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
+ self.assertEqual(self.db.get(table, dict(n=2, m=2))['t'], 'd')
+ self.assertEqual(self.db.get(table, dict(n=1, m=2),
+ ('n', 'm'))['t'], 'b')
+ self.assertEqual(self.db.get(table, dict(n=3, m=2),
+ frozenset(['n', 'm']))['t'], 'f')
+
def testGetFromView(self):
self.db.query('delete from test where i4=14')
self.db.query('insert into test (i4, v4) values('
@@ -1133,7 +1168,7 @@
for n, t in enumerate('xyz'):
self.db.query('insert into "%s" values('
"%d, '%s')" % (table, n+1, t))
- self.assertRaises(KeyError, self.db.get, table, 2)
+ self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
r = self.db.get(table, 2, 'n')
r['t'] = 'u'
s = self.db.update(table, r)
@@ -1142,6 +1177,37 @@
).getresult()[0][0]
self.assertEqual(r, 'u')
+ def testUpdateWithCompositeKey(self):
+ table = 'get_update_table_1'
+ smart_ddl(self.db, "drop table %s" % table)
+ smart_ddl(self.db, "create table %s ("
+ "n integer, t text, primary key (n))" % table)
+ for n, t in enumerate('abc'):
+ self.db.query("insert into %s values("
+ "%d, '%s')" % (table, n+1, t))
+ self.assertRaises(pg.ProgrammingError, self.db.update,
+ table, dict(t='b'))
+ self.assertEqual(self.db.update(table, dict(n=2, t='d'))['t'], 'd')
+ r = self.db.query('select t from "%s" where n=2' % table
+ ).getresult()[0][0]
+ self.assertEqual(r, 'd')
+ table = 'get_test_update_2'
+ smart_ddl(self.db, "drop table %s" % table)
+ smart_ddl(self.db, "create table %s ("
+ "n integer, m integer, t text, primary key (n, m))" % table)
+ for n in range(3):
+ for m in range(2):
+ t = chr(ord('a') + 2*n +m)
+ self.db.query("insert into %s values("
+ "%d, %d, '%s')" % (table, n+1, m+1, t))
+ self.assertRaises(pg.ProgrammingError, self.db.update,
+ table, dict(n=2, t='b'))
+ self.assertEqual(self.db.update(table,
+ dict(n=2, m=2, t='x'))['t'], 'x')
+ r = [r[0] for r in self.db.query('select t from "%s" where n=2'
+ ' order by m' % table).getresult()]
+ self.assertEqual(r, ['c', 'x'])
+
def testClear(self):
for table in ('clear_test_table', 'test table for clear'):
smart_ddl(self.db, 'drop table "%s"' % table)
@@ -1166,7 +1232,7 @@
for n, t in enumerate('xyz'):
self.db.query('insert into "%s" values('
"%d, '%s')" % (table, n+1, t))
- self.assertRaises(KeyError, self.db.get, table, 2)
+ self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
r = self.db.get(table, 1, 'n')
s = self.db.delete(table, r)
r = self.db.get(table, 3, 'n')
_______________________________________________
PyGreSQL mailing list
[email protected]
http://mailman.vex.net/mailman/listinfo/pygresql