Author: cito
Date: Thu Jul 14 08:21:19 2016
New Revision: 876
Log:
Fixed issues that occur when the remote server version is < 9.0.
Though this is not officially supported or tested,
sometimes you just have to access those legacy databases.
Modified:
trunk/pg.py
trunk/pgdb.py
trunk/tests/test_classic.py
trunk/tests/test_classic_connection.py
trunk/tests/test_classic_dbwrapper.py
Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Sat Jul 2 04:46:03 2016 (r875)
+++ trunk/pg.py Thu Jul 14 08:21:19 2016 (r876)
@@ -903,7 +903,7 @@
def __missing__(self, typ):
"""Create a cast function if it is not cached.
-
+
Note that this class never raises a KeyError,
but returns None when no special cast function exists.
"""
@@ -1049,7 +1049,7 @@
regtype: the regular type name
simple: the simple PyGreSQL type name
typtype: b = base type, c = composite type etc.
- category: A = Array, b =Boolean, C = Composite etc.
+ category: A = Array, b = Boolean, C = Composite etc.
delim: delimiter for array types
relid: corresponding table for composite types
attnames: attributes for composite types
@@ -1082,6 +1082,17 @@
db = db.db
self.query = db.query
self.escape_string = db.escape_string
+ if db.server_version < 80400:
+ # older remote databases (not officially supported)
+ self._query_pg_type = (
+ "SELECT oid, typname, typname::text::regtype,"
+ " typtype, null as typcategory, typdelim, typrelid"
+ " FROM pg_type WHERE oid=%s::regtype")
+ else:
+ self._query_pg_type = (
+ "SELECT oid, typname, typname::regtype,"
+ " typtype, typcategory, typdelim, typrelid"
+ " FROM pg_type WHERE oid=%s::regtype")
def add(self, oid, pgtype, regtype,
typtype, category, delim, relid):
@@ -1104,10 +1115,8 @@
def __missing__(self, key):
"""Get the type info from the database if it is not cached."""
try:
- res = self.query("SELECT oid, typname, typname::regtype,"
- " typtype, typcategory, typdelim, typrelid"
- " FROM pg_type WHERE oid=%s::regtype" %
- (_quote_if_unqualified('$1', key),), (key,)).getresult()
+ q = self._query_pg_type % (_quote_if_unqualified('$1', key),)
+ res = self.query(q, (key,)).getresult()
except ProgrammingError:
res = None
if not res:
@@ -1379,6 +1388,23 @@
self._args = args, kw
self.adapter = Adapter(self)
self.dbtypes = DbTypes(self)
+ if db.server_version < 80400:
+ # support older remote data bases
+ self._query_attnames = (
+ "SELECT a.attname, t.oid, t.typname, t.typname::text::regtype,"
+ " t.typtype, null as typcategory, t.typdelim, t.typrelid"
+ " FROM pg_attribute a"
+ " JOIN pg_type t ON t.oid = a.atttypid"
+ " WHERE a.attrelid = %s::regclass AND %s"
+ " AND NOT a.attisdropped ORDER BY a.attnum")
+ else:
+ self._query_attnames = (
+ "SELECT a.attname, t.oid, t.typname, t.typname::regtype,"
+ " t.typtype, t.typcategory, t.typdelim, t.typrelid"
+ " FROM pg_attribute a"
+ " JOIN pg_type t ON t.oid = a.atttypid"
+ " WHERE a.attrelid = %s::regclass AND %s"
+ " AND NOT a.attisdropped ORDER BY a.attnum")
db.set_cast_hook(self.dbtypes.typecast)
self.debug = None # For debugging scripts, this can be set
# * to a string format specification (e.g. in CGI set to "%s<BR>"),
@@ -1805,13 +1831,7 @@
q = "a.attnum > 0"
if with_oid:
q = "(%s OR a.attname = 'oid')" % q
- q = ("SELECT a.attname, t.oid, t.typname, t.typname::regtype,"
- " t.typtype, t.typcategory, t.typdelim, t.typrelid"
- " FROM pg_attribute a"
- " JOIN pg_type t ON t.oid = a.atttypid"
- " WHERE a.attrelid = %s::regclass AND %s"
- " AND NOT a.attisdropped ORDER BY a.attnum") % (
- _quote_if_unqualified('$1', table), q)
+ q = self._query_attnames % (_quote_if_unqualified('$1', table), q)
names = self.db.query(q, (table,)).getresult()
types = self.dbtypes
names = ((name[0], types.add(*name[1:])) for name in names)
Modified: trunk/pgdb.py
==============================================================================
--- trunk/pgdb.py Sat Jul 2 04:46:03 2016 (r875)
+++ trunk/pgdb.py Thu Jul 14 08:21:19 2016 (r876)
@@ -631,6 +631,15 @@
self._typecasts = LocalTypecasts()
self._typecasts.get_fields = self.get_fields
self._typecasts.connection = cnx
+ if cnx.server_version < 80400:
+ # older remote databases (not officially supported)
+ self._query_pg_type = ("SELECT oid, typname,"
+ " typlen, typtype, null as typcategory, typdelim, typrelid"
+ " FROM pg_type WHERE oid=%s")
+ else:
+ self._query_pg_type = ("SELECT oid, typname,"
+ " typlen, typtype, typcategory, typdelim, typrelid"
+ " FROM pg_type WHERE oid=%s")
def __missing__(self, key):
"""Get the type info from the database if it is not cached."""
@@ -638,18 +647,16 @@
oid = key
else:
if '.' not in key and '"' not in key:
- key = '"%s"' % key
- oid = "'%s'::regtype" % self._escape_string(key)
+ key = '"%s"' % (key,)
+ oid = "'%s'::regtype" % (self._escape_string(key),)
try:
- self._src.execute("SELECT oid, typname,"
- " typlen, typtype, typcategory, typdelim, typrelid"
- " FROM pg_type WHERE oid=%s" % oid)
+ self._src.execute(self._query_pg_type % (oid,))
except ProgrammingError:
res = None
else:
res = self._src.fetch(1)
if not res:
- raise KeyError('Type %s could not be found' % key)
+ raise KeyError('Type %s could not be found' % (key,))
res = res[0]
type_code = TypeCode.create(int(res[0]), res[1],
int(res[2]), res[3], res[4], res[5], int(res[6]))
@@ -673,7 +680,7 @@
return None # this type is not composite
self._src.execute("SELECT attname, atttypid"
" FROM pg_attribute WHERE attrelid=%s AND attnum>0"
- " AND NOT attisdropped ORDER BY attnum" % typ.relid)
+ " AND NOT attisdropped ORDER BY attnum" % (typ.relid,))
return [FieldInfo(name, self.get(int(oid)))
for name, oid in self._src.fetch(-1)]
@@ -772,7 +779,7 @@
value = value.decode('ascii')
else:
value = self._cnx.escape_string(value)
- return "'%s'" % value
+ return "'%s'" % (value,)
if isinstance(value, float):
if isinf(value):
return "'-Infinity'" if value < 0 else "'Infinity'"
@@ -783,18 +790,18 @@
return value
if isinstance(value, datetime):
if value.tzinfo:
- return "'%s'::timestamptz" % value
- return "'%s'::timestamp" % value
+ return "'%s'::timestamptz" % (value,)
+ return "'%s'::timestamp" % (value,)
if isinstance(value, date):
- return "'%s'::date" % value
+ return "'%s'::date" % (value,)
if isinstance(value, time):
if value.tzinfo:
- return "'%s'::timetz" % value
+ return "'%s'::timetz" % (value,)
return "'%s'::time" % value
if isinstance(value, timedelta):
- return "'%s'::interval" % value
+ return "'%s'::interval" % (value,)
if isinstance(value, Uuid):
- return "'%s'::uuid" % value
+ return "'%s'::uuid" % (value,)
if isinstance(value, list):
# Quote value as an ARRAY constructor. This is better than using
# an array literal because it carries the information that this is
@@ -805,9 +812,9 @@
return "'{}'"
q = self._quote
try:
- return 'ARRAY[%s]' % ','.join(str(q(v)) for v in value)
+ return 'ARRAY[%s]' % (','.join(str(q(v)) for v in value),)
except UnicodeEncodeError: # Python 2 with non-ascii values
- return u'ARRAY[%s]' % ','.join(unicode(q(v)) for v in value)
+ return u'ARRAY[%s]' % (','.join(unicode(q(v)) for v in value),)
if isinstance(value, tuple):
# Quote as a ROW constructor. This is better than using a record
# literal because it carries the information that this is a record
@@ -816,14 +823,14 @@
# when the records has a single column which is not really useful.
q = self._quote
try:
- return '(%s)' % ','.join(str(q(v)) for v in value)
+ return '(%s)' % (','.join(str(q(v)) for v in value),)
except UnicodeEncodeError: # Python 2 with non-ascii values
- return u'(%s)' % ','.join(unicode(q(v)) for v in value)
+ return u'(%s)' % (','.join(unicode(q(v)) for v in value),)
try:
value = value.__pg_repr__()
except AttributeError:
raise InterfaceError(
- 'Do not know how to adapt type %s' % type(value))
+ 'Do not know how to adapt type %s' % (type(value),))
if isinstance(value, (tuple, list)):
value = self._quote(value)
return value
@@ -1036,7 +1043,7 @@
if isinstance(stream, basestring):
if not isinstance(stream, input_type):
- raise ValueError("The input must be %s" % type_name)
+ raise ValueError("The input must be %s" % (type_name,))
if not binary_format:
if isinstance(stream, str):
if not stream.endswith('\n'):
@@ -1054,7 +1061,8 @@
for chunk in stream:
if not isinstance(chunk, input_type):
raise ValueError(
- "Input stream must consist of %s" % type_name)
+ "Input stream must consist of %s"
+ % (type_name,))
if isinstance(chunk, str):
if not chunk.endswith('\n'):
chunk += '\n'
@@ -1121,7 +1129,7 @@
operation.append('(%s)' % (columns,))
operation.append("from stdin")
if options:
- operation.append('(%s)' % ','.join(options))
+ operation.append('(%s)' % (','.join(options),))
operation = ' '.join(operation)
putdata = self._src.putdata
@@ -1216,7 +1224,7 @@
operation.append("to stdout")
if options:
- operation.append('(%s)' % ','.join(options))
+ operation.append('(%s)' % (','.join(options),))
operation = ' '.join(operation)
getdata = self._src.getdata
@@ -1304,11 +1312,11 @@
try:
return namedtuple('Row', colnames, rename=True)._make
except TypeError: # Python 2.6 and 3.0 do not support rename
- colnames = [v if v.isalnum() else 'column_%d' % n
+ colnames = [v if v.isalnum() else 'column_%d' % (n,)
for n, v in enumerate(colnames)]
return namedtuple('Row', colnames)._make
except ValueError: # there is still a problem with the field names
- colnames = ['column_%d' % n for n in range(len(colnames))]
+ colnames = ['column_%d' % (n,) for n in range(len(colnames))]
return namedtuple('Row', colnames)._make
@@ -1493,8 +1501,8 @@
for kw, value in kwargs:
value = str(value)
if not value or ' ' in value:
- value = "'%s'" % value.replace(
- "'", "\\'").replace('\\', '\\\\')
+ value = "'%s'" % (value.replace(
+ "'", "\\'").replace('\\', '\\\\'),)
dbname.append('%s=%s' % (kw, value))
dbname = ' '.join(dbname)
@@ -1669,7 +1677,7 @@
quote = cls._re_quote.search(s)
s = cls._re_escape.sub(r'\\\1', s)
if quote:
- s = '"%s"' % s
+ s = '"%s"' % (s,)
return s
def __str__(self):
Modified: trunk/tests/test_classic.py
==============================================================================
--- trunk/tests/test_classic.py Sat Jul 2 04:46:03 2016 (r875)
+++ trunk/tests/test_classic.py Thu Jul 14 08:21:19 2016 (r876)
@@ -35,8 +35,8 @@
db.query("SET DATESTYLE TO 'ISO'")
db.query("SET TIME ZONE 'EST5EDT'")
db.query("SET DEFAULT_WITH_OIDS=FALSE")
- db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
db.query("SET CLIENT_MIN_MESSAGES=WARNING")
+ db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
return db
db = opendb()
Modified: trunk/tests/test_classic_connection.py
==============================================================================
--- trunk/tests/test_classic_connection.py Sat Jul 2 04:46:03 2016 (r875)
+++ trunk/tests/test_classic_connection.py Thu Jul 14 08:21:19 2016 (r876)
@@ -1818,10 +1818,16 @@
@classmethod
def setUpClass(cls):
- query = connect().query
+ db = connect()
+ query = db.query
query('set client_encoding=sql_ascii')
query('set standard_conforming_strings=off')
- query('set bytea_output=escape')
+ try:
+ query('set bytea_output=escape')
+ except pg.ProgrammingError:
+ if db.server_version >= 90000:
+ raise # ignore for older server versions
+ db.close()
cls.cls_set_up = True
def testEscapeString(self):
Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py Sat Jul 2 04:46:03 2016 (r875)
+++ trunk/tests/test_classic_dbwrapper.py Thu Jul 14 08:21:19 2016 (r876)
@@ -417,10 +417,14 @@
self.db.use_regtypes(self.regtypes)
query = self.db.query
query('set client_encoding=utf8')
- query('set standard_conforming_strings=on')
query("set lc_monetary='C'")
query("set datestyle='ISO,YMD'")
- query('set bytea_output=hex')
+ query('set standard_conforming_strings=on')
+ try:
+ query('set bytea_output=hex')
+ except pg.ProgrammingError:
+ if self.db.server_version >= 90000:
+ raise # ignore for older server versions
def tearDown(self):
self.doCleanups()
@@ -721,6 +725,7 @@
f(set(['default_with_oids', 'standard_conforming_strings']))
self.assertEqual(g('default_with_oids'), dwi)
self.assertEqual(g('standard_conforming_strings'), scs)
+ db.close()
def testResetParameterAll(self):
db = DB()
@@ -741,6 +746,7 @@
f('all')
self.assertEqual(g('default_with_oids'), dwi)
self.assertEqual(g('standard_conforming_strings'), scs)
+ db.close()
def testSetParameterLocal(self):
f = self.db.set_parameter
@@ -781,6 +787,7 @@
self.assertEqual(r, default_datestyle)
r = self.db.get_parameter('datestyle')
self.assertEqual(r, default_datestyle)
+ db.close()
def testReopen(self):
db = DB()
@@ -799,6 +806,7 @@
self.assertRaises(TypeError, getattr, con, 'query')
r = self.db.get_parameter('datestyle')
self.assertEqual(r, default_datestyle)
+ db.close()
def testCreateTable(self):
table = 'test hello world'
@@ -3954,7 +3962,9 @@
cls.set_option('bytea_escaped', not_bytea_escaped)
cls.set_option('namedresult', None)
cls.set_option('jsondecode', None)
- cls.regtypes = not DB().use_regtypes()
+ db = DB()
+ cls.regtypes = not db.use_regtypes()
+ db.close()
super(TestDBClassNonStdOpts, cls).setUpClass()
@classmethod
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql