Author: cito
Date: Thu Nov 19 13:49:11 2015
New Revision: 549
Log:
Add tests for large object support.
Also fix some minor issues in the C code and docs.
In particular, type checks in the large object support
should always be done before checking the object status.
Added:
branches/4.x/module/TEST_PyGreSQL_classic_largeobj.py
- copied, changed from r548,
branches/4.x/module/TEST_PyGreSQL_classic_connection.py
Modified:
branches/4.x/docs/classic.rst
branches/4.x/module/TEST_PyGreSQL_classic_functions.py
branches/4.x/module/pgmodule.c
Modified: branches/4.x/docs/classic.rst
==============================================================================
--- branches/4.x/docs/classic.rst Thu Nov 19 10:39:13 2015 (r548)
+++ branches/4.x/docs/classic.rst Thu Nov 19 13:49:11 2015 (r549)
@@ -1484,13 +1484,13 @@
:integer: new position in object
Exceptions raised:
- :TypeError: binvalid connection or invalid object,
+ :TypeError: invalid connection or invalid object,
bad parameter type, or too many parameters
:IOError: object is not opened, or seek error
Description:
This method allows to move the position cursor in the large object. The
- whence parameter can be obtained by OR-ing the constants defined in the
+ valid values for the whence parameter are defined as constants in the
`pg` module (`SEEK_SET`, `SEEK_CUR`, `SEEK_END`).
Syntax::
Modified: branches/4.x/module/TEST_PyGreSQL_classic_functions.py
==============================================================================
--- branches/4.x/module/TEST_PyGreSQL_classic_functions.py Thu Nov 19
10:39:13 2015 (r548)
+++ branches/4.x/module/TEST_PyGreSQL_classic_functions.py Thu Nov 19
13:49:11 2015 (r549)
@@ -317,15 +317,6 @@
self.assertTrue(1 <= len(w) <= 2)
self.assertTrue(w.isdigit())
- def testLargeObjectIntConstants(self):
- names = 'INV_READ INV_WRITE SEEK_SET SEEK_CUR SEEK_END'.split()
- for name in names:
- try:
- value = getattr(pg, name)
- except AttributeError:
- self.fail('Module constant %s is missing' % name)
- self.assertIsInstance(value, int)
-
if __name__ == '__main__':
unittest.main()
Copied and modified: branches/4.x/module/TEST_PyGreSQL_classic_largeobj.py
(from r548, branches/4.x/module/TEST_PyGreSQL_classic_connection.py)
==============================================================================
--- branches/4.x/module/TEST_PyGreSQL_classic_connection.py Thu Nov 19
10:39:13 2015 (r548, copy source)
+++ branches/4.x/module/TEST_PyGreSQL_classic_largeobj.py Thu Nov 19
13:49:11 2015 (r549)
@@ -3,7 +3,7 @@
"""Test the classic PyGreSQL interface.
-Sub-tests for the low-level connection object.
+Sub-tests for large object support.
Contributed by Christoph Zwerschke.
@@ -15,20 +15,10 @@
import unittest2 as unittest # for Python < 2.6
except ImportError:
import unittest
-import sys
-import threading
-import time
+import tempfile
import pg # the module under test
-from decimal import Decimal
-try:
- from collections import namedtuple
-except ImportError: # Python < 2.6
- namedtuple = None
-
-from StringIO import StringIO
-
# We need a database to test against. If LOCAL_PyGreSQL.py exists we will
# get our information from that. Otherwise we use the defaults.
dbname = 'unittest'
@@ -48,945 +38,332 @@
return connection
-class TestCanConnect(unittest.TestCase):
- """Test whether a basic connection to PostgreSQL is possible."""
-
- def testCanConnect(self):
- try:
- connection = connect()
- except pg.Error, error:
- self.fail('Cannot connect to database %s:\n%s' % (dbname, error))
- try:
- connection.close()
- except pg.Error:
- self.fail('Cannot close the database connection')
-
-
-class TestConnectObject(unittest.TestCase):
- """"Test existence of basic pg connection methods."""
-
- def setUp(self):
- self.connection = connect()
-
- def tearDown(self):
- try:
- self.connection.close()
- except pg.InternalError:
- pass
-
- def testAllConnectAttributes(self):
- attributes = '''db error host options port
- protocol_version server_version status tty user'''.split()
- connection_attributes = [a for a in dir(self.connection)
- if not callable(eval("self.connection." + a))]
- self.assertEqual(attributes, connection_attributes)
-
- def testAllConnectMethods(self):
- methods = '''cancel close endcopy
- escape_bytea escape_identifier escape_literal escape_string
- fileno get_notice_receiver getline getlo getnotify
- inserttable locreate loimport parameter putline query reset
- set_notice_receiver source transaction'''.split()
- connection_methods = [a for a in dir(self.connection)
- if callable(eval("self.connection." + a))]
- self.assertEqual(methods, connection_methods)
-
- def testAttributeDb(self):
- self.assertEqual(self.connection.db, dbname)
-
- def testAttributeError(self):
- error = self.connection.error
- self.assertTrue(not error or 'krb5_' in error)
-
- def testAttributeHost(self):
- def_host = 'localhost'
- self.assertIsInstance(self.connection.host, str)
- self.assertEqual(self.connection.host, dbhost or def_host)
-
- def testAttributeOptions(self):
- no_options = ''
- self.assertEqual(self.connection.options, no_options)
-
- def testAttributePort(self):
- def_port = 5432
- self.assertIsInstance(self.connection.port, int)
- self.assertEqual(self.connection.port, dbport or def_port)
-
- def testAttributeProtocolVersion(self):
- protocol_version = self.connection.protocol_version
- self.assertIsInstance(protocol_version, int)
- self.assertTrue(2 <= protocol_version < 4)
-
- def testAttributeServerVersion(self):
- server_version = self.connection.server_version
- self.assertIsInstance(server_version, int)
- self.assertTrue(70400 <= server_version < 100000)
-
- def testAttributeStatus(self):
- status_ok = 1
- self.assertIsInstance(self.connection.status, int)
- self.assertEqual(self.connection.status, status_ok)
-
- def testAttributeTty(self):
- def_tty = ''
- self.assertIsInstance(self.connection.tty, str)
- self.assertEqual(self.connection.tty, def_tty)
-
- def testAttributeUser(self):
- no_user = 'Deprecated facility'
- user = self.connection.user
- self.assertTrue(user)
- self.assertIsInstance(user, str)
- self.assertNotEqual(user, no_user)
-
- def testMethodQuery(self):
- query = self.connection.query
- query("select 1+1")
- query("select 1+$1", (1,))
- query("select 1+$1+$2", (2, 3))
- query("select 1+$1+$2", [2, 3])
-
- def testMethodQueryEmpty(self):
- self.assertRaises(ValueError, self.connection.query, '')
+class TestModuleConstants(unittest.TestCase):
+ """Test the existence of the documented module constants."""
- def testMethodEndcopy(self):
- try:
- self.connection.endcopy()
- except IOError:
- pass
-
- def testMethodClose(self):
- self.connection.close()
- try:
- self.connection.reset()
- except (pg.Error, TypeError):
- pass
- else:
- self.fail('Reset should give an error for a closed connection')
- self.assertRaises(pg.InternalError, self.connection.close)
- try:
- self.connection.query('select 1')
- except (pg.Error, TypeError):
- pass
- else:
- self.fail('Query should give an error for a closed connection')
- self.connection = connect()
-
- def testMethodReset(self):
- query = self.connection.query
- # check that client encoding gets reset
- encoding = query('show client_encoding').getresult()[0][0].upper()
- changed_encoding = 'LATIN1' if encoding == 'UTF8' else 'UTF8'
- self.assertNotEqual(encoding, changed_encoding)
- self.connection.query("set client_encoding=%s" % changed_encoding)
- new_encoding = query('show client_encoding').getresult()[0][0].upper()
- self.assertEqual(new_encoding, changed_encoding)
- self.connection.reset()
- new_encoding = query('show client_encoding').getresult()[0][0].upper()
- self.assertNotEqual(new_encoding, changed_encoding)
- self.assertEqual(new_encoding, encoding)
-
- def testMethodCancel(self):
- r = self.connection.cancel()
- self.assertIsInstance(r, int)
- self.assertEqual(r, 1)
-
- def testCancelLongRunningThread(self):
- errors = []
-
- def sleep():
+ def testLargeObjectIntConstants(self):
+ names = 'INV_READ INV_WRITE SEEK_SET SEEK_CUR SEEK_END'.split()
+ for name in names:
try:
- self.connection.query('select pg_sleep(5)').getresult()
- except pg.ProgrammingError, error:
- errors.append(str(error))
-
- thread = threading.Thread(target=sleep)
- t1 = time.time()
- thread.start() # run the query
- while 1: # make sure the query is really running
- time.sleep(0.1)
- if thread.is_alive() or time.time() - t1 > 5:
- break
- r = self.connection.cancel() # cancel the running query
- thread.join() # wait for the thread to end
- t2 = time.time()
-
- self.assertIsInstance(r, int)
- self.assertEqual(r, 1) # return code should be 1
- self.assertLessEqual(t2 - t1, 3) # time should be under 3 seconds
- self.assertTrue(errors)
-
- def testMethodFileNo(self):
- r = self.connection.fileno()
- self.assertIsInstance(r, int)
- self.assertGreaterEqual(r, 0)
+ value = getattr(pg, name)
+ except AttributeError:
+ self.fail('Module constant %s is missing' % name)
+ self.assertIsInstance(value, int)
-class TestSimpleQueries(unittest.TestCase):
- """"Test simple queries via a basic pg connection."""
+class TestCreatingLargeObjects(unittest.TestCase):
+ """Test creating large objects using a connection."""
def setUp(self):
self.c = connect()
+ self.c.query('begin')
def tearDown(self):
+ self.c.query('end')
self.c.close()
- def testSelect0(self):
- q = "select 0"
- self.c.query(q)
-
- def testSelect0Semicolon(self):
- q = "select 0;"
- self.c.query(q)
-
- def testSelectDotSemicolon(self):
- q = "select .;"
- self.assertRaises(pg.ProgrammingError, self.c.query, q)
-
- def testGetresult(self):
- q = "select 0"
- result = [(0,)]
- r = self.c.query(q).getresult()
- self.assertIsInstance(r, list)
- v = r[0]
- self.assertIsInstance(v, tuple)
- self.assertIsInstance(v[0], int)
- self.assertEqual(r, result)
-
- def testGetresultLong(self):
- q = "select 1234567890123456790"
- result = 1234567890123456790L
- v = self.c.query(q).getresult()[0][0]
- self.assertIsInstance(v, long)
- self.assertEqual(v, result)
-
- def testGetresultString(self):
- result = 'Hello, world!'
- q = "select '%s'" % result
- v = self.c.query(q).getresult()[0][0]
- self.assertIsInstance(v, str)
- self.assertEqual(v, result)
-
- def testDictresult(self):
- q = "select 0 as alias0"
- result = [{'alias0': 0}]
- r = self.c.query(q).dictresult()
- self.assertIsInstance(r, list)
- v = r[0]
- self.assertIsInstance(v, dict)
- self.assertIsInstance(v['alias0'], int)
- self.assertEqual(r, result)
-
- def testDictresultLong(self):
- q = "select 1234567890123456790 as longjohnsilver"
- result = 1234567890123456790L
- v = self.c.query(q).dictresult()[0]['longjohnsilver']
- self.assertIsInstance(v, long)
- self.assertEqual(v, result)
-
- def testDictresultString(self):
- result = 'Hello, world!'
- q = "select '%s' as greeting" % result
- v = self.c.query(q).dictresult()[0]['greeting']
- self.assertIsInstance(v, str)
- self.assertEqual(v, result)
-
- @unittest.skipUnless(namedtuple, 'Named tuples not available')
- def testNamedresult(self):
- q = "select 0 as alias0"
- result = [(0,)]
- r = self.c.query(q).namedresult()
- self.assertEqual(r, result)
- v = r[0]
- self.assertEqual(v._fields, ('alias0',))
- self.assertEqual(v.alias0, 0)
-
- def testGet3Cols(self):
- q = "select 1,2,3"
- result = [(1, 2, 3)]
- r = self.c.query(q).getresult()
- self.assertEqual(r, result)
-
- def testGet3DictCols(self):
- q = "select 1 as a,2 as b,3 as c"
- result = [dict(a=1, b=2, c=3)]
- r = self.c.query(q).dictresult()
- self.assertEqual(r, result)
-
- @unittest.skipUnless(namedtuple, 'Named tuples not available')
- def testGet3NamedCols(self):
- q = "select 1 as a,2 as b,3 as c"
- result = [(1, 2, 3)]
- r = self.c.query(q).namedresult()
- self.assertEqual(r, result)
- v = r[0]
- self.assertEqual(v._fields, ('a', 'b', 'c'))
- self.assertEqual(v.b, 2)
-
- def testGet3Rows(self):
- q = "select 3 union select 1 union select 2 order by 1"
- result = [(1,), (2,), (3,)]
- r = self.c.query(q).getresult()
- self.assertEqual(r, result)
-
- def testGet3DictRows(self):
- q = ("select 3 as alias3"
- " union select 1 union select 2 order by 1")
- result = [{'alias3': 1}, {'alias3': 2}, {'alias3': 3}]
- r = self.c.query(q).dictresult()
- self.assertEqual(r, result)
-
- @unittest.skipUnless(namedtuple, 'Named tuples not available')
- def testGet3NamedRows(self):
- q = ("select 3 as alias3"
- " union select 1 union select 2 order by 1")
- result = [(1,), (2,), (3,)]
- r = self.c.query(q).namedresult()
- self.assertEqual(r, result)
- for v in r:
- self.assertEqual(v._fields, ('alias3',))
-
- def testDictresultNames(self):
- q = "select 'MixedCase' as MixedCaseAlias"
- result = [{'mixedcasealias': 'MixedCase'}]
- r = self.c.query(q).dictresult()
- self.assertEqual(r, result)
- q = "select 'MixedCase' as \"MixedCaseAlias\""
- result = [{'MixedCaseAlias': 'MixedCase'}]
- r = self.c.query(q).dictresult()
- self.assertEqual(r, result)
-
- @unittest.skipUnless(namedtuple, 'Named tuples not available')
- def testNamedresultNames(self):
- q = "select 'MixedCase' as MixedCaseAlias"
- result = [('MixedCase',)]
- r = self.c.query(q).namedresult()
- self.assertEqual(r, result)
- v = r[0]
- self.assertEqual(v._fields, ('mixedcasealias',))
- self.assertEqual(v.mixedcasealias, 'MixedCase')
- q = "select 'MixedCase' as \"MixedCaseAlias\""
- r = self.c.query(q).namedresult()
- self.assertEqual(r, result)
- v = r[0]
- self.assertEqual(v._fields, ('MixedCaseAlias',))
- self.assertEqual(v.MixedCaseAlias, 'MixedCase')
-
- def testBigGetresult(self):
- num_cols = 100
- num_rows = 100
- q = "select " + ','.join(map(str, xrange(num_cols)))
- q = ' union all '.join((q,) * num_rows)
- r = self.c.query(q).getresult()
- result = [tuple(range(num_cols))] * num_rows
- self.assertEqual(r, result)
-
- def testListfields(self):
- q = ('select 0 as a, 0 as b, 0 as c,'
- ' 0 as c, 0 as b, 0 as a,'
- ' 0 as lowercase, 0 as UPPERCASE,'
- ' 0 as MixedCase, 0 as "MixedCase",'
- ' 0 as a_long_name_with_underscores,'
- ' 0 as "A long name with Blanks"')
- r = self.c.query(q).listfields()
- result = ('a', 'b', 'c', 'c', 'b', 'a',
- 'lowercase', 'uppercase', 'mixedcase', 'MixedCase',
- 'a_long_name_with_underscores',
- 'A long name with Blanks')
- self.assertEqual(r, result)
-
- def testFieldname(self):
- q = "select 0 as z, 0 as a, 0 as x, 0 as y"
- r = self.c.query(q).fieldname(2)
- self.assertEqual(r, 'x')
- r = self.c.query(q).fieldname(3)
- self.assertEqual(r, 'y')
-
- def testFieldnum(self):
- q = "select 1 as x"
- self.assertRaises(ValueError, self.c.query(q).fieldnum, 'y')
- q = "select 1 as x"
- r = self.c.query(q).fieldnum('x')
- self.assertIsInstance(r, int)
- self.assertEqual(r, 0)
- q = "select 0 as z, 0 as a, 0 as x, 0 as y"
- r = self.c.query(q).fieldnum('x')
- self.assertIsInstance(r, int)
- self.assertEqual(r, 2)
- r = self.c.query(q).fieldnum('y')
- self.assertIsInstance(r, int)
- self.assertEqual(r, 3)
-
- def testNtuples(self):
- q = "select 1 where false"
- r = self.c.query(q).ntuples()
- self.assertIsInstance(r, int)
- self.assertEqual(r, 0)
- q = ("select 1 as a, 2 as b, 3 as c, 4 as d"
- " union select 5 as a, 6 as b, 7 as c, 8 as d")
- r = self.c.query(q).ntuples()
- self.assertIsInstance(r, int)
- self.assertEqual(r, 2)
- q = ("select 1 union select 2 union select 3"
- " union select 4 union select 5 union select 6")
- r = self.c.query(q).ntuples()
- self.assertIsInstance(r, int)
- self.assertEqual(r, 6)
+ def assertIsLargeObject(self, obj):
+ self.assertIsNotNone(obj)
+ self.assertTrue(hasattr(obj, 'open'))
+ self.assertTrue(hasattr(obj, 'close'))
+ self.assertTrue(hasattr(obj, 'oid'))
+ self.assertTrue(hasattr(obj, 'pgcnx'))
+ self.assertTrue(hasattr(obj, 'error'))
+ self.assertIsInstance(obj.oid, int)
+ self.assertNotEqual(obj.oid, 0)
+ self.assertIs(obj.pgcnx, self.c)
+ self.assertIsInstance(obj.error, str)
+ self.assertFalse(obj.error)
- def testQuery(self):
- query = self.c.query
- query("drop table if exists test_table")
- q = "create table test_table (n integer) with oids"
- r = query(q)
- self.assertIsNone(r)
- q = "insert into test_table values (1)"
- r = query(q)
- self.assertIsInstance(r, int)
- q = "insert into test_table select 2"
- r = query(q)
- self.assertIsInstance(r, int)
- oid = r
- q = "select oid from test_table where n=2"
- r = query(q).getresult()
- self.assertEqual(len(r), 1)
- r = r[0]
- self.assertEqual(len(r), 1)
- r = r[0]
- self.assertIsInstance(r, int)
- self.assertEqual(r, oid)
- q = "insert into test_table select 3 union select 4 union select 5"
- r = query(q)
- self.assertIsInstance(r, str)
- self.assertEqual(r, '3')
- q = "update test_table set n=4 where n<5"
- r = query(q)
- self.assertIsInstance(r, str)
- self.assertEqual(r, '4')
- q = "delete from test_table"
- r = query(q)
- self.assertIsInstance(r, str)
- self.assertEqual(r, '5')
- query("drop table test_table")
-
- def testPrint(self):
- q = ("select 1 as a, 'hello' as h, 'w' as world"
- " union select 2, 'xyz', 'uvw'")
- r = self.c.query(q)
- s = StringIO()
- stdout, sys.stdout = sys.stdout, s
+ def testLoCreate(self):
+ large_object = self.c.locreate(pg.INV_READ | pg.INV_WRITE)
try:
- print r
- except Exception:
- pass
+ self.assertIsLargeObject(large_object)
finally:
- sys.stdout = stdout
- r = s.getvalue()
- s.close()
- self.assertEqual(r,
- 'a| h |world\n'
- '-+-----+-----\n'
- '1|hello|w \n'
- '2|xyz |uvw \n'
- '(2 rows)\n')
-
-
-class TestParamQueries(unittest.TestCase):
- """"Test queries with parameters via a basic pg connection."""
-
- def setUp(self):
- self.c = connect()
-
- def tearDown(self):
- self.c.close()
-
- def testQueryWithNoneParam(self):
- self.assertEqual(self.c.query("select $1::integer", (None,)
- ).getresult(), [(None,)])
- self.assertEqual(self.c.query("select $1::text", [None]
- ).getresult(), [(None,)])
-
- def testQueryWithBoolParams(self):
- query = self.c.query
- self.assertEqual(query("select false").getresult(), [('f',)])
- self.assertEqual(query("select true").getresult(), [('t',)])
- self.assertEqual(query("select $1::bool", (None,)).getresult(),
- [(None,)])
- self.assertEqual(query("select $1::bool", ('f',)).getresult(),
[('f',)])
- self.assertEqual(query("select $1::bool", ('t',)).getresult(),
[('t',)])
- self.assertEqual(query("select $1::bool", ('false',)).getresult(),
- [('f',)])
- self.assertEqual(query("select $1::bool", ('true',)).getresult(),
- [('t',)])
- self.assertEqual(query("select $1::bool", ('n',)).getresult(),
[('f',)])
- self.assertEqual(query("select $1::bool", ('y',)).getresult(),
[('t',)])
- self.assertEqual(query("select $1::bool", (0,)).getresult(), [('f',)])
- self.assertEqual(query("select $1::bool", (1,)).getresult(), [('t',)])
- self.assertEqual(query("select $1::bool", (False,)).getresult(),
- [('f',)])
- self.assertEqual(query("select $1::bool", (True,)).getresult(),
- [('t',)])
-
- def testQueryWithIntParams(self):
- query = self.c.query
- self.assertEqual(query("select 1+1").getresult(), [(2,)])
- self.assertEqual(query("select 1+$1", (1,)).getresult(), [(2,)])
- self.assertEqual(query("select 1+$1", [1]).getresult(), [(2,)])
- self.assertEqual(query("select $1::integer", (2,)).getresult(), [(2,)])
- self.assertEqual(query("select $1::text", (2,)).getresult(), [('2',)])
- self.assertEqual(query("select 1+$1::numeric", [1]).getresult(),
- [(Decimal('2'),)])
- self.assertEqual(query("select 1, $1::integer", (2,)
- ).getresult(), [(1, 2)])
- self.assertEqual(query("select 1 union select $1", (2,)
- ).getresult(), [(1,), (2,)])
- self.assertEqual(query("select $1::integer+$2", (1, 2)
- ).getresult(), [(3,)])
- self.assertEqual(query("select $1::integer+$2", [1, 2]
- ).getresult(), [(3,)])
- self.assertEqual(query("select 0+$1+$2+$3+$4+$5+$6", range(6)
- ).getresult(), [(15,)])
-
- def testQueryWithStrParams(self):
- query = self.c.query
- self.assertEqual(query("select $1||', world!'", ('Hello',)
- ).getresult(), [('Hello, world!',)])
- self.assertEqual(query("select $1||', world!'", ['Hello']
- ).getresult(), [('Hello, world!',)])
- self.assertEqual(query("select $1||', '||$2||'!'", ('Hello', 'world'),
- ).getresult(), [('Hello, world!',)])
- self.assertEqual(query("select $1::text", ('Hello, world!',)
- ).getresult(), [('Hello, world!',)])
- self.assertEqual(query("select $1::text,$2::text", ('Hello', 'world')
- ).getresult(), [('Hello', 'world')])
- self.assertEqual(query("select $1::text,$2::text", ['Hello', 'world']
- ).getresult(), [('Hello', 'world')])
- self.assertEqual(query("select $1::text union select $2::text",
- ('Hello', 'world')).getresult(), [('Hello',), ('world',)])
- self.assertEqual(query("select $1||', '||$2||'!'", ('Hello',
- 'w\xc3\xb6rld')).getresult(), [('Hello, w\xc3\xb6rld!',)])
-
- def testQueryWithUnicodeParams(self):
- query = self.c.query
- query('set client_encoding = utf8')
- self.assertEqual(query("select $1||', '||$2||'!'",
- ('Hello', u'w\xf6rld')).getresult(), [('Hello, w\xc3\xb6rld!',)])
- self.assertEqual(query("select $1||', '||$2||'!'",
- ('Hello', u'\u043c\u0438\u0440')).getresult(),
- [('Hello, \xd0\xbc\xd0\xb8\xd1\x80!',)])
- query('set client_encoding = latin1')
- self.assertEqual(query("select $1||', '||$2||'!'",
- ('Hello', u'w\xf6rld')).getresult(), [('Hello, w\xf6rld!',)])
- self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
- ('Hello', u'\u043c\u0438\u0440'))
- query('set client_encoding = iso_8859_1')
- self.assertEqual(query("select $1||', '||$2||'!'",
- ('Hello', u'w\xf6rld')).getresult(), [('Hello, w\xf6rld!',)])
- self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
- ('Hello', u'\u043c\u0438\u0440'))
- query('set client_encoding = iso_8859_5')
- self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
- ('Hello', u'w\xf6rld'))
- self.assertEqual(query("select $1||', '||$2||'!'",
- ('Hello', u'\u043c\u0438\u0440')).getresult(),
- [('Hello, \xdc\xd8\xe0!',)])
- query('set client_encoding = sql_ascii')
- self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
- ('Hello', u'w\xf6rld'))
-
- def testQueryWithMixedParams(self):
- self.assertEqual(self.c.query("select $1+2,$2||', world!'",
- (1, 'Hello'),).getresult(), [(3, 'Hello, world!')])
- self.assertEqual(self.c.query("select $1::integer,$2::date,$3::text",
- (4711, None, 'Hello!'),).getresult(), [(4711, None, 'Hello!')])
-
- def testQueryWithDuplicateParams(self):
- self.assertRaises(pg.ProgrammingError,
- self.c.query, "select $1+$1", (1,))
- self.assertRaises(pg.ProgrammingError,
- self.c.query, "select $1+$1", (1, 2))
-
- def testQueryWithZeroParams(self):
- self.assertEqual(self.c.query("select 1+1", []
- ).getresult(), [(2,)])
-
- def testQueryWithGarbage(self):
- garbage = r"'\{}+()-#[]oo324"
- self.assertEqual(self.c.query("select $1::text AS garbage", (garbage,)
- ).dictresult(), [{'garbage': garbage}])
-
- def testUnicodeQuery(self):
- query = self.c.query
- self.assertEqual(query(u"select 1+1").getresult(), [(2,)])
- self.assertRaises(TypeError, query, u"select 'Hello, w\xf6rld!'")
-
-
-class TestInserttable(unittest.TestCase):
- """"Test inserttable method."""
-
- @classmethod
- def setUpClass(cls):
- c = connect()
- c.query("drop table if exists test cascade")
- c.query("create table test ("
- "i2 smallint, i4 integer, i8 bigint, b boolean, dt date, ti time,"
- "d numeric, f4 real, f8 double precision, m money,"
- "c char(1), v4 varchar(4), c4 char(4), t text)")
- c.close()
-
- @classmethod
- def tearDownClass(cls):
- c = connect()
- c.query("drop table test cascade")
- c.close()
-
- def setUp(self):
- self.c = connect()
- self.c.query("set datestyle='ISO,YMD'")
+ del large_object
- def tearDown(self):
- self.c.query("truncate table test")
- self.c.close()
-
- data = [
- (-1, -1, -1L, True, '1492-10-12', '08:30:00',
- -1.2345, -1.75, -1.875, '-1.25', '-', 'r?', '!u', 'xyz'),
- (0, 0, 0L, False, '1607-04-14', '09:00:00',
- 0.0, 0.0, 0.0, '0.0', ' ', '0123', '4567', '890'),
- (1, 1, 1L, True, '1801-03-04', '03:45:00',
- 1.23456, 1.75, 1.875, '1.25', 'x', 'bc', 'cdef', 'g'),
- (2, 2, 2L, False, '1903-12-17', '11:22:00',
- 2.345678, 2.25, 2.125, '2.75', 'y', 'q', 'ijk', 'mnop\nstux!')]
-
- def get_back(self):
- """Convert boolean and decimal values back."""
- data = []
- for row in self.c.query("select * from test order by 1").getresult():
- self.assertIsInstance(row, tuple)
- row = list(row)
- if row[0] is not None: # smallint
- self.assertIsInstance(row[0], int)
- if row[1] is not None: # integer
- self.assertIsInstance(row[1], int)
- if row[2] is not None: # bigint
- self.assertIsInstance(row[2], long)
- if row[3] is not None: # boolean
- self.assertIsInstance(row[3], str)
- row[3] = {'f': False, 't': True}.get(row[3])
- if row[4] is not None: # date
- self.assertIsInstance(row[4], str)
- self.assertTrue(row[4].replace('-', '').isdigit())
- if row[5] is not None: # time
- self.assertIsInstance(row[5], str)
- self.assertTrue(row[5].replace(':', '').isdigit())
- if row[6] is not None: # numeric
- self.assertIsInstance(row[6], Decimal)
- row[6] = float(row[6])
- if row[7] is not None: # real
- self.assertIsInstance(row[7], float)
- if row[8] is not None: # double precision
- self.assertIsInstance(row[8], float)
- row[8] = float(row[8])
- if row[9] is not None: # money
- self.assertIsInstance(row[9], Decimal)
- row[9] = str(float(row[9]))
- if row[10] is not None: # char(1)
- self.assertIsInstance(row[10], str)
- self.assertEqual(len(row[10]), 1)
- if row[11] is not None: # varchar(4)
- self.assertIsInstance(row[11], str)
- self.assertLessEqual(len(row[11]), 4)
- if row[12] is not None: # char(4)
- self.assertIsInstance(row[12], str)
- self.assertEqual(len(row[12]), 4)
- row[12] = row[12].rstrip()
- if row[13] is not None: # text
- self.assertIsInstance(row[13], str)
- row = tuple(row)
- data.append(row)
- return data
-
- def testInserttable1Row(self):
- data = self.data[2:3]
- self.c.inserttable("test", data)
- self.assertEqual(self.get_back(), data)
-
- def testInserttable4Rows(self):
- data = self.data
- self.c.inserttable("test", data)
- self.assertEqual(self.get_back(), data)
-
- def testInserttableMultipleRows(self):
- num_rows = 100
- data = self.data[2:3] * num_rows
- self.c.inserttable("test", data)
- r = self.c.query("select count(*) from test").getresult()[0][0]
- self.assertEqual(r, num_rows)
-
- def testInserttableMultipleCalls(self):
- num_rows = 10
- data = self.data[2:3]
- for _i in range(num_rows):
- self.c.inserttable("test", data)
- r = self.c.query("select count(*) from test").getresult()[0][0]
- self.assertEqual(r, num_rows)
-
- def testInserttableNullValues(self):
- data = [(None,) * 14] * 100
- self.c.inserttable("test", data)
- self.assertEqual(self.get_back(), data)
-
- def testInserttableMaxValues(self):
- data = [(2 ** 15 - 1, int(2 ** 31 - 1), long(2 ** 31 - 1),
- True, '2999-12-31', '11:59:59', 1e99,
- 1.0 + 1.0 / 32, 1.0 + 1.0 / 32, None,
- "1", "1234", "1234", "1234" * 100)]
- self.c.inserttable("test", data)
- self.assertEqual(self.get_back(), data)
-
-
-class TestDirectSocketAccess(unittest.TestCase):
- """"Test copy command with direct socket access."""
-
- @classmethod
- def setUpClass(cls):
- c = connect()
- c.query("drop table if exists test cascade")
- c.query("create table test (i int, v varchar(16))")
- c.close()
-
- @classmethod
- def tearDownClass(cls):
- c = connect()
- c.query("drop table test cascade")
- c.close()
-
- def setUp(self):
- self.c = connect()
- self.c.query("set datestyle='ISO,YMD'")
-
- def tearDown(self):
- self.c.query("truncate table test")
- self.c.close()
-
- def testPutline(self):
- putline = self.c.putline
- query = self.c.query
- data = list(enumerate("apple pear plum cherry banana".split()))
- query("copy test from stdin")
+ def testGetLo(self):
+ large_object = self.c.locreate(pg.INV_READ | pg.INV_WRITE)
try:
- for i, v in data:
- putline("%d\t%s\n" % (i, v))
- putline("\\.\n")
+ self.assertIsLargeObject(large_object)
+ oid = large_object.oid
finally:
- self.c.endcopy()
- r = query("select * from test").getresult()
- self.assertEqual(r, data)
-
- def testPutline(self):
- getline = self.c.getline
- query = self.c.query
- data = list(enumerate("apple banana pear plum strawberry".split()))
- n = len(data)
- self.c.inserttable('test', data)
- query("copy test to stdout")
- try:
- for i in range(n + 2):
- v = getline()
- if i < n:
- self.assertEqual(v, '%d\t%s' % data[i])
- elif i == n:
- self.assertEqual(v, '\\.')
- else:
- self.assertIsNone(v)
+ del large_object
+ data = 'some data to be shared'
+ large_object = self.c.getlo(oid)
+ try:
+ self.assertIsLargeObject(large_object)
+ self.assertEqual(large_object.oid, oid)
+ large_object.open(pg.INV_WRITE)
+ large_object.write(data)
+ large_object.close()
finally:
- try:
- self.c.endcopy()
- except IOError:
- pass
-
- def testParameterChecks(self):
- self.assertRaises(TypeError, self.c.putline)
- self.assertRaises(TypeError, self.c.getline, 'invalid')
- self.assertRaises(TypeError, self.c.endcopy, 'invalid')
-
-
-class TestNotificatons(unittest.TestCase):
- """"Test notification support."""
-
- def setUp(self):
- self.c = connect()
-
- def tearDown(self):
- self.c.close()
-
- def testGetNotify(self):
- getnotify = self.c.getnotify
- query = self.c.query
- self.assertIsNone(getnotify())
- query('listen test_notify')
+ del large_object
+ large_object = self.c.getlo(oid)
try:
- self.assertIsNone(self.c.getnotify())
- query("notify test_notify")
- r = getnotify()
- self.assertIsInstance(r, tuple)
- self.assertEqual(len(r), 3)
- self.assertIsInstance(r[0], str)
- self.assertIsInstance(r[1], int)
- self.assertIsInstance(r[2], str)
- self.assertEqual(r[0], 'test_notify')
- self.assertEqual(r[2], '')
- self.assertIsNone(self.c.getnotify())
- try:
- query("notify test_notify, 'test_payload'")
- except pg.ProgrammingError: # PostgreSQL < 9.0
- pass
- else:
- r = getnotify()
- self.assertTrue(isinstance(r, tuple))
- self.assertEqual(len(r), 3)
- self.assertIsInstance(r[0], str)
- self.assertIsInstance(r[1], int)
- self.assertIsInstance(r[2], str)
- self.assertEqual(r[0], 'test_notify')
- self.assertEqual(r[2], 'test_payload')
- self.assertIsNone(getnotify())
+ self.assertIsLargeObject(large_object)
+ self.assertEqual(large_object.oid, oid)
+ large_object.open(pg.INV_READ)
+ r = large_object.read(80)
+ large_object.close()
+ large_object.unlink()
finally:
- query('unlisten test_notify')
-
- def testGetNoticeReceiver(self):
- self.assertIsNone(self.c.get_notice_receiver())
-
- def testSetNoticeReceiver(self):
- self.assertRaises(TypeError, self.c.set_notice_receiver, None)
- self.assertRaises(TypeError, self.c.set_notice_receiver, 42)
- self.assertIsNone(self.c.set_notice_receiver(lambda notice: None))
-
- def testSetAndGetNoticeReceiver(self):
- r = lambda notice: None
- self.assertIsNone(self.c.set_notice_receiver(r))
- self.assertIs(self.c.get_notice_receiver(), r)
-
- def testNoticeReceiver(self):
- self.c.query('''create function bilbo_notice() returns void AS $$
- begin
- raise warning 'Bilbo was here!';
- end;
- $$ language plpgsql''')
- try:
- received = {}
+ del large_object
+ self.assertEqual(r, data)
- def notice_receiver(notice):
- for attr in dir(notice):
- value = getattr(notice, attr)
- if isinstance(value, str):
- value = value.replace('WARNUNG', 'WARNING')
- received[attr] = value
-
- self.c.set_notice_receiver(notice_receiver)
- self.c.query('''select bilbo_notice()''')
- self.assertEqual(received, dict(
- pgcnx=self.c, message='WARNING: Bilbo was here!\n',
- severity='WARNING', primary='Bilbo was here!',
- detail=None, hint=None))
+ def testLoImport(self):
+ f = tempfile.NamedTemporaryFile()
+ data = 'some data to be imported'
+ f.write(data)
+ f.flush()
+ f.seek(0)
+ large_object = self.c.loimport(f.name)
+ try:
+ f.close()
+ self.assertIsLargeObject(large_object)
+ large_object.open(pg.INV_READ)
+ large_object.seek(0, pg.SEEK_SET)
+ r = large_object.size()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, len(data))
+ r = large_object.read(80)
+ self.assertIsInstance(r, str)
+ self.assertEqual(r, data)
+ large_object.close()
+ large_object.unlink()
finally:
- self.c.query('''drop function bilbo_notice();''')
-
-
-class TestConfigFunctions(unittest.TestCase):
- """Test the functions for changing default settings.
+ del large_object
- To test the effect of most of these functions, we need a database
- connection. That's why they are covered in this test module.
- """
+class TestLargeObjects(unittest.TestCase):
+ """Test the large object methods."""
def setUp(self):
- self.c = connect()
+ self.pgcnx = connect()
+ self.pgcnx.query('begin')
+ self.obj = self.pgcnx.locreate(pg.INV_READ | pg.INV_WRITE)
def tearDown(self):
- self.c.close()
-
- def testGetDecimalPoint(self):
- point = pg.get_decimal_point()
- self.assertIsInstance(point, str)
- self.assertEqual(point, '.')
-
- def testSetDecimalPoint(self):
- d = pg.Decimal
- point = pg.get_decimal_point()
- query = self.c.query
- # check that money values can be interpreted correctly
- # if and only if the decimal point is set appropriately
- # for the current lc_monetary setting
- query("set lc_monetary='en_US'")
- pg.set_decimal_point('.')
- r = query("select '34.25'::money").getresult()[0][0]
- self.assertIsInstance(r, d)
- self.assertEqual(r, d('34.25'))
- pg.set_decimal_point(',')
- r = query("select '34.25'::money").getresult()[0][0]
- self.assertNotEqual(r, d('34.25'))
- query("set lc_monetary='de_DE'")
- pg.set_decimal_point(',')
- r = query("select '34,25'::money").getresult()[0][0]
- self.assertIsInstance(r, d)
- self.assertEqual(r, d('34.25'))
- pg.set_decimal_point('.')
- r = query("select '34,25'::money").getresult()[0][0]
- self.assertNotEqual(r, d('34.25'))
- pg.set_decimal_point(point)
-
- def testSetDecimal(self):
- d = pg.Decimal
- query = self.c.query
- r = query("select 3425::numeric").getresult()[0][0]
- self.assertIsInstance(r, d)
- self.assertEqual(r, d('3425'))
- pg.set_decimal(long)
- r = query("select 3425::numeric").getresult()[0][0]
- self.assertNotIsInstance(r, d)
- self.assertIsInstance(r, long)
- self.assertEqual(r, 3425L)
- pg.set_decimal(d)
-
- @unittest.skipUnless(namedtuple, 'Named tuples not available')
- def testSetNamedresult(self):
- query = self.c.query
-
- r = query("select 1 as x, 2 as y").namedresult()[0]
- self.assertIsInstance(r, tuple)
- self.assertEqual(r, (1, 2))
- self.assertIsNot(type(r), tuple)
- self.assertEqual(r._fields, ('x', 'y'))
- self.assertEqual(r._asdict(), {'x': 1, 'y': 2})
- self.assertEqual(r.__class__.__name__, 'Row')
-
- _namedresult = pg._namedresult
- self.assertTrue(callable(_namedresult))
- pg.set_namedresult(_namedresult)
-
- r = query("select 1 as x, 2 as y").namedresult()[0]
- self.assertIsInstance(r, tuple)
- self.assertEqual(r, (1, 2))
- self.assertIsNot(type(r), tuple)
- self.assertEqual(r._fields, ('x', 'y'))
- self.assertEqual(r._asdict(), {'x': 1, 'y': 2})
- self.assertEqual(r.__class__.__name__, 'Row')
+ if self.obj.oid:
+ try:
+ self.obj.close()
+ except IOError:
+ pass
+ try:
+ self.obj.unlink()
+ except IOError:
+ pass
+ del self.obj
+ self.pgcnx.query('end')
+ self.pgcnx.close()
+
+ def testOid(self):
+ self.assertIsInstance(self.obj.oid, int)
+ self.assertNotEqual(self.obj.oid, 0)
+
+ def testPgcn(self):
+ self.assertIs(self.obj.pgcnx, self.pgcnx)
+
+ def testError(self):
+ self.assertIsInstance(self.obj.error, str)
+ self.assertEqual(self.obj.error, '')
+
+ def testOpen(self):
+ open = self.obj.open
+ # testing with invalid parameters
+ self.assertRaises(TypeError, open)
+ self.assertRaises(TypeError, open, pg.INV_READ, pg.INV_WRITE)
+ open(pg.INV_READ)
+ # object is already open
+ self.assertRaises(IOError, open, pg.INV_READ)
+
+ def testClose(self):
+ close = self.obj.close
+ # testing with invalid parameters
+ self.assertRaises(TypeError, close, pg.INV_READ)
+ # object is not yet open
+ self.assertRaises(IOError, close)
+ self.obj.open(pg.INV_READ)
+ close()
+ self.assertRaises(IOError, close)
+
+ def testRead(self):
+ read = self.obj.read
+ # testing with invalid parameters
+ self.assertRaises(TypeError, read)
+ self.assertRaises(ValueError, read, -1)
+ self.assertRaises(TypeError, read, 'invalid')
+ self.assertRaises(TypeError, read, 80, 'invalid')
+ # reading when object is not yet open
+ self.assertRaises(IOError, read, 80)
+ data = 'some data to be read'
+ self.obj.open(pg.INV_WRITE)
+ self.obj.write(data)
+ self.obj.close()
+ self.obj.open(pg.INV_READ)
+ r = read(80)
+ self.assertIsInstance(r, str)
+ self.assertEqual(r, data)
+ self.obj.close()
+ self.obj.open(pg.INV_READ)
+ r = read(8)
+ self.assertIsInstance(r, str)
+ self.assertEqual(r, data[:8])
+ self.obj.close()
- def _listresult(q):
- return map(list, q.getresult())
+ def testWrite(self):
+ write = self.obj.write
+ # testing with invalid parameters
+ self.assertRaises(TypeError, write)
+ self.assertRaises(TypeError, write, -1)
+ self.assertRaises(TypeError, write, '', 'invalid')
+ # writing when object is not yet open
+ self.assertRaises(IOError, write, 'invalid')
+ data = 'some data to be written'
+ self.obj.open(pg.INV_WRITE)
+ write(data)
+ self.obj.close()
+ self.obj.open(pg.INV_READ)
+ r = self.obj.read(80)
+ self.assertEqual(r, data)
- pg.set_namedresult(_listresult)
+ def testSeek(self):
+ seek = self.obj.seek
+ # testing with invalid parameters
+ self.assertRaises(TypeError, seek)
+ self.assertRaises(TypeError, seek, 0)
+ self.assertRaises(TypeError, seek, 0, pg.SEEK_SET, pg.SEEK_END)
+ self.assertRaises(TypeError, seek, 'invalid', pg.SEEK_SET)
+ self.assertRaises(TypeError, seek, 0, 'invalid')
+ # seeking when object is not yet open
+ self.assertRaises(IOError, seek, 0, pg.SEEK_SET)
+ data = 'some data to be seeked'
+ self.obj.open(pg.INV_WRITE)
+ self.obj.write(data)
+ self.obj.close()
+ self.obj.open(pg.INV_READ)
+ seek(0, pg.SEEK_SET)
+ r = self.obj.read(9)
+ self.assertEqual(r, 'some data')
+ seek(4, pg.SEEK_CUR)
+ r = self.obj.read(2)
+ self.assertEqual(r, 'be')
+ seek(-10, pg.SEEK_CUR)
+ r = self.obj.read(4)
+ self.assertEqual(r, 'data')
+ seek(0, pg.SEEK_SET)
+ r = self.obj.read(4)
+ self.assertEqual(r, 'some')
+ seek(-6, pg.SEEK_END)
+ r = self.obj.read(4)
+ self.assertEqual(r, 'seek')
+
+ def testTell(self):
+ tell = self.obj.tell
+ # testing with invalid parameters
+ self.assertRaises(TypeError, tell, 0)
+ # telling when object is not yet open
+ self.assertRaises(IOError, tell)
+ data = 'some story to be told'
+ self.obj.open(pg.INV_WRITE)
+ self.obj.write(data)
+ r = tell()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, len(data))
+ self.obj.close()
+ self.obj.open(pg.INV_READ)
+ r = tell()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, 0)
+ self.obj.seek(5, pg.SEEK_SET)
+ r = tell()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, 5)
- try:
- r = query("select 1 as x, 2 as y").namedresult()[0]
- self.assertIsInstance(r, list)
- self.assertEqual(r, [1, 2])
- self.assertIsNot(type(r), tuple)
- self.assertFalse(hasattr(r, '_fields'))
- self.assertNotEqual(r.__class__.__name__, 'Row')
+ def testUnlink(self):
+ unlink = self.obj.unlink
+ # testing with invalid parameters
+ self.assertRaises(TypeError, unlink, 0)
+ # unlinking when object is still open
+ self.obj.open(pg.INV_WRITE)
+ self.assertRaises(IOError, unlink)
+ data = 'some data to be sold'
+ self.obj.write(data)
+ self.obj.close()
+ oid = self.obj.oid
+ self.assertIsInstance(oid, int)
+ self.assertNotEqual(oid, 0)
+ obj = self.pgcnx.getlo(oid)
+ try:
+ self.assertIsNot(obj, self.obj)
+ self.assertEqual(obj.oid, oid)
+ obj.open(pg.INV_READ)
+ r = obj.read(80)
+ obj.close()
+ self.assertEqual(r, data)
finally:
- pg.set_namedresult(_namedresult)
+ del obj
+ unlink()
+ self.assertIsNone(self.obj.oid)
+
+ def testSize(self):
+ size = self.obj.size
+ # testing with invalid parameters
+ self.assertRaises(TypeError, size, 0)
+ # sizing when object is not yet open
+ self.assertRaises(IOError, size)
+ # sizing an empty object
+ self.obj.open(pg.INV_READ)
+ r = size()
+ self.obj.close()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, 0)
+ # sizing after adding some data
+ data = 'some data to be sized'
+ self.obj.open(pg.INV_WRITE)
+ self.obj.write(data)
+ self.obj.close()
+ # sizing when current position is zero
+ self.obj.open(pg.INV_READ)
+ r = size()
+ self.obj.close()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, len(data))
+ self.obj.open(pg.INV_READ)
+ # sizing when current position is not zero
+ self.obj.seek(5, pg.SEEK_SET)
+ r = size()
+ self.obj.close()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, len(data))
+ # sizing after adding more data
+ data += ' and more data'
+ self.obj.open(pg.INV_WRITE)
+ self.obj.write(data)
+ self.obj.close()
+ self.obj.open(pg.INV_READ)
+ r = size()
+ self.obj.close()
+ self.assertIsInstance(r, int)
+ self.assertEqual(r, len(data))
+
+ def testExport(self):
+ export = self.obj.export
+ # testing with invalid parameters
+ self.assertRaises(TypeError, export)
+ self.assertRaises(TypeError, export, 0)
+ self.assertRaises(TypeError, export, 'invalid', 0)
+ f = tempfile.NamedTemporaryFile()
+ data = 'some data to be exported'
+ self.obj.open(pg.INV_WRITE)
+ self.obj.write(data)
+ # exporting when object is not yet closed
+ self.assertRaises(IOError, export, f.name)
+ self.obj.close()
+ export(f.name)
+ r = f.read()
+ f.close()
+ self.assertEqual(r, data)
if __name__ == '__main__':
Modified: branches/4.x/module/pgmodule.c
==============================================================================
--- branches/4.x/module/pgmodule.c Thu Nov 19 10:39:13 2015 (r548)
+++ branches/4.x/module/pgmodule.c Thu Nov 19 13:49:11 2015 (r549)
@@ -1249,10 +1249,6 @@
int mode,
fd;
- /* check validity */
- if (!check_lo_obj(self, CHECK_CLOSE))
- return NULL;
-
/* gets arguments */
if (!PyArg_ParseTuple(args, "i", &mode))
{
@@ -1260,6 +1256,10 @@
return NULL;
}
+ /* check validity */
+ if (!check_lo_obj(self, CHECK_CLOSE))
+ return NULL;
+
/* opens large object */
if ((fd = lo_open(self->pgcnx->cnx, self->lo_oid, mode)) < 0)
{
@@ -1316,10 +1316,6 @@
int size;
PyObject *buffer;
- /* checks validity */
- if (!check_lo_obj(self, CHECK_OPEN))
- return NULL;
-
/* gets arguments */
if (!PyArg_ParseTuple(args, "i", &size))
{
@@ -1333,6 +1329,10 @@
return NULL;
}
+ /* checks validity */
+ if (!check_lo_obj(self, CHECK_OPEN))
+ return NULL;
+
/* allocate buffer and runs read */
buffer = PyString_FromStringAndSize((char *) NULL, size);
@@ -1360,10 +1360,6 @@
int size,
bufsize;
- /* checks validity */
- if (!check_lo_obj(self, CHECK_OPEN))
- return NULL;
-
/* gets arguments */
if (!PyArg_ParseTuple(args, "s#", &buffer, &bufsize))
{
@@ -1372,6 +1368,10 @@
return NULL;
}
+ /* checks validity */
+ if (!check_lo_obj(self, CHECK_OPEN))
+ return NULL;
+
/* sends query */
if ((size = lo_write(self->pgcnx->cnx, self->lo_fd, buffer,
bufsize)) < bufsize)
@@ -1399,10 +1399,6 @@
offset = 0,
whence = 0;
- /* checks validity */
- if (!check_lo_obj(self, CHECK_OPEN))
- return NULL;
-
/* gets arguments */
if (!PyArg_ParseTuple(args, "ii", &offset, &whence))
{
@@ -1411,6 +1407,10 @@
return NULL;
}
+ /* checks validity */
+ if (!check_lo_obj(self, CHECK_OPEN))
+ return NULL;
+
/* sends query */
if ((ret = lo_lseek(self->pgcnx->cnx, self->lo_fd, offset, whence)) == -1)
{
_______________________________________________
PyGreSQL mailing list
PyGreSQL@vex.net
https://mail.vex.net/mailman/listinfo.cgi/pygresql