Author: cito
Date: Sat Jan  5 17:08:30 2013
New Revision: 493

Log:
Do not use positional parameters internally.
This restores backward compatibility with version 4.0.

Modified:
   trunk/module/TEST_PyGreSQL_classic.py
   trunk/module/pg.py
   trunk/module/test_pg.py

Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py       Sat Jan  5 15:25:40 2013        
(r492)
+++ trunk/module/TEST_PyGreSQL_classic.py       Sat Jan  5 17:08:30 2013        
(r493)
@@ -199,13 +199,13 @@
         self.assertEqual(q('1', 'text'), "'1'")
         self.assertEqual(q('1', 'num'), "1")
         self.assertEqual(q(None, 'int'), "NULL")
-        self.assertEqual(q(1, 'money'), "'1.00'")
-        self.assertEqual(q('1', 'money'), "'1.00'")
-        self.assertEqual(q(1.234, 'money'), "'1.23'")
-        self.assertEqual(q('1.234', 'money'), "'1.23'")
-        self.assertEqual(q(0, 'money'), "'0.00'")
-        self.assertEqual(q(0.00, 'money'), "'0.00'")
-        self.assertEqual(q(Decimal('0.00'), 'money'), "'0.00'")
+        self.assertEqual(q(1, 'money'), "1")
+        self.assertEqual(q('1', 'money'), "1")
+        self.assertEqual(q(1.234, 'money'), "1.234")
+        self.assertEqual(q('1.234', 'money'), "1.234")
+        self.assertEqual(q(0, 'money'), "0")
+        self.assertEqual(q(0.00, 'money'), "0.0")
+        self.assertEqual(q(Decimal('0.00'), 'money'), "0.00")
         self.assertEqual(q(None, 'money'), "NULL")
         self.assertEqual(q('', 'money'), "NULL")
         self.assertEqual(q(0, 'bool'), "'f'")
@@ -221,7 +221,8 @@
         self.assertEqual(q('true', 'bool'), "'t'")
         self.assertEqual(q('y', 'bool'), "'t'")
         self.assertEqual(q('', 'date'), "NULL")
-        self.assertEqual(q('date', 'date'), "'date'")
+        self.assertEqual(q('some_date', 'date'), "'some_date'")
+        self.assertEqual(q('current_timestamp', 'date'), "current_timestamp")
         self.assertEqual(q('', 'text'), "''")
         self.assertEqual(q("'", 'text'), "''''")
         self.assertEqual(q("\\", 'text'), "'\\\\'")
@@ -231,7 +232,7 @@
 
     def test_notify_DB(self):
         global cb1_return
-        
+
         db = opendb()
         db2 = opendb()
         # Listen for 'event_1'
@@ -284,7 +285,7 @@
 
 if __name__ == '__main__':
     suite = unittest.TestSuite()
-    
+
     if len(sys.argv) > 1: test_list = sys.argv[1:]
     else: test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
 

Modified: trunk/module/pg.py
==============================================================================
--- trunk/module/pg.py  Sat Jan  5 15:25:40 2013        (r492)
+++ trunk/module/pg.py  Sat Jan  5 17:08:30 2013        (r493)
@@ -29,7 +29,7 @@
     from decimal import Decimal
     set_decimal(Decimal)
 except ImportError:  # Python < 2.4
-    pass
+    Decimal = float
 try:
     from collections import namedtuple
 except ImportError:  # Python < 2.6
@@ -128,6 +128,7 @@
     """Returns ProgrammingError."""
     return _db_error(msg, ProgrammingError)
 
+
 class pgnotify(object):
     """A PostgreSQL client-side asynchronous notification handler."""
 
@@ -318,7 +319,9 @@
         """Quote money value."""
         if d is None or d == '':
             return 'NULL'
-        return "'%.2f'" % float(d)
+        if not isinstance(d, basestring):
+            d = str(d)
+        return d
 
     _quote_funcs = dict(  # quote methods for each type
         text=_quote_text, bool=_quote_bool, date=_quote_date,
@@ -693,8 +696,7 @@
                     raise _db_error('%s not in arg' % qoid)
             else:
                 arg = {qoid: arg}
-            where = 'oid = $1'
-            params = (arg[qoid],)
+            where = 'oid = %s' % arg[qoid]
             attnames = '*'
         else:
             attnames = self.get_attnames(qcl)
@@ -704,16 +706,14 @@
                 if len(keyname) > 1:
                     raise _prg_error('Composite key needs dict as arg')
                 arg = dict([(k, arg) for k in keyname])
-            where = ' AND '.join(['%s = $%d'
-                % (k, i + 1) for i, k in enumerate(keyname)])
-            params = tuple(arg[k] for k in keyname)
+            where = ' AND '.join(['%s = %s'
+                % (k, self._quote(arg[k], attnames[k])) for k in keyname])
             attnames = ', '.join(attnames)
         q = 'SELECT %s FROM %s WHERE %s LIMIT 1' % (attnames, qcl, where)
-        self._do_debug(q + ' %% %r' % (params,))
-        res = self.db.query(q, params).dictresult()
+        self._do_debug(q)
+        res = self.db.query(q).dictresult()
         if not res:
-            raise _db_error(
-                'No such record in %s where %s %% %r' % (qcl, where, params))
+            raise _db_error('No such record in %s where %s' % (qcl, where))
         for att, value in res[0].iteritems():
             arg[att == 'oid' and qoid or att] = value
         return arg
@@ -738,14 +738,11 @@
             d = {}
         d.update(kw)
         attnames = self.get_attnames(qcl)
-        names, values, params = [], [], []
-        i = 1
+        names, values = [], []
         for n in attnames:
             if n != 'oid' and n in d:
                 names.append('"%s"' % n)
-                values.append('$%d' % (i,))
-                params.append(d[n])
-                i += 1
+                values.append(self._quote(d[n], attnames[n]))
         names, values = ', '.join(names), ', '.join(values)
         selectable = self.has_table_privilege(qcl)
         if selectable and self.server_version >= 80200:
@@ -753,8 +750,8 @@
         else:
             ret = ''
         q = 'INSERT INTO %s (%s) VALUES (%s)%s' % (qcl, names, values, ret)
-        self._do_debug(q + " %% %r" % (params,))
-        res = self.db.query(q, params)
+        self._do_debug(q)
+        res = self.db.query(q)
         if ret:
             res = res.dictresult()
             for att, value in res[0].iteritems():
@@ -796,8 +793,7 @@
         d.update(kw)
         attnames = self.get_attnames(qcl)
         if qoid in d:
-            where = 'oid = $1'
-            params = [d[qoid]]
+            where = 'oid = %s' % d[qoid]
             keyname = ()
         else:
             try:
@@ -807,18 +803,14 @@
             if isinstance(keyname, basestring):
                 keyname = (keyname,)
             try:
-                where = ' AND '.join(['%s = $%d'
-                    % (k, i + 1) for i, k in enumerate(keyname)])
-                params = [d[k] for k in keyname]
+                where = ' AND '.join(['%s = %s'
+                    % (k, self._quote(d[k], attnames[k])) for k in keyname])
             except KeyError:
                 raise _prg_error('Update needs primary key or oid.')
         values = []
-        i = len(params)
         for n in attnames:
             if n in d and n not in keyname:
-                i += 1
-                values.append('%s = $%d' % (n, i))
-                params.append(d[n])
+                values.append('%s = %s' % (n, self._quote(d[n], attnames[n])))
         if not values:
             return d
         values = ', '.join(values)
@@ -829,7 +821,7 @@
             ret = ''
         q = 'UPDATE %s SET %s WHERE %s%s' % (qcl, values, where, ret)
         self._do_debug(q)
-        res = self.db.query(q, params)
+        res = self.db.query(q)
         if ret:
             res = res.dictresult()[0]
             for att, value in res.iteritems():
@@ -891,8 +883,7 @@
             d = {}
         d.update(kw)
         if qoid in d:
-            where = 'oid = $1'
-            params = (d[qoid],)
+            where = 'oid = %s' % d[qoid]
         else:
             try:
                 keyname = self.pkey(qcl)
@@ -900,15 +891,15 @@
                 raise _prg_error('Class %s has no primary key' % qcl)
             if isinstance(keyname, basestring):
                 keyname = (keyname,)
+            attnames = self.get_attnames(qcl)
             try:
-                where = ' AND '.join(['%s = $%d'
-                    % (k, i + 1) for i, k in enumerate(keyname)])
-                params = tuple(d[k] for k in keyname)
+                where = ' AND '.join(['%s = %s'
+                    % (k, self._quote(d[k], attnames[k])) for k in keyname])
             except KeyError:
                 raise _prg_error('Delete needs primary key or oid.')
         q = 'DELETE FROM %s WHERE %s' % (qcl, where)
-        self._do_debug(q + " %% %r" % (params,))
-        return int(self.db.query(q, params))
+        self._do_debug(q)
+        return int(self.db.query(q))
 
     def pgnotify(self, event, callback, arg_dict={}, timeout=None):
         return pgnotify(self.db, event, callback, arg_dict, timeout)

Modified: trunk/module/test_pg.py
==============================================================================
--- trunk/module/test_pg.py     Sat Jan  5 15:25:40 2013        (r492)
+++ trunk/module/test_pg.py     Sat Jan  5 17:08:30 2013        (r493)
@@ -22,7 +22,7 @@
   * Table privilege problems (e.g. insert but no select) are not tested.
   * Status and error messages from the connection are not tested.
   * It would be more reasonable to create a test for the underlying
-    shared library functions in the _pg module and assume they are ok.vi
+    shared library functions in the _pg module and assume they are ok.
     The pg and pgdb modules should be tested against _pg mock functions.
 
 """
@@ -1184,6 +1184,7 @@
         dbname = DBTestSuite.dbname
         self.dbname = dbname
         self.db = pg.DB(dbname)
+        self.db.query("set lc_monetary='C'");
         if debug:
             self.db.debug = 'DEBUG: %s'
 
@@ -1276,14 +1277,14 @@
         self.assertEqual(f('123456789', 'num'), '123456789')
         self.assertEqual(f('1.23456789', 'num'), '1.23456789')
         self.assertEqual(f('12345678.9', 'num'), '12345678.9')
-        self.assertEqual(f(123, 'money'), "'123.00'")
-        self.assertEqual(f('123', 'money'), "'123.00'")
-        self.assertEqual(f(123.45, 'money'), "'123.45'")
-        self.assertEqual(f('123.45', 'money'), "'123.45'")
-        self.assertEqual(f(123.454, 'money'), "'123.45'")
-        self.assertEqual(f('123.454', 'money'), "'123.45'")
-        self.assertEqual(f(123.456, 'money'), "'123.46'")
-        self.assertEqual(f('123.456', 'money'), "'123.46'")
+        self.assertEqual(f(123, 'money'), '123')
+        self.assertEqual(f('123', 'money'), '123')
+        self.assertEqual(f(123.45, 'money'), '123.45')
+        self.assertEqual(f('123.45', 'money'), '123.45')
+        self.assertEqual(f(123.454, 'money'), '123.454')
+        self.assertEqual(f('123.454', 'money'), '123.454')
+        self.assertEqual(f(123.456, 'money'), '123.456')
+        self.assertEqual(f('123.456', 'money'), '123.456')
         self.assertEqual(f('f', 'bool'), "'f'")
         self.assertEqual(f('F', 'bool'), "'f'")
         self.assertEqual(f('false', 'bool'), "'f'")
@@ -1579,27 +1580,80 @@
             smart_ddl(self.db, 'drop table "%s"' % table)
             smart_ddl(self.db, 'create table "%s" ('
                 "i2 smallint, i4 integer, i8 bigint,"
-                "d numeric, f4 real, f8 double precision, m money, "
-                "v4 varchar(4), c4 char(4), t text,"
-                "b boolean, ts timestamp)" % table)
-            data = dict(
-                i2=2 ** 15 - 1, i4=int(2 ** 31 - 1), i8=long(2 ** 31 - 1),
-                d=Decimal('123456789.9876543212345678987654321'),
-                f4=1.0 + 1.0 / 32, f8=1.0 + 1.0 / 32,
-                m="1234.56", v4="1234", c4="1234",  t="1234" * 10,
-                b=1, ts='2012-12-21')
-            r = self.db.insert(table, data)
-            self.assertEqual(r, data)
+                " d numeric, f4 real, f8 double precision, m money,"
+                " v4 varchar(4), c4 char(4), t text,"
+                " b boolean, ts timestamp)" % table)
             oid_table = table
             if ' ' in table:
                 oid_table = '"%s"' % oid_table
             oid_table = 'oid(public.%s)' % oid_table
-            self.assertTrue(oid_table in r)
-            self.assertTrue(isinstance(r[oid_table], int))
-            s = self.db.query('select oid,* from "%s"' % table).dictresult()[0]
-            s[oid_table] = s['oid']
-            del s['oid']
-            self.assertEqual(r, s)
+            tests = [dict(i2=None, i4=None, i8=None),
+                (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
+                (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
+                dict(i2=42, i4=123456, i8=9876543210),
+                dict(i2=2 ** 15 - 1,
+                    i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
+                dict(d=None), (dict(d=''), dict(d=None)),
+                dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
+                dict(f4=None, f8=None), dict(f4=0, f8=0),
+                (dict(f4='', f8=''), dict(f4=None, f8=None)),
+                dict(d=1234.5, f4=1234.5, f8=1234.5),
+                dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
+                dict(d=Decimal('123456789.9876543212345678987654321')),
+                dict(m=None), (dict(m=''), dict(m=None)),
+                dict(m=Decimal('-1234.56')),
+                (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
+                dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
+                (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
+                (dict(m=1234.5), dict(m=Decimal('1234.5'))),
+                (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
+                (dict(m=123456), dict(m=Decimal('123456'))),
+                (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
+                dict(b=None), (dict(b=''), dict(b=None)),
+                dict(b='f'), dict(b='t'),
+                (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
+                (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
+                (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
+                (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
+                (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
+                (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
+                dict(v4=None, c4=None, t=None),
+                (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
+                dict(v4='1234', c4='1234', t='1234' * 10),
+                dict(v4='abcd', c4='abcd', t='abcdefg'),
+                (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
+                dict(ts=None), (dict(ts=''), dict(ts=None)),
+                (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
+                dict(ts='2012-12-21 00:00:00'),
+                (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
+                dict(ts='2012-12-21 12:21:12'),
+                dict(ts='2013-01-05 12:13:14'),
+                dict(ts='current_timestamp')]
+            for test in tests:
+                if isinstance(test, dict):
+                    data = test
+                    change = {}
+                else:
+                    data, change = test
+                expect = data.copy()
+                expect.update(change)
+                self.assertEqual(self.db.insert(table, data), data)
+                self.assertTrue(oid_table in data)
+                oid = data[oid_table]
+                self.assertTrue(isinstance(oid, int))
+                data = dict(item for item in data.iteritems()
+                    if item[0] in expect)
+                ts = expect.get('ts')
+                if ts == 'current_timestamp':
+                    expect['ts'] = data['ts']
+                self.assertEqual(data, expect)
+                data = self.db.query(
+                    'select oid,* from "%s"' % table).dictresult()[0]
+                self.assertEqual(data['oid'], oid)
+                data = dict(item for item in data.iteritems()
+                    if item[0] in expect)
+                self.assertEqual(data, expect)
+                self.db.query('delete from "%s"' % table)
 
     def testUpdate(self):
         for table in ('update_test_table', 'test table for update'):
@@ -1916,6 +1970,7 @@
         for s in ('client_min_messages = warning',
             'client_encoding = UTF8',
             'date_style = ISO, MDY',
+            'lc_monetary = C',
             'default_with_oids = on',
             'standard_conforming_strings = off',
             'escape_string_warning = off'):
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to