Author: cito
Date: Tue May 8 06:36:04 2012
New Revision: 436
Log:
Some clean-up, mostly PEP8 issues.
Modified:
trunk/docs/pg.txt
trunk/module/TEST_PyGreSQL_classic.py
trunk/module/TEST_PyGreSQL_dbapi20.py
trunk/module/pg.py
trunk/module/pgdb.py
trunk/module/pgmodule.c
trunk/module/test_pg.py
Modified: trunk/docs/pg.txt
==============================================================================
--- trunk/docs/pg.txt Tue May 8 05:32:01 2012 (r435)
+++ trunk/docs/pg.txt Tue May 8 06:36:04 2012 (r436)
@@ -433,6 +433,7 @@
Exceptions raised:
:TypeError: bad argument type, or too many arguments
+ :TypeError: invalid connection
:ValueError: empty SQL query or lost connection
:pg.ProgrammingError: error in query
:pg.InternalError: error during query processing
@@ -466,6 +467,7 @@
Exceptions raised:
:TypeError: too many (any) arguments
+ :TypeError: invalid connection
Description:
This method resets the current database connection.
@@ -484,6 +486,7 @@
Exceptions raised:
:TypeError: too many (any) arguments
+ :TypeError: invalid connection
Description:
This method requests that the server abandon processing
@@ -521,6 +524,7 @@
Exceptions raised:
:TypeError: too many (any) arguments
+ :TypeError: invalid connection
Description:
This method returns the underlying socket id used to connect
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Tue May 8 05:32:01 2012
(r435)
+++ trunk/module/TEST_PyGreSQL_classic.py Tue May 8 06:36:04 2012
(r436)
@@ -1,7 +1,6 @@
#!/usr/bin/env python
-import sys, unittest
-from decimal import Decimal
+import unittest
from pg import *
# We need a database to test against. If LOCAL_PyGreSQL.py exists we will
@@ -29,22 +28,22 @@
for t in ('_test1', '_test2'):
try:
db.query("CREATE SCHEMA " + t)
- except Exception:
+ except Error:
pass
try:
db.query("CREATE TABLE %s._test_schema "
"(%s int PRIMARY KEY)" % (t, t))
- except Exception:
+ except Error:
db.query("DELETE FROM %s._test_schema" % t)
try:
db.query("CREATE TABLE _test_schema "
"(_test int PRIMARY KEY, _i interval, dvar int DEFAULT 999)")
- except Exception:
+ except Error:
db.query("DELETE FROM _test_schema")
try:
db.query("CREATE VIEW _test_vschema AS "
"SELECT _test, 'abc'::text AS _test2 FROM _test_schema")
- except Exception:
+ except Error:
pass
def test_invalidname(self):
@@ -109,7 +108,7 @@
def test_mixed_case(self):
try:
db.query('CREATE TABLE _test_mc ("_Test" int PRIMARY KEY)')
- except Exception:
+ except Error:
db.query("DELETE FROM _test_mc")
d = dict(_Test=1234)
db.insert('_test_mc', d)
Modified: trunk/module/TEST_PyGreSQL_dbapi20.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_dbapi20.py Tue May 8 05:32:01 2012
(r435)
+++ trunk/module/TEST_PyGreSQL_dbapi20.py Tue May 8 06:36:04 2012
(r436)
@@ -1,8 +1,8 @@
#!/usr/bin/env python
# $Id$
-import dbapi20
import unittest
+import dbapi20
import pgdb
# We need a database to test against.
@@ -23,7 +23,7 @@
connect_args = ()
connect_kw_args = {'dsn': dbhost + ':' + dbname}
- lower_func = 'lower' # For stored procedure test
+ lower_func = 'lower' # For stored procedure test
def setUp(self):
# Call superclass setUp in case this does something in the future
@@ -31,7 +31,7 @@
try:
con = self._connect()
con.close()
- except Exception:
+ except pgdb.Error:
import pg
pg.DB().query('create database ' + dbname)
@@ -107,23 +107,22 @@
self.assertEqual(error.sqlstate, '22012')
def test_float(self):
- from math import pi, e
try:
nan = float('nan')
- except ValueError: # Python < 2.6
+ except ValueError: # Python < 2.6
nan = 3.0e999 - 1.5e999999
try:
inf = float('inf')
- except ValueError: # Python < 2.6
+ except ValueError: # Python < 2.6
inf = 3.0e999 * 1.5e999999
try:
from math import isnan, isinf
- except ImportError: # Python < 2.6
+ except ImportError: # Python < 2.6
isnan = lambda x: x != x
isinf = lambda x: not isnan(x) and isnan(x * 0)
try:
from math import isnan, isinf
- except ImportError: # Python < 2.6
+ except ImportError: # Python < 2.6
isnan = lambda x: x != x
isinf = lambda x: not isnan(x) and isnan(x * 0)
self.assert_(isnan(nan) and not isinf(nan))
@@ -176,10 +175,10 @@
self.assert_(pgdb.decimal_type() is decimal_type)
def test_nextset(self):
- pass # not implemented
+ pass # not implemented
def test_setoutputsize(self):
- pass # not implemented
+ pass # not implemented
def test_connection_errors(self):
con = self._connect()
Modified: trunk/module/pg.py
==============================================================================
--- trunk/module/pg.py Tue May 8 05:32:01 2012 (r435)
+++ trunk/module/pg.py Tue May 8 06:36:04 2012 (r436)
@@ -21,13 +21,13 @@
from _pg import *
try:
frozenset
-except NameError: # Python < 2.4
+except NameError: # Python < 2.4
from sets import ImmutableSet as frozenset
try:
from decimal import Decimal
set_decimal(Decimal)
except ImportError:
- pass # Python < 2.4
+ pass # Python < 2.4
# Auxiliary functions which are independent from a DB connection:
@@ -37,11 +37,13 @@
s = s.replace('_', 'a')
return not s.isalnum() or s[:1].isdigit() or s != s.lower()
+
def _is_unquoted(s):
"""Check whether this string is an unquoted identifier."""
s = s.replace('_', 'a')
return s.isalnum() and not s[:1].isdigit()
+
def _split_first_part(s):
"""Split the first part of a dot separated string."""
s = s.lstrip()
@@ -71,6 +73,7 @@
p[0] = s
return p
+
def _split_parts(s):
"""Split all parts of a dot separated string."""
q = []
@@ -82,24 +85,29 @@
s = s[1]
return q
+
def _join_parts(s):
"""Join all parts of a dot separated string."""
return '.'.join([_is_quoted(p) and '"%s"' % p or p for p in s])
+
def _oid_key(qcl):
"""Build oid key from qualified class name."""
return 'oid(%s)' % qcl
+
def _db_error(msg, cls=DatabaseError):
"""Returns DatabaseError with empty sqlstate attribute."""
error = cls(msg)
error.sqlstate = None
return error
+
def _int_error(msg):
"""Returns InternalError."""
return _db_error(msg, InternalError)
+
def _prg_error(msg):
"""Returns ProgrammingError."""
return _db_error(msg, ProgrammingError)
@@ -143,13 +151,13 @@
self._pkeys = {}
self._privileges = {}
self._args = args, kw
- self.debug = None # For debugging scripts, this can be set
+ self.debug = None # For debugging scripts, this can be set
# * to a string format specification (e.g. in CGI set to "%s<BR>"),
# * to a file object to write debug statements or
# * to a callable object which takes a string argument.
def __getattr__(self, name):
- # All undefined members are the same as in the underlying pg
connection:
+ # All undefined members are same as in underlying pg connection:
if self.db:
return getattr(self.db, name)
else:
@@ -161,7 +169,7 @@
"""Print a debug message."""
if self.debug:
if isinstance(self.debug, basestring):
- sys.stdout.write((self.debug % s) + '\n')
+ print self.debug % s
elif isinstance(self.debug, file):
file.write(s + '\n')
elif callable(self.debug):
@@ -208,7 +216,7 @@
return 'NULL'
return "'%.2f'" % float(d)
- _quote_funcs = dict( # quote methods for each type
+ _quote_funcs = dict( # quote methods for each type
text=_quote_text, bool=_quote_bool, date=_quote_date,
int=_quote_num, num=_quote_num, float=_quote_num,
money=_quote_money)
@@ -233,7 +241,7 @@
"""
s = _split_parts(cl)
- if len(s) > 1: # name already qualfied?
+ if len(s) > 1: # name already qualfied?
# should be database.schema.table or schema.table
if len(s) > 3:
raise _prg_error('Too many dots in class name %s' % cl)
@@ -243,22 +251,23 @@
# determine search path
q = 'SELECT current_schemas(TRUE)'
schemas = self.db.query(q).getresult()[0][0][1:-1].split(',')
- if schemas: # non-empty path
+ if schemas: # non-empty path
# search schema for this object in the current search path
q = ' UNION '.join(
["SELECT %d::integer AS n, '%s'::name AS nspname"
% s for s in enumerate(schemas)])
q = ("SELECT nspname FROM pg_class"
- " JOIN pg_namespace ON pg_class.relnamespace =
pg_namespace.oid"
+ " JOIN pg_namespace"
+ " ON pg_class.relnamespace = pg_namespace.oid"
" JOIN (%s) AS p USING (nspname)"
" WHERE pg_class.relname = '%s'"
" ORDER BY n LIMIT 1" % (q, cl))
schema = self.db.query(q).getresult()
- if schema: # schema found
+ if schema: # schema found
schema = schema[0][0]
- else: # object not found in current search path
+ else: # object not found in current search path
schema = 'public'
- else: # empty path
+ else: # empty path
schema = 'public'
return schema, cl
@@ -289,7 +298,10 @@
will not be usable after this call.
"""
- self.db.reset()
+ if self.db:
+ self.db.reset()
+ else:
+ raise _int_error('Connection already closed')
def reopen(self):
"""Reopen connection to the database.
@@ -345,7 +357,7 @@
for cl, pkey in newpkey.iteritems()])
return self._pkeys
- qcl = self._add_schema(cl) # build fully qualified class name
+ qcl = self._add_schema(cl) # build fully qualified class name
# Check if the caller is supplying a new primary key for the class
if newpkey:
self._pkeys[qcl] = newpkey
@@ -364,7 +376,8 @@
for r in self.db.query(
"SELECT pg_namespace.nspname, pg_class.relname,"
" pg_attribute.attname FROM pg_class"
- " JOIN pg_namespace ON pg_namespace.oid =
pg_class.relnamespace"
+ " JOIN pg_namespace"
+ " ON pg_namespace.oid = pg_class.relnamespace"
" AND pg_namespace.nspname NOT LIKE 'pg_%'"
" JOIN pg_attribute ON pg_attribute.attrelid = pg_class.oid"
" AND pg_attribute.attisdropped = 'f'"
@@ -422,8 +435,8 @@
return
elif newattnames:
raise _prg_error('If supplied, newattnames must be a dictionary')
- cl = self._split_schema(cl) # split into schema and class
- qcl = _join_parts(cl) # build fully qualified name
+ cl = self._split_schema(cl) # split into schema and class
+ qcl = _join_parts(cl) # build fully qualified name
# May as well cache them:
if qcl in self._attnames:
return self._attnames[qcl]
@@ -461,7 +474,7 @@
t[att] = 'money'
else:
t[att] = 'text'
- self._attnames[qcl] = t # cache it
+ self._attnames[qcl] = t # cache it
return self._attnames[qcl]
def has_table_privilege(self, cl, privilege='select'):
@@ -490,8 +503,8 @@
munged as oid(schema.table).
"""
- if cl.endswith('*'): # scan descendant tables?
- cl = cl[:-1].rstrip() # need parent table name
+ if cl.endswith('*'): # scan descendant tables?
+ cl = cl[:-1].rstrip() # need parent table name
# build qualified class name
qcl = self._add_schema(cl)
# To allow users to work with multiple tables,
@@ -581,7 +594,7 @@
try:
self.get(qcl, d)
except ProgrammingError:
- pass # table has no primary key
+ pass # table has no primary key
return d
def update(self, cl, d=None, **kw):
@@ -649,9 +662,8 @@
return d
def clear(self, cl, a=None):
- """
+ """Clear all the attributes to values determined by the types.
- This method clears all the attributes to values determined by the
types.
Numeric types are set to 0, Booleans are set to 'f', and everything
else is set to the empty string. If the array argument is present,
it is used as the array and any entries matching attribute names are
@@ -661,7 +673,7 @@
# At some point we will need a way to get defaults from a table.
qcl = self._add_schema(cl)
if a is None:
- a = {} # empty if argument is not present
+ a = {} # empty if argument is not present
attnames = self.get_attnames(qcl)
for n, t in attnames.iteritems():
if n == 'oid':
Modified: trunk/module/pgdb.py
==============================================================================
--- trunk/module/pgdb.py Tue May 8 05:32:01 2012 (r435)
+++ trunk/module/pgdb.py Tue May 8 06:36:04 2012 (r436)
@@ -64,27 +64,26 @@
"""
from _pg import *
-import sys
try:
frozenset
-except NameError: # Python < 2.4
+except NameError: # Python < 2.4
from sets import ImmutableSet as frozenset
from datetime import date, time, datetime, timedelta
from time import localtime
-try: # use Decimal if available
+try: # use Decimal if available
from decimal import Decimal
set_decimal(Decimal)
-except ImportError: # otherwise (Python < 2.4)
- Decimal = float # use float instead of Decimal
+except ImportError: # otherwise (Python < 2.4)
+ Decimal = float # use float instead of Decimal
try:
from math import isnan, isinf
-except ImportError: # Python < 2.6
+except ImportError: # Python < 2.6
isnan = lambda x: x != x
isinf = lambda x: not isnan(x) and isnan(x * 0)
try:
inf = float('inf')
nan = float('nan')
-except ValueError: # Python < 2.6
+except ValueError: # Python < 2.6
inf = 1.0e999
nan = inf * 0
@@ -334,7 +333,7 @@
else:
sql = operation
rows = self._src.execute(sql)
- if rows: # true if not DML
+ if rows: # true if not DML
totrows += rows
else:
self.rowcount = -1
@@ -349,7 +348,8 @@
self.rowcount = self._src.ntuples
getdescr = self._type_cache.getdescr
coltypes = self._src.listinfo()
- self.description = [typ[1:2] + getdescr(typ[2]) for typ in
coltypes]
+ self.description = [
+ typ[1:2] + getdescr(typ[2]) for typ in coltypes]
self.lastrowid = None
else:
self.rowcount = totrows
@@ -438,8 +438,8 @@
def __init__(self, cnx):
"""Create a database connection object."""
- self._cnx = cnx # connection
- self._tnx = False # transaction state
+ self._cnx = cnx # connection
+ self._tnx = False # transaction state
self._type_cache = pgdbTypeCache(cnx)
try:
self._cnx.source()
@@ -562,7 +562,7 @@
if isinstance(values, basestring):
values = values.split()
return super(pgdbType, cls).__new__(cls, values)
- else: # Python < 2.4
+ else: # Python < 2.4
def __init__(self, values):
if isinstance(values, basestring):
values = values.split()
@@ -612,26 +612,32 @@
"""Construct an object holding a date value."""
return date(year, month, day)
+
def Time(hour, minute=0, second=0, microsecond=0):
"""Construct an object holding a time value."""
return time(hour, minute, second, microsecond)
+
def Timestamp(year, month, day, hour=0, minute=0, second=0, microsecond=0):
"""construct an object holding a time stamp value."""
return datetime(year, month, day, hour, minute, second, microsecond)
+
def DateFromTicks(ticks):
"""Construct an object holding a date value from the given ticks value."""
return Date(*localtime(ticks)[:3])
+
def TimeFromTicks(ticks):
"""construct an object holding a time value from the given ticks value."""
return Time(*localtime(ticks)[3:6])
+
def TimestampFromTicks(ticks):
"""construct an object holding a time stamp from the given ticks value."""
return Timestamp(*localtime(ticks)[:6])
+
class Binary(str):
"""construct an object capable of holding a binary (long) string value."""
Modified: trunk/module/pgmodule.c
==============================================================================
--- trunk/module/pgmodule.c Tue May 8 05:32:01 2012 (r435)
+++ trunk/module/pgmodule.c Tue May 8 06:36:04 2012 (r436)
@@ -140,7 +140,7 @@
{
pgobject *pgobj;
- if ((pgobj = PyObject_NEW(pgobject, &PgType)) == NULL)
+ if (!(pgobj = PyObject_NEW(pgobject, &PgType)))
return NULL;
pgobj->valid = 1;
@@ -225,14 +225,13 @@
PyObject *err = NULL;
PyObject *str;
- str = PyString_FromString(msg);
- if (str)
+ if (!(str = PyString_FromString(msg)))
+ err = NULL;
+ else
{
err = PyObject_CallFunctionObjArgs(type, str, NULL);
Py_DECREF(str);
}
- else
- err = NULL;
if (err)
{
if (result) {
@@ -314,7 +313,7 @@
return 0;
}
- if ((level & CHECK_RESULT) && self->result == NULL)
+ if ((level & CHECK_RESULT) && !self->result)
{
set_dberror(DatabaseError, "no result.", NULL);
return 0;
@@ -340,7 +339,7 @@
int *typ;
int j;
- if ((typ = malloc(sizeof(int) * nfields)) == NULL)
+ if (!(typ = malloc(sizeof(int) * nfields)))
{
PyErr_SetString(PyExc_MemoryError, "memory error in
getresult().");
return NULL;
@@ -564,7 +563,7 @@
pgsourceobject *npgobj;
/* allocates new query object */
- if ((npgobj = PyObject_NEW(pgsourceobject, &PgSourceType)) == NULL)
+ if (!(npgobj = PyObject_NEW(pgsourceobject, &PgSourceType)))
return NULL;
/* initializes internal parameters */
@@ -731,7 +730,7 @@
return NULL;
/* checks args */
- if ((args != NULL) && (!PyArg_ParseTuple(args, "")))
+ if (args && !PyArg_ParseTuple(args, ""))
{
PyErr_SetString(PyExc_TypeError,
"method oidstatus() takes no parameters.");
@@ -783,13 +782,13 @@
size = self->max_row - self->current_row;
/* allocate list for result */
- if ((reslist = PyList_New(0)) == NULL)
+ if (!(reslist = PyList_New(0)))
return NULL;
/* builds result */
for (i = 0; i < size; ++i)
{
- if ((rowtuple = PyTuple_New(self->num_fields)) == NULL)
+ if (!(rowtuple = PyTuple_New(self->num_fields)))
{
Py_DECREF(reslist);
return NULL;
@@ -974,7 +973,7 @@
}
/* builds result */
- if ((result = PyTuple_New(self->num_fields)) == NULL)
+ if (!(result = PyTuple_New(self->num_fields)))
return NULL;
for (i = 0; i < self->num_fields; i++)
@@ -1211,7 +1210,7 @@
{
pglargeobject *npglo;
- if ((npglo = PyObject_NEW(pglargeobject, &PglargeType)) == NULL)
+ if (!(npglo = PyObject_NEW(pglargeobject, &PglargeType)))
return NULL;
Py_XINCREF(pgcnx);
@@ -1736,7 +1735,7 @@
pgpasswd = PyString_AsString(pg_default_passwd);
#endif /* DEFAULT_VARS */
- if ((npgobj = (pgobject *) pgobject_New()) == NULL)
+ if (!(npgobj = (pgobject *) pgobject_New()))
return NULL;
if (pgport != -1)
@@ -2109,7 +2108,7 @@
*typ;
/* checks args (args == NULL for an internal call) */
- if ((args != NULL) && (!PyArg_ParseTuple(args, "")))
+ if (args && !PyArg_ParseTuple(args, ""))
{
PyErr_SetString(PyExc_TypeError,
"method getresult() takes no parameters.");
@@ -2125,7 +2124,7 @@
for (i = 0; i < m; i++)
{
- if ((rowtuple = PyTuple_New(n)) == NULL)
+ if (!(rowtuple = PyTuple_New(n)))
{
Py_DECREF(reslist);
reslist = NULL;
@@ -2194,7 +2193,7 @@
break;
}
- if (val == NULL)
+ if (!val)
{
Py_DECREF(reslist);
Py_DECREF(rowtuple);
@@ -2234,7 +2233,7 @@
*typ;
/* checks args (args == NULL for an internal call) */
- if ((args != NULL) && (!PyArg_ParseTuple(args, "")))
+ if (args && !PyArg_ParseTuple(args, ""))
{
PyErr_SetString(PyExc_TypeError,
"method getresult() takes no parameters.");
@@ -2250,7 +2249,7 @@
for (i = 0; i < m; i++)
{
- if ((dict = PyDict_New()) == NULL)
+ if (!(dict = PyDict_New()))
{
Py_DECREF(reslist);
reslist = NULL;
@@ -2319,7 +2318,7 @@
break;
}
- if (val == NULL)
+ if (!val)
{
Py_DECREF(dict);
Py_DECREF(reslist);
@@ -2367,7 +2366,7 @@
/* checks for NOTIFY messages */
PQconsumeInput(self->cnx);
- if ((notify = PQnotifies(self->cnx)) == NULL)
+ if (!(notify = PQnotifies(self->cnx)))
{
Py_INCREF(Py_None);
return Py_None;
@@ -2377,15 +2376,15 @@
PyObject *notify_result,
*temp;
- if ((notify_result = PyTuple_New(2)) == NULL ||
- (temp = PyString_FromString(notify->relname)) == NULL)
+ if (!(notify_result = PyTuple_New(2)) ||
+ !(temp = PyString_FromString(notify->relname)))
{
return NULL;
}
PyTuple_SET_ITEM(notify_result, 0, temp);
- if ((temp = PyInt_FromLong(notify->be_pid)) == NULL)
+ if (!(temp = PyInt_FromLong(notify->be_pid)))
{
Py_DECREF(notify_result);
return NULL;
@@ -2505,7 +2504,7 @@
return NULL; /* error detected on query */
}
- if ((npgobj = PyObject_NEW(pgqueryobject, &PgQueryType)) == NULL)
+ if (!(npgobj = PyObject_NEW(pgqueryobject, &PgQueryType)))
return NULL;
/* stores result and returns object */
Modified: trunk/module/test_pg.py
==============================================================================
--- trunk/module/test_pg.py Tue May 8 05:32:01 2012 (r435)
+++ trunk/module/test_pg.py Tue May 8 06:36:04 2012 (r436)
@@ -39,17 +39,17 @@
try:
locale.setlocale(locale.LC_ALL, 'german')
except Exception:
- import warning
- warning.warn('Cannot set German locale.')
+ import warnings
+ warnings.warn('Cannot set German locale.')
german = False
try:
frozenset
-except NameError: # Python < 2.4
+except NameError: # Python < 2.4
from sets import ImmutableSet as frozenset
try:
from decimal import Decimal
-except ImportError: # Python < 2.4
+except ImportError: # Python < 2.4
Decimal = float
@@ -338,11 +338,11 @@
dbname = 'template1'
try:
connection = pg.connect(dbname)
- except Exception:
+ except pg.Error:
self.fail('Cannot connect to database ' + dbname)
try:
connection.close()
- except Exception:
+ except pg.Error:
self.fail('Cannot close the database connection')
@@ -414,8 +414,8 @@
def testAttributeUser(self):
no_user = 'Deprecated facility'
user = self.connection.user
- self.assert_(self.connection.user)
- self.assertNotEqual(self.connection.user, no_user)
+ self.assert_(user)
+ self.assertNotEqual(user, no_user)
def testMethodQuery(self):
self.connection.query("select 1+1")
@@ -430,15 +430,17 @@
self.connection.close()
try:
self.connection.reset()
- self.fail('Reset should give an error for a closed connection')
- except Exception:
+ except (pg.Error, TypeError):
pass
+ else:
+ self.fail('Reset should give an error for a closed connection')
self.assertRaises(pg.InternalError, self.connection.close)
try:
self.connection.query('select 1')
- self.fail('Query should give an error for a closed connection')
- except Exception:
+ except (pg.Error, TypeError):
pass
+ else:
+ self.fail('Query should give an error for a closed connection')
self.connection = pg.connect(self.dbname)
@@ -591,7 +593,7 @@
r = self.c.query(q)
t = '~test_pg_testPrint_temp.tmp'
s = open(t, 'w')
- import sys, os
+ import os, sys
stdout, sys.stdout = sys.stdout, s
try:
print r
@@ -669,8 +671,7 @@
self.assertEqual(r, num_rows)
def testInserttableNullValues(self):
- num_rows = 100
- data = [(None,) * 10]
+ data = [(None,) * 10] * 10
self.c.inserttable("test", data)
r = self.c.query("select * from test").getresult()
self.assertEqual(r, data)
@@ -925,9 +926,10 @@
self.db.close()
try:
self.db.reset()
- self.fail('Reset should give an error for a closed connection')
- except Exception:
+ except pg.Error:
pass
+ else:
+ self.fail('Reset should give an error for a closed connection')
self.assertRaises(pg.InternalError, self.db.close)
self.assertRaises(pg.InternalError, self.db.query, 'select 1')
self.db = pg.DB(self.dbname)
@@ -1104,7 +1106,7 @@
self.assert_(r is None)
q = "insert into test_table values (1)"
r = self.db.query(q)
- self.assert_(isinstance(r, int)), r
+ self.assert_(isinstance(r, int))
q = "insert into test_table select 2"
r = self.db.query(q)
self.assert_(isinstance(r, int))
@@ -1230,7 +1232,7 @@
'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
'normal_name': 'int', 'Special Name': 'int',
'u': 'text', 't': 'text', 'v': 'text',
- 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int' }
+ 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'}
self.assertEqual(attributes, result)
def testHasTablePrivilege(self):
@@ -1300,7 +1302,7 @@
"n integer, m integer, t text, primary key (n, m))" % table)
for n in range(3):
for m in range(2):
- t = chr(ord('a') + 2*n +m)
+ t = chr(ord('a') + 2*n + m)
self.db.query("insert into %s values("
"%d, %d, '%s')" % (table, n+1, m+1, t))
self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
@@ -1403,7 +1405,7 @@
self.assertEqual(r, result)
r['a'] = r['n'] = 1
r['d'] = r['t'] = 'x'
- r['b']
+ r['b'] = 't'
r['oid'] = 1L
r = self.db.clear(table, r)
result = {'a': 1, 'n': 0, 'b': 'f', 'd': '', 't': '', 'oid': 1L}
@@ -1539,21 +1541,22 @@
self.assertEqual(r, result_m)
def testGet(self):
+ PrgError = pg.ProgrammingError
self.assertEqual(self.db.get("t", 1, 'n')['d'], 0)
self.assertEqual(self.db.get("t0", 1, 'n')['d'], 0)
self.assertEqual(self.db.get("public.t", 1, 'n')['d'], 0)
self.assertEqual(self.db.get("public.t0", 1, 'n')['d'], 0)
- self.assertRaises(pg.ProgrammingError, self.db.get, "public.t1", 1,
'n')
+ self.assertRaises(PrgError, self.db.get, "public.t1", 1, 'n')
self.assertEqual(self.db.get("s1.t1", 1, 'n')['d'], 1)
self.assertEqual(self.db.get("s3.t", 1, 'n')['d'], 3)
self.db.query("set search_path to s2,s4")
- self.assertRaises(pg.ProgrammingError, self.db.get, "t1", 1, 'n')
+ self.assertRaises(PrgError, self.db.get, "t1", 1, 'n')
self.assertEqual(self.db.get("t4", 1, 'n')['d'], 4)
self.assertRaises(pg.ProgrammingError, self.db.get, "t3", 1, 'n')
self.assertEqual(self.db.get("t", 1, 'n')['d'], 2)
self.assertEqual(self.db.get("s3.t3", 1, 'n')['d'], 3)
self.db.query("set search_path to s1,s3")
- self.assertRaises(pg.ProgrammingError, self.db.get, "t2", 1, 'n')
+ self.assertRaises(PrgError, self.db.get, "t2", 1, 'n')
self.assertEqual(self.db.get("t3", 1, 'n')['d'], 3)
self.assertRaises(pg.ProgrammingError, self.db.get, "t4", 1, 'n')
self.assertEqual(self.db.get("t", 1, 'n')['d'], 1)
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql