Author: cito
Date: Sat Jan 5 17:08:30 2013
New Revision: 493
Log:
Do not use positional parameters internally.
This restores backward compatibility with version 4.0.
Modified:
trunk/module/TEST_PyGreSQL_classic.py
trunk/module/pg.py
trunk/module/test_pg.py
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Sat Jan 5 15:25:40 2013
(r492)
+++ trunk/module/TEST_PyGreSQL_classic.py Sat Jan 5 17:08:30 2013
(r493)
@@ -199,13 +199,13 @@
self.assertEqual(q('1', 'text'), "'1'")
self.assertEqual(q('1', 'num'), "1")
self.assertEqual(q(None, 'int'), "NULL")
- self.assertEqual(q(1, 'money'), "'1.00'")
- self.assertEqual(q('1', 'money'), "'1.00'")
- self.assertEqual(q(1.234, 'money'), "'1.23'")
- self.assertEqual(q('1.234', 'money'), "'1.23'")
- self.assertEqual(q(0, 'money'), "'0.00'")
- self.assertEqual(q(0.00, 'money'), "'0.00'")
- self.assertEqual(q(Decimal('0.00'), 'money'), "'0.00'")
+ self.assertEqual(q(1, 'money'), "1")
+ self.assertEqual(q('1', 'money'), "1")
+ self.assertEqual(q(1.234, 'money'), "1.234")
+ self.assertEqual(q('1.234', 'money'), "1.234")
+ self.assertEqual(q(0, 'money'), "0")
+ self.assertEqual(q(0.00, 'money'), "0.0")
+ self.assertEqual(q(Decimal('0.00'), 'money'), "0.00")
self.assertEqual(q(None, 'money'), "NULL")
self.assertEqual(q('', 'money'), "NULL")
self.assertEqual(q(0, 'bool'), "'f'")
@@ -221,7 +221,8 @@
self.assertEqual(q('true', 'bool'), "'t'")
self.assertEqual(q('y', 'bool'), "'t'")
self.assertEqual(q('', 'date'), "NULL")
- self.assertEqual(q('date', 'date'), "'date'")
+ self.assertEqual(q('some_date', 'date'), "'some_date'")
+ self.assertEqual(q('current_timestamp', 'date'), "current_timestamp")
self.assertEqual(q('', 'text'), "''")
self.assertEqual(q("'", 'text'), "''''")
self.assertEqual(q("\\", 'text'), "'\\\\'")
@@ -231,7 +232,7 @@
def test_notify_DB(self):
global cb1_return
-
+
db = opendb()
db2 = opendb()
# Listen for 'event_1'
@@ -284,7 +285,7 @@
if __name__ == '__main__':
suite = unittest.TestSuite()
-
+
if len(sys.argv) > 1: test_list = sys.argv[1:]
else: test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
Modified: trunk/module/pg.py
==============================================================================
--- trunk/module/pg.py Sat Jan 5 15:25:40 2013 (r492)
+++ trunk/module/pg.py Sat Jan 5 17:08:30 2013 (r493)
@@ -29,7 +29,7 @@
from decimal import Decimal
set_decimal(Decimal)
except ImportError: # Python < 2.4
- pass
+ Decimal = float
try:
from collections import namedtuple
except ImportError: # Python < 2.6
@@ -128,6 +128,7 @@
"""Returns ProgrammingError."""
return _db_error(msg, ProgrammingError)
+
class pgnotify(object):
"""A PostgreSQL client-side asynchronous notification handler."""
@@ -318,7 +319,9 @@
"""Quote money value."""
if d is None or d == '':
return 'NULL'
- return "'%.2f'" % float(d)
+ if not isinstance(d, basestring):
+ d = str(d)
+ return d
_quote_funcs = dict( # quote methods for each type
text=_quote_text, bool=_quote_bool, date=_quote_date,
@@ -693,8 +696,7 @@
raise _db_error('%s not in arg' % qoid)
else:
arg = {qoid: arg}
- where = 'oid = $1'
- params = (arg[qoid],)
+ where = 'oid = %s' % arg[qoid]
attnames = '*'
else:
attnames = self.get_attnames(qcl)
@@ -704,16 +706,14 @@
if len(keyname) > 1:
raise _prg_error('Composite key needs dict as arg')
arg = dict([(k, arg) for k in keyname])
- where = ' AND '.join(['%s = $%d'
- % (k, i + 1) for i, k in enumerate(keyname)])
- params = tuple(arg[k] for k in keyname)
+ where = ' AND '.join(['%s = %s'
+ % (k, self._quote(arg[k], attnames[k])) for k in keyname])
attnames = ', '.join(attnames)
q = 'SELECT %s FROM %s WHERE %s LIMIT 1' % (attnames, qcl, where)
- self._do_debug(q + ' %% %r' % (params,))
- res = self.db.query(q, params).dictresult()
+ self._do_debug(q)
+ res = self.db.query(q).dictresult()
if not res:
- raise _db_error(
- 'No such record in %s where %s %% %r' % (qcl, where, params))
+ raise _db_error('No such record in %s where %s' % (qcl, where))
for att, value in res[0].iteritems():
arg[att == 'oid' and qoid or att] = value
return arg
@@ -738,14 +738,11 @@
d = {}
d.update(kw)
attnames = self.get_attnames(qcl)
- names, values, params = [], [], []
- i = 1
+ names, values = [], []
for n in attnames:
if n != 'oid' and n in d:
names.append('"%s"' % n)
- values.append('$%d' % (i,))
- params.append(d[n])
- i += 1
+ values.append(self._quote(d[n], attnames[n]))
names, values = ', '.join(names), ', '.join(values)
selectable = self.has_table_privilege(qcl)
if selectable and self.server_version >= 80200:
@@ -753,8 +750,8 @@
else:
ret = ''
q = 'INSERT INTO %s (%s) VALUES (%s)%s' % (qcl, names, values, ret)
- self._do_debug(q + " %% %r" % (params,))
- res = self.db.query(q, params)
+ self._do_debug(q)
+ res = self.db.query(q)
if ret:
res = res.dictresult()
for att, value in res[0].iteritems():
@@ -796,8 +793,7 @@
d.update(kw)
attnames = self.get_attnames(qcl)
if qoid in d:
- where = 'oid = $1'
- params = [d[qoid]]
+ where = 'oid = %s' % d[qoid]
keyname = ()
else:
try:
@@ -807,18 +803,14 @@
if isinstance(keyname, basestring):
keyname = (keyname,)
try:
- where = ' AND '.join(['%s = $%d'
- % (k, i + 1) for i, k in enumerate(keyname)])
- params = [d[k] for k in keyname]
+ where = ' AND '.join(['%s = %s'
+ % (k, self._quote(d[k], attnames[k])) for k in keyname])
except KeyError:
raise _prg_error('Update needs primary key or oid.')
values = []
- i = len(params)
for n in attnames:
if n in d and n not in keyname:
- i += 1
- values.append('%s = $%d' % (n, i))
- params.append(d[n])
+ values.append('%s = %s' % (n, self._quote(d[n], attnames[n])))
if not values:
return d
values = ', '.join(values)
@@ -829,7 +821,7 @@
ret = ''
q = 'UPDATE %s SET %s WHERE %s%s' % (qcl, values, where, ret)
self._do_debug(q)
- res = self.db.query(q, params)
+ res = self.db.query(q)
if ret:
res = res.dictresult()[0]
for att, value in res.iteritems():
@@ -891,8 +883,7 @@
d = {}
d.update(kw)
if qoid in d:
- where = 'oid = $1'
- params = (d[qoid],)
+ where = 'oid = %s' % d[qoid]
else:
try:
keyname = self.pkey(qcl)
@@ -900,15 +891,15 @@
raise _prg_error('Class %s has no primary key' % qcl)
if isinstance(keyname, basestring):
keyname = (keyname,)
+ attnames = self.get_attnames(qcl)
try:
- where = ' AND '.join(['%s = $%d'
- % (k, i + 1) for i, k in enumerate(keyname)])
- params = tuple(d[k] for k in keyname)
+ where = ' AND '.join(['%s = %s'
+ % (k, self._quote(d[k], attnames[k])) for k in keyname])
except KeyError:
raise _prg_error('Delete needs primary key or oid.')
q = 'DELETE FROM %s WHERE %s' % (qcl, where)
- self._do_debug(q + " %% %r" % (params,))
- return int(self.db.query(q, params))
+ self._do_debug(q)
+ return int(self.db.query(q))
def pgnotify(self, event, callback, arg_dict={}, timeout=None):
return pgnotify(self.db, event, callback, arg_dict, timeout)
Modified: trunk/module/test_pg.py
==============================================================================
--- trunk/module/test_pg.py Sat Jan 5 15:25:40 2013 (r492)
+++ trunk/module/test_pg.py Sat Jan 5 17:08:30 2013 (r493)
@@ -22,7 +22,7 @@
* Table privilege problems (e.g. insert but no select) are not tested.
* Status and error messages from the connection are not tested.
* It would be more reasonable to create a test for the underlying
- shared library functions in the _pg module and assume they are ok.vi
+ shared library functions in the _pg module and assume they are ok.
The pg and pgdb modules should be tested against _pg mock functions.
"""
@@ -1184,6 +1184,7 @@
dbname = DBTestSuite.dbname
self.dbname = dbname
self.db = pg.DB(dbname)
+ self.db.query("set lc_monetary='C'");
if debug:
self.db.debug = 'DEBUG: %s'
@@ -1276,14 +1277,14 @@
self.assertEqual(f('123456789', 'num'), '123456789')
self.assertEqual(f('1.23456789', 'num'), '1.23456789')
self.assertEqual(f('12345678.9', 'num'), '12345678.9')
- self.assertEqual(f(123, 'money'), "'123.00'")
- self.assertEqual(f('123', 'money'), "'123.00'")
- self.assertEqual(f(123.45, 'money'), "'123.45'")
- self.assertEqual(f('123.45', 'money'), "'123.45'")
- self.assertEqual(f(123.454, 'money'), "'123.45'")
- self.assertEqual(f('123.454', 'money'), "'123.45'")
- self.assertEqual(f(123.456, 'money'), "'123.46'")
- self.assertEqual(f('123.456', 'money'), "'123.46'")
+ self.assertEqual(f(123, 'money'), '123')
+ self.assertEqual(f('123', 'money'), '123')
+ self.assertEqual(f(123.45, 'money'), '123.45')
+ self.assertEqual(f('123.45', 'money'), '123.45')
+ self.assertEqual(f(123.454, 'money'), '123.454')
+ self.assertEqual(f('123.454', 'money'), '123.454')
+ self.assertEqual(f(123.456, 'money'), '123.456')
+ self.assertEqual(f('123.456', 'money'), '123.456')
self.assertEqual(f('f', 'bool'), "'f'")
self.assertEqual(f('F', 'bool'), "'f'")
self.assertEqual(f('false', 'bool'), "'f'")
@@ -1579,27 +1580,80 @@
smart_ddl(self.db, 'drop table "%s"' % table)
smart_ddl(self.db, 'create table "%s" ('
"i2 smallint, i4 integer, i8 bigint,"
- "d numeric, f4 real, f8 double precision, m money, "
- "v4 varchar(4), c4 char(4), t text,"
- "b boolean, ts timestamp)" % table)
- data = dict(
- i2=2 ** 15 - 1, i4=int(2 ** 31 - 1), i8=long(2 ** 31 - 1),
- d=Decimal('123456789.9876543212345678987654321'),
- f4=1.0 + 1.0 / 32, f8=1.0 + 1.0 / 32,
- m="1234.56", v4="1234", c4="1234", t="1234" * 10,
- b=1, ts='2012-12-21')
- r = self.db.insert(table, data)
- self.assertEqual(r, data)
+ " d numeric, f4 real, f8 double precision, m money,"
+ " v4 varchar(4), c4 char(4), t text,"
+ " b boolean, ts timestamp)" % table)
oid_table = table
if ' ' in table:
oid_table = '"%s"' % oid_table
oid_table = 'oid(public.%s)' % oid_table
- self.assertTrue(oid_table in r)
- self.assertTrue(isinstance(r[oid_table], int))
- s = self.db.query('select oid,* from "%s"' % table).dictresult()[0]
- s[oid_table] = s['oid']
- del s['oid']
- self.assertEqual(r, s)
+ tests = [dict(i2=None, i4=None, i8=None),
+ (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
+ (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
+ dict(i2=42, i4=123456, i8=9876543210),
+ dict(i2=2 ** 15 - 1,
+ i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
+ dict(d=None), (dict(d=''), dict(d=None)),
+ dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
+ dict(f4=None, f8=None), dict(f4=0, f8=0),
+ (dict(f4='', f8=''), dict(f4=None, f8=None)),
+ dict(d=1234.5, f4=1234.5, f8=1234.5),
+ dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
+ dict(d=Decimal('123456789.9876543212345678987654321')),
+ dict(m=None), (dict(m=''), dict(m=None)),
+ dict(m=Decimal('-1234.56')),
+ (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
+ dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
+ (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
+ (dict(m=1234.5), dict(m=Decimal('1234.5'))),
+ (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
+ (dict(m=123456), dict(m=Decimal('123456'))),
+ (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
+ dict(b=None), (dict(b=''), dict(b=None)),
+ dict(b='f'), dict(b='t'),
+ (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
+ (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
+ (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
+ (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
+ (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
+ (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
+ dict(v4=None, c4=None, t=None),
+ (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
+ dict(v4='1234', c4='1234', t='1234' * 10),
+ dict(v4='abcd', c4='abcd', t='abcdefg'),
+ (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
+ dict(ts=None), (dict(ts=''), dict(ts=None)),
+ (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
+ dict(ts='2012-12-21 00:00:00'),
+ (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
+ dict(ts='2012-12-21 12:21:12'),
+ dict(ts='2013-01-05 12:13:14'),
+ dict(ts='current_timestamp')]
+ for test in tests:
+ if isinstance(test, dict):
+ data = test
+ change = {}
+ else:
+ data, change = test
+ expect = data.copy()
+ expect.update(change)
+ self.assertEqual(self.db.insert(table, data), data)
+ self.assertTrue(oid_table in data)
+ oid = data[oid_table]
+ self.assertTrue(isinstance(oid, int))
+ data = dict(item for item in data.iteritems()
+ if item[0] in expect)
+ ts = expect.get('ts')
+ if ts == 'current_timestamp':
+ expect['ts'] = data['ts']
+ self.assertEqual(data, expect)
+ data = self.db.query(
+ 'select oid,* from "%s"' % table).dictresult()[0]
+ self.assertEqual(data['oid'], oid)
+ data = dict(item for item in data.iteritems()
+ if item[0] in expect)
+ self.assertEqual(data, expect)
+ self.db.query('delete from "%s"' % table)
def testUpdate(self):
for table in ('update_test_table', 'test table for update'):
@@ -1916,6 +1970,7 @@
for s in ('client_min_messages = warning',
'client_encoding = UTF8',
'date_style = ISO, MDY',
+ 'lc_monetary = C',
'default_with_oids = on',
'standard_conforming_strings = off',
'escape_string_warning = off'):
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql