Update of /usr/cvs/Public/pygresql/module
In directory druid.net:/tmp/cvs-serv6078
Modified Files:
pg.py test_pg.py
Log Message:
Code simplifications. Improved unittest for pkey().
To see the diffs for this commit:
http://www.druid.net/pygresql/viewcvs.cgi/cvs/pygresql/module/pg.py.diff?r1=1.70&r2=1.71
Index: pg.py
===================================================================
RCS file: /usr/cvs/Public/pygresql/module/pg.py,v
retrieving revision 1.70
retrieving revision 1.71
diff -u -r1.70 -r1.71
--- pg.py 3 Dec 2008 00:17:15 -0000 1.70
+++ pg.py 4 Dec 2008 20:12:52 -0000 1.71
@@ -5,7 +5,7 @@
# Written by D'Arcy J.M. Cain
# Improved by Christoph Zwerschke
#
-# $Id: pg.py,v 1.70 2008/12/03 00:17:15 cito Exp $
+# $Id: pg.py,v 1.71 2008/12/04 20:12:52 cito Exp $
#
"""PyGreSQL classic interface.
@@ -304,21 +304,20 @@
If newpkey is set and is not a dictionary then set that
value as the primary key of the class. If it is a dictionary
- then replace the _pkeys dictionary with it.
+ then replace the _pkeys dictionary with a copy of it.
"""
# First see if the caller is supplying a dictionary
if isinstance(newpkey, dict):
- # make sure that we have a namespace
- self._pkeys = {}
- for x in newpkey.keys():
- if x.find('.') == -1:
- self._pkeys['public.' + x] = newpkey[x]
- else:
- self._pkeys[x] = newpkey[x]
+ # make sure that all classes have a namespace
+ self._pkeys = dict([
+ ('.' in cl and cl or 'public.' + cl, pkey)
+ for cl, pkey in newpkey.iteritems()])
return self._pkeys
- qcl = _join_parts(self._split_schema(cl)) # build qualified name
+ # Build qualified name for the given class
+ qcl = _join_parts(self._split_schema(cl))
+ # Check if the caller is supplying a new primary key for that class
if newpkey:
self._pkeys[qcl] = newpkey
return newpkey
@@ -339,6 +338,7 @@
" AND pg_index.indkey[0]=pg_attribute.attnum"
).getresult()])
self._do_debug(self._pkeys)
+
# will raise an exception if primary key doesn't exist
return self._pkeys[qcl]
@@ -461,14 +461,14 @@
(qcl, keyname, self._quote(k, fnames[keyname]))
else:
q = 'SELECT %s FROM %s WHERE %s=%s LIMIT 1' % \
- (','.join(fnames.keys()), qcl, \
+ (','.join(fnames), qcl, \
keyname, self._quote(k, fnames[keyname]))
self._do_debug(q)
res = self.db.query(q).dictresult()
if not res:
raise DatabaseError('No such record in %s where %s=%s'
% (qcl, keyname, self._quote(k, fnames[keyname])))
- for k, d in res[0].items():
+ for k, d in res[0].iteritems():
if k == 'oid':
k = foid
arg[k] = d
@@ -502,7 +502,7 @@
t = []
n = []
- for f in fnames.keys():
+ for f in fnames:
if f != 'oid' and f in a:
t.append(self._quote(a[f], fnames[f]))
n.append('"%s"' % f)
@@ -554,7 +554,7 @@
where = "%s='%s'" % (pk, a[pk])
v = []
fnames = self.get_attnames(qcl)
- for ff in fnames.keys():
+ for ff in fnames:
if ff != 'oid' and ff in a:
v.append('%s=%s' % (ff, self._quote(a[ff], fnames[ff])))
if v == []:
@@ -583,7 +583,7 @@
a = {} # empty if argument is not present
qcl = _join_parts(self._split_schema(cl)) # build qualified name
fnames = self.get_attnames(qcl)
- for k, t in fnames.items():
+ for k, t in fnames.iteritems():
if k == 'oid':
continue
if t in ('int', 'float', 'num', 'money'):
To see the diff for test_pg.py in this commit:
http://www.druid.net/pygresql/viewcvs.cgi/cvs/pygresql/module/test_pg.py.diff?r1=1.22&r2=1.23
Index: test_pg.py
===================================================================
RCS file: /usr/cvs/Public/pygresql/module/test_pg.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -r1.22 -r1.23
--- test_pg.py 3 Dec 2008 00:17:15 -0000 1.22
+++ test_pg.py 4 Dec 2008 20:12:53 -0000 1.23
@@ -4,7 +4,7 @@
#
# Written by Christoph Zwerschke
#
-# $Id: test_pg.py,v 1.22 2008/12/03 00:17:15 cito Exp $
+# $Id: test_pg.py,v 1.23 2008/12/04 20:12:53 cito Exp $
#
"""Test the classic PyGreSQL interface in the pg module.
@@ -954,7 +954,7 @@
self.assertEqual(r, '5')
def testPkey(self):
- smart_ddl(self.db, 'drop table pkeytest0')
+ smart_ddl(self.db, "drop table pkeytest0")
smart_ddl(self.db, "create table pkeytest0 ("
"a smallint)")
smart_ddl(self.db, 'drop table pkeytest1')
@@ -963,15 +963,21 @@
smart_ddl(self.db, 'drop table pkeytest2')
smart_ddl(self.db, "create table pkeytest2 ("
"c smallint, d smallint primary key)")
- smart_ddl(self.db, 'drop table pkeytest3')
+ smart_ddl(self.db, "drop table pkeytest3")
smart_ddl(self.db, "create table pkeytest3 ("
"e smallint, f smallint, g smallint, "
"h smallint, i smallint, "
"primary key (f,h))")
- self.assertRaises(KeyError, self.db.pkey, "pkeytest0")
- self.assertEqual(self.db.pkey("pkeytest1"), "b")
- self.assertEqual(self.db.pkey("pkeytest2"), "d")
- self.assertEqual(self.db.pkey("pkeytest3"), "f")
+ self.assertRaises(KeyError, self.db.pkey, 'pkeytest0')
+ self.assertEqual(self.db.pkey('pkeytest1'), 'b')
+ self.assertEqual(self.db.pkey('pkeytest2'), 'd')
+ self.assertEqual(self.db.pkey('pkeytest3'), 'f')
+ self.assertEqual(self.db.pkey('pkeytest0', 'none'), 'none')
+ self.assertEqual(self.db.pkey('pkeytest0'), 'none')
+ self.db.pkey(None, {'t': 'a', 'n.t': 'b'})
+ self.assertEqual(self.db.pkey('t'), 'a')
+ self.assertEqual(self.db.pkey('n.t'), 'b')
+ self.assertRaises(KeyError, self.db.pkey, 'pkeytest0')
def testGetDatabases(self):
databases = self.db.get_databases()
_______________________________________________
PyGreSQL mailing list
[email protected]
http://mailman.vex.net/mailman/listinfo/pygresql