Author: cito
Date: Thu Nov 19 13:49:11 2015
New Revision: 549

Log:
Add tests for large object support.

Also fix some minor issues in the C code and docs.
In particular, type checks in large object support
should always be done before checking the object status.

Added:
   branches/4.x/module/TEST_PyGreSQL_classic_largeobj.py
      - copied, changed from r548, branches/4.x/module/TEST_PyGreSQL_classic_connection.py
Modified:
   branches/4.x/docs/classic.rst
   branches/4.x/module/TEST_PyGreSQL_classic_functions.py
   branches/4.x/module/pgmodule.c

Modified: branches/4.x/docs/classic.rst
==============================================================================
--- branches/4.x/docs/classic.rst       Thu Nov 19 10:39:13 2015        (r548)
+++ branches/4.x/docs/classic.rst       Thu Nov 19 13:49:11 2015        (r549)
@@ -1484,13 +1484,13 @@
   :integer: new position in object
 
 Exceptions raised:
-  :TypeError: binvalid connection or invalid object,
+  :TypeError: invalid connection or invalid object,
     bad parameter type, or too many parameters
   :IOError: object is not opened, or seek error
 
 Description:
   This method allows to move the position cursor in the large object. The
-  whence parameter can be obtained by OR-ing the constants defined in the
+  valid values for the whence parameter are defined as constants in the
   `pg` module (`SEEK_SET`, `SEEK_CUR`, `SEEK_END`).
 
 Syntax::

Modified: branches/4.x/module/TEST_PyGreSQL_classic_functions.py
==============================================================================
--- branches/4.x/module/TEST_PyGreSQL_classic_functions.py      Thu Nov 19 10:39:13 2015        (r548)
+++ branches/4.x/module/TEST_PyGreSQL_classic_functions.py      Thu Nov 19 13:49:11 2015        (r549)
@@ -317,15 +317,6 @@
             self.assertTrue(1 <= len(w) <= 2)
             self.assertTrue(w.isdigit())
 
-    def testLargeObjectIntConstants(self):
-        names = 'INV_READ INV_WRITE SEEK_SET SEEK_CUR SEEK_END'.split()
-        for name in names:
-            try:
-                value = getattr(pg, name)
-            except AttributeError:
-                self.fail('Module constant %s is missing' % name)
-            self.assertIsInstance(value, int)
-
 
 if __name__ == '__main__':
     unittest.main()

Copied and modified: branches/4.x/module/TEST_PyGreSQL_classic_largeobj.py (from r548, branches/4.x/module/TEST_PyGreSQL_classic_connection.py)
==============================================================================
--- branches/4.x/module/TEST_PyGreSQL_classic_connection.py     Thu Nov 19 10:39:13 2015        (r548, copy source)
+++ branches/4.x/module/TEST_PyGreSQL_classic_largeobj.py       Thu Nov 19 13:49:11 2015        (r549)
@@ -3,7 +3,7 @@
 
 """Test the classic PyGreSQL interface.
 
-Sub-tests for the low-level connection object.
+Sub-tests for large object support.
 
 Contributed by Christoph Zwerschke.
 
@@ -15,20 +15,10 @@
     import unittest2 as unittest  # for Python < 2.6
 except ImportError:
     import unittest
-import sys
-import threading
-import time
+import tempfile
 
 import pg  # the module under test
 
-from decimal import Decimal
-try:
-    from collections import namedtuple
-except ImportError:  # Python < 2.6
-    namedtuple = None
-
-from StringIO import StringIO
-
 # We need a database to test against.  If LOCAL_PyGreSQL.py exists we will
 # get our information from that.  Otherwise we use the defaults.
 dbname = 'unittest'
@@ -48,945 +38,332 @@
     return connection
 
 
-class TestCanConnect(unittest.TestCase):
-    """Test whether a basic connection to PostgreSQL is possible."""
-
-    def testCanConnect(self):
-        try:
-            connection = connect()
-        except pg.Error, error:
-            self.fail('Cannot connect to database %s:\n%s' % (dbname, error))
-        try:
-            connection.close()
-        except pg.Error:
-            self.fail('Cannot close the database connection')
-
-
-class TestConnectObject(unittest.TestCase):
-    """"Test existence of basic pg connection methods."""
-
-    def setUp(self):
-        self.connection = connect()
-
-    def tearDown(self):
-        try:
-            self.connection.close()
-        except pg.InternalError:
-            pass
-
-    def testAllConnectAttributes(self):
-        attributes = '''db error host options port
-            protocol_version server_version status tty user'''.split()
-        connection_attributes = [a for a in dir(self.connection)
-            if not callable(eval("self.connection." + a))]
-        self.assertEqual(attributes, connection_attributes)
-
-    def testAllConnectMethods(self):
-        methods = '''cancel close endcopy
-            escape_bytea escape_identifier escape_literal escape_string
-            fileno get_notice_receiver getline getlo getnotify
-            inserttable locreate loimport parameter putline query reset
-            set_notice_receiver source transaction'''.split()
-        connection_methods = [a for a in dir(self.connection)
-            if callable(eval("self.connection." + a))]
-        self.assertEqual(methods, connection_methods)
-
-    def testAttributeDb(self):
-        self.assertEqual(self.connection.db, dbname)
-
-    def testAttributeError(self):
-        error = self.connection.error
-        self.assertTrue(not error or 'krb5_' in error)
-
-    def testAttributeHost(self):
-        def_host = 'localhost'
-        self.assertIsInstance(self.connection.host, str)
-        self.assertEqual(self.connection.host, dbhost or def_host)
-
-    def testAttributeOptions(self):
-        no_options = ''
-        self.assertEqual(self.connection.options, no_options)
-
-    def testAttributePort(self):
-        def_port = 5432
-        self.assertIsInstance(self.connection.port, int)
-        self.assertEqual(self.connection.port, dbport or def_port)
-
-    def testAttributeProtocolVersion(self):
-        protocol_version = self.connection.protocol_version
-        self.assertIsInstance(protocol_version, int)
-        self.assertTrue(2 <= protocol_version < 4)
-
-    def testAttributeServerVersion(self):
-        server_version = self.connection.server_version
-        self.assertIsInstance(server_version, int)
-        self.assertTrue(70400 <= server_version < 100000)
-
-    def testAttributeStatus(self):
-        status_ok = 1
-        self.assertIsInstance(self.connection.status, int)
-        self.assertEqual(self.connection.status, status_ok)
-
-    def testAttributeTty(self):
-        def_tty = ''
-        self.assertIsInstance(self.connection.tty, str)
-        self.assertEqual(self.connection.tty, def_tty)
-
-    def testAttributeUser(self):
-        no_user = 'Deprecated facility'
-        user = self.connection.user
-        self.assertTrue(user)
-        self.assertIsInstance(user, str)
-        self.assertNotEqual(user, no_user)
-
-    def testMethodQuery(self):
-        query = self.connection.query
-        query("select 1+1")
-        query("select 1+$1", (1,))
-        query("select 1+$1+$2", (2, 3))
-        query("select 1+$1+$2", [2, 3])
-
-    def testMethodQueryEmpty(self):
-        self.assertRaises(ValueError, self.connection.query, '')
+class TestModuleConstants(unittest.TestCase):
+    """Test the existence of the documented module constants."""
 
-    def testMethodEndcopy(self):
-        try:
-            self.connection.endcopy()
-        except IOError:
-            pass
-
-    def testMethodClose(self):
-        self.connection.close()
-        try:
-            self.connection.reset()
-        except (pg.Error, TypeError):
-            pass
-        else:
-            self.fail('Reset should give an error for a closed connection')
-        self.assertRaises(pg.InternalError, self.connection.close)
-        try:
-            self.connection.query('select 1')
-        except (pg.Error, TypeError):
-            pass
-        else:
-            self.fail('Query should give an error for a closed connection')
-        self.connection = connect()
-
-    def testMethodReset(self):
-        query = self.connection.query
-        # check that client encoding gets reset
-        encoding = query('show client_encoding').getresult()[0][0].upper()
-        changed_encoding = 'LATIN1' if encoding == 'UTF8' else 'UTF8'
-        self.assertNotEqual(encoding, changed_encoding)
-        self.connection.query("set client_encoding=%s" % changed_encoding)
-        new_encoding = query('show client_encoding').getresult()[0][0].upper()
-        self.assertEqual(new_encoding, changed_encoding)
-        self.connection.reset()
-        new_encoding = query('show client_encoding').getresult()[0][0].upper()
-        self.assertNotEqual(new_encoding, changed_encoding)
-        self.assertEqual(new_encoding, encoding)
-
-    def testMethodCancel(self):
-        r = self.connection.cancel()
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, 1)
-
-    def testCancelLongRunningThread(self):
-        errors = []
-
-        def sleep():
+    def testLargeObjectIntConstants(self):
+        names = 'INV_READ INV_WRITE SEEK_SET SEEK_CUR SEEK_END'.split()
+        for name in names:
             try:
-                self.connection.query('select pg_sleep(5)').getresult()
-            except pg.ProgrammingError, error:
-                errors.append(str(error))
-
-        thread = threading.Thread(target=sleep)
-        t1 = time.time()
-        thread.start()  # run the query
-        while 1:  # make sure the query is really running
-            time.sleep(0.1)
-            if thread.is_alive() or time.time() - t1 > 5:
-                break
-        r = self.connection.cancel()  # cancel the running query
-        thread.join()  # wait for the thread to end
-        t2 = time.time()
-
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, 1)  # return code should be 1
-        self.assertLessEqual(t2 - t1, 3)  # time should be under 3 seconds
-        self.assertTrue(errors)
-
-    def testMethodFileNo(self):
-        r = self.connection.fileno()
-        self.assertIsInstance(r, int)
-        self.assertGreaterEqual(r, 0)
+                value = getattr(pg, name)
+            except AttributeError:
+                self.fail('Module constant %s is missing' % name)
+            self.assertIsInstance(value, int)
 
 
-class TestSimpleQueries(unittest.TestCase):
-    """"Test simple queries via a basic pg connection."""
+class TestCreatingLargeObjects(unittest.TestCase):
+    """Test creating large objects using a connection."""
 
     def setUp(self):
         self.c = connect()
+        self.c.query('begin')
 
     def tearDown(self):
+        self.c.query('end')
         self.c.close()
 
-    def testSelect0(self):
-        q = "select 0"
-        self.c.query(q)
-
-    def testSelect0Semicolon(self):
-        q = "select 0;"
-        self.c.query(q)
-
-    def testSelectDotSemicolon(self):
-        q = "select .;"
-        self.assertRaises(pg.ProgrammingError, self.c.query, q)
-
-    def testGetresult(self):
-        q = "select 0"
-        result = [(0,)]
-        r = self.c.query(q).getresult()
-        self.assertIsInstance(r, list)
-        v = r[0]
-        self.assertIsInstance(v, tuple)
-        self.assertIsInstance(v[0], int)
-        self.assertEqual(r, result)
-
-    def testGetresultLong(self):
-        q = "select 1234567890123456790"
-        result = 1234567890123456790L
-        v = self.c.query(q).getresult()[0][0]
-        self.assertIsInstance(v, long)
-        self.assertEqual(v, result)
-
-    def testGetresultString(self):
-        result = 'Hello, world!'
-        q = "select '%s'" % result
-        v = self.c.query(q).getresult()[0][0]
-        self.assertIsInstance(v, str)
-        self.assertEqual(v, result)
-
-    def testDictresult(self):
-        q = "select 0 as alias0"
-        result = [{'alias0': 0}]
-        r = self.c.query(q).dictresult()
-        self.assertIsInstance(r, list)
-        v = r[0]
-        self.assertIsInstance(v, dict)
-        self.assertIsInstance(v['alias0'], int)
-        self.assertEqual(r, result)
-
-    def testDictresultLong(self):
-        q = "select 1234567890123456790 as longjohnsilver"
-        result = 1234567890123456790L
-        v = self.c.query(q).dictresult()[0]['longjohnsilver']
-        self.assertIsInstance(v, long)
-        self.assertEqual(v, result)
-
-    def testDictresultString(self):
-        result = 'Hello, world!'
-        q = "select '%s' as greeting" % result
-        v = self.c.query(q).dictresult()[0]['greeting']
-        self.assertIsInstance(v, str)
-        self.assertEqual(v, result)
-
-    @unittest.skipUnless(namedtuple, 'Named tuples not available')
-    def testNamedresult(self):
-        q = "select 0 as alias0"
-        result = [(0,)]
-        r = self.c.query(q).namedresult()
-        self.assertEqual(r, result)
-        v = r[0]
-        self.assertEqual(v._fields, ('alias0',))
-        self.assertEqual(v.alias0, 0)
-
-    def testGet3Cols(self):
-        q = "select 1,2,3"
-        result = [(1, 2, 3)]
-        r = self.c.query(q).getresult()
-        self.assertEqual(r, result)
-
-    def testGet3DictCols(self):
-        q = "select 1 as a,2 as b,3 as c"
-        result = [dict(a=1, b=2, c=3)]
-        r = self.c.query(q).dictresult()
-        self.assertEqual(r, result)
-
-    @unittest.skipUnless(namedtuple, 'Named tuples not available')
-    def testGet3NamedCols(self):
-        q = "select 1 as a,2 as b,3 as c"
-        result = [(1, 2, 3)]
-        r = self.c.query(q).namedresult()
-        self.assertEqual(r, result)
-        v = r[0]
-        self.assertEqual(v._fields, ('a', 'b', 'c'))
-        self.assertEqual(v.b, 2)
-
-    def testGet3Rows(self):
-        q = "select 3 union select 1 union select 2 order by 1"
-        result = [(1,), (2,), (3,)]
-        r = self.c.query(q).getresult()
-        self.assertEqual(r, result)
-
-    def testGet3DictRows(self):
-        q = ("select 3 as alias3"
-            " union select 1 union select 2 order by 1")
-        result = [{'alias3': 1}, {'alias3': 2}, {'alias3': 3}]
-        r = self.c.query(q).dictresult()
-        self.assertEqual(r, result)
-
-    @unittest.skipUnless(namedtuple, 'Named tuples not available')
-    def testGet3NamedRows(self):
-        q = ("select 3 as alias3"
-            " union select 1 union select 2 order by 1")
-        result = [(1,), (2,), (3,)]
-        r = self.c.query(q).namedresult()
-        self.assertEqual(r, result)
-        for v in r:
-            self.assertEqual(v._fields, ('alias3',))
-
-    def testDictresultNames(self):
-        q = "select 'MixedCase' as MixedCaseAlias"
-        result = [{'mixedcasealias': 'MixedCase'}]
-        r = self.c.query(q).dictresult()
-        self.assertEqual(r, result)
-        q = "select 'MixedCase' as \"MixedCaseAlias\""
-        result = [{'MixedCaseAlias': 'MixedCase'}]
-        r = self.c.query(q).dictresult()
-        self.assertEqual(r, result)
-
-    @unittest.skipUnless(namedtuple, 'Named tuples not available')
-    def testNamedresultNames(self):
-        q = "select 'MixedCase' as MixedCaseAlias"
-        result = [('MixedCase',)]
-        r = self.c.query(q).namedresult()
-        self.assertEqual(r, result)
-        v = r[0]
-        self.assertEqual(v._fields, ('mixedcasealias',))
-        self.assertEqual(v.mixedcasealias, 'MixedCase')
-        q = "select 'MixedCase' as \"MixedCaseAlias\""
-        r = self.c.query(q).namedresult()
-        self.assertEqual(r, result)
-        v = r[0]
-        self.assertEqual(v._fields, ('MixedCaseAlias',))
-        self.assertEqual(v.MixedCaseAlias, 'MixedCase')
-
-    def testBigGetresult(self):
-        num_cols = 100
-        num_rows = 100
-        q = "select " + ','.join(map(str, xrange(num_cols)))
-        q = ' union all '.join((q,) * num_rows)
-        r = self.c.query(q).getresult()
-        result = [tuple(range(num_cols))] * num_rows
-        self.assertEqual(r, result)
-
-    def testListfields(self):
-        q = ('select 0 as a, 0 as b, 0 as c,'
-            ' 0 as c, 0 as b, 0 as a,'
-            ' 0 as lowercase, 0 as UPPERCASE,'
-            ' 0 as MixedCase, 0 as "MixedCase",'
-            ' 0 as a_long_name_with_underscores,'
-            ' 0 as "A long name with Blanks"')
-        r = self.c.query(q).listfields()
-        result = ('a', 'b', 'c', 'c', 'b', 'a',
-            'lowercase', 'uppercase', 'mixedcase', 'MixedCase',
-            'a_long_name_with_underscores',
-            'A long name with Blanks')
-        self.assertEqual(r, result)
-
-    def testFieldname(self):
-        q = "select 0 as z, 0 as a, 0 as x, 0 as y"
-        r = self.c.query(q).fieldname(2)
-        self.assertEqual(r, 'x')
-        r = self.c.query(q).fieldname(3)
-        self.assertEqual(r, 'y')
-
-    def testFieldnum(self):
-        q = "select 1 as x"
-        self.assertRaises(ValueError, self.c.query(q).fieldnum, 'y')
-        q = "select 1 as x"
-        r = self.c.query(q).fieldnum('x')
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, 0)
-        q = "select 0 as z, 0 as a, 0 as x, 0 as y"
-        r = self.c.query(q).fieldnum('x')
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, 2)
-        r = self.c.query(q).fieldnum('y')
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, 3)
-
-    def testNtuples(self):
-        q = "select 1 where false"
-        r = self.c.query(q).ntuples()
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, 0)
-        q = ("select 1 as a, 2 as b, 3 as c, 4 as d"
-            " union select 5 as a, 6 as b, 7 as c, 8 as d")
-        r = self.c.query(q).ntuples()
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, 2)
-        q = ("select 1 union select 2 union select 3"
-            " union select 4 union select 5 union select 6")
-        r = self.c.query(q).ntuples()
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, 6)
+    def assertIsLargeObject(self, obj):
+        self.assertIsNotNone(obj)
+        self.assertTrue(hasattr(obj, 'open'))
+        self.assertTrue(hasattr(obj, 'close'))
+        self.assertTrue(hasattr(obj, 'oid'))
+        self.assertTrue(hasattr(obj, 'pgcnx'))
+        self.assertTrue(hasattr(obj, 'error'))
+        self.assertIsInstance(obj.oid, int)
+        self.assertNotEqual(obj.oid, 0)
+        self.assertIs(obj.pgcnx, self.c)
+        self.assertIsInstance(obj.error, str)
+        self.assertFalse(obj.error)
 
-    def testQuery(self):
-        query = self.c.query
-        query("drop table if exists test_table")
-        q = "create table test_table (n integer) with oids"
-        r = query(q)
-        self.assertIsNone(r)
-        q = "insert into test_table values (1)"
-        r = query(q)
-        self.assertIsInstance(r, int)
-        q = "insert into test_table select 2"
-        r = query(q)
-        self.assertIsInstance(r, int)
-        oid = r
-        q = "select oid from test_table where n=2"
-        r = query(q).getresult()
-        self.assertEqual(len(r), 1)
-        r = r[0]
-        self.assertEqual(len(r), 1)
-        r = r[0]
-        self.assertIsInstance(r, int)
-        self.assertEqual(r, oid)
-        q = "insert into test_table select 3 union select 4 union select 5"
-        r = query(q)
-        self.assertIsInstance(r, str)
-        self.assertEqual(r, '3')
-        q = "update test_table set n=4 where n<5"
-        r = query(q)
-        self.assertIsInstance(r, str)
-        self.assertEqual(r, '4')
-        q = "delete from test_table"
-        r = query(q)
-        self.assertIsInstance(r, str)
-        self.assertEqual(r, '5')
-        query("drop table test_table")
-
-    def testPrint(self):
-        q = ("select 1 as a, 'hello' as h, 'w' as world"
-            " union select 2, 'xyz', 'uvw'")
-        r = self.c.query(q)
-        s = StringIO()
-        stdout, sys.stdout = sys.stdout, s
+    def testLoCreate(self):
+        large_object = self.c.locreate(pg.INV_READ | pg.INV_WRITE)
         try:
-            print r
-        except Exception:
-            pass
+            self.assertIsLargeObject(large_object)
         finally:
-            sys.stdout = stdout
-        r = s.getvalue()
-        s.close()
-        self.assertEqual(r,
-            'a|  h  |world\n'
-            '-+-----+-----\n'
-            '1|hello|w    \n'
-            '2|xyz  |uvw  \n'
-            '(2 rows)\n')
-
-
-class TestParamQueries(unittest.TestCase):
-    """"Test queries with parameters via a basic pg connection."""
-
-    def setUp(self):
-        self.c = connect()
-
-    def tearDown(self):
-        self.c.close()
-
-    def testQueryWithNoneParam(self):
-        self.assertEqual(self.c.query("select $1::integer", (None,)
-            ).getresult(), [(None,)])
-        self.assertEqual(self.c.query("select $1::text", [None]
-            ).getresult(), [(None,)])
-
-    def testQueryWithBoolParams(self):
-        query = self.c.query
-        self.assertEqual(query("select false").getresult(), [('f',)])
-        self.assertEqual(query("select true").getresult(), [('t',)])
-        self.assertEqual(query("select $1::bool", (None,)).getresult(),
-            [(None,)])
-        self.assertEqual(query("select $1::bool", ('f',)).getresult(), [('f',)])
-        self.assertEqual(query("select $1::bool", ('t',)).getresult(), [('t',)])
-        self.assertEqual(query("select $1::bool", ('false',)).getresult(),
-            [('f',)])
-        self.assertEqual(query("select $1::bool", ('true',)).getresult(),
-            [('t',)])
-        self.assertEqual(query("select $1::bool", ('n',)).getresult(), [('f',)])
-        self.assertEqual(query("select $1::bool", ('y',)).getresult(), [('t',)])
-        self.assertEqual(query("select $1::bool", (0,)).getresult(), [('f',)])
-        self.assertEqual(query("select $1::bool", (1,)).getresult(), [('t',)])
-        self.assertEqual(query("select $1::bool", (False,)).getresult(),
-            [('f',)])
-        self.assertEqual(query("select $1::bool", (True,)).getresult(),
-            [('t',)])
-
-    def testQueryWithIntParams(self):
-        query = self.c.query
-        self.assertEqual(query("select 1+1").getresult(), [(2,)])
-        self.assertEqual(query("select 1+$1", (1,)).getresult(), [(2,)])
-        self.assertEqual(query("select 1+$1", [1]).getresult(), [(2,)])
-        self.assertEqual(query("select $1::integer", (2,)).getresult(), [(2,)])
-        self.assertEqual(query("select $1::text", (2,)).getresult(), [('2',)])
-        self.assertEqual(query("select 1+$1::numeric", [1]).getresult(),
-            [(Decimal('2'),)])
-        self.assertEqual(query("select 1, $1::integer", (2,)
-            ).getresult(), [(1, 2)])
-        self.assertEqual(query("select 1 union select $1", (2,)
-            ).getresult(), [(1,), (2,)])
-        self.assertEqual(query("select $1::integer+$2", (1, 2)
-            ).getresult(), [(3,)])
-        self.assertEqual(query("select $1::integer+$2", [1, 2]
-            ).getresult(), [(3,)])
-        self.assertEqual(query("select 0+$1+$2+$3+$4+$5+$6", range(6)
-            ).getresult(), [(15,)])
-
-    def testQueryWithStrParams(self):
-        query = self.c.query
-        self.assertEqual(query("select $1||', world!'", ('Hello',)
-            ).getresult(), [('Hello, world!',)])
-        self.assertEqual(query("select $1||', world!'", ['Hello']
-            ).getresult(), [('Hello, world!',)])
-        self.assertEqual(query("select $1||', '||$2||'!'", ('Hello', 'world'),
-            ).getresult(), [('Hello, world!',)])
-        self.assertEqual(query("select $1::text", ('Hello, world!',)
-            ).getresult(), [('Hello, world!',)])
-        self.assertEqual(query("select $1::text,$2::text", ('Hello', 'world')
-            ).getresult(), [('Hello', 'world')])
-        self.assertEqual(query("select $1::text,$2::text", ['Hello', 'world']
-            ).getresult(), [('Hello', 'world')])
-        self.assertEqual(query("select $1::text union select $2::text",
-            ('Hello', 'world')).getresult(), [('Hello',), ('world',)])
-        self.assertEqual(query("select $1||', '||$2||'!'", ('Hello',
-            'w\xc3\xb6rld')).getresult(), [('Hello, w\xc3\xb6rld!',)])
-
-    def testQueryWithUnicodeParams(self):
-        query = self.c.query
-        query('set client_encoding = utf8')
-        self.assertEqual(query("select $1||', '||$2||'!'",
-            ('Hello', u'w\xf6rld')).getresult(), [('Hello, w\xc3\xb6rld!',)])
-        self.assertEqual(query("select $1||', '||$2||'!'",
-            ('Hello', u'\u043c\u0438\u0440')).getresult(),
-            [('Hello, \xd0\xbc\xd0\xb8\xd1\x80!',)])
-        query('set client_encoding = latin1')
-        self.assertEqual(query("select $1||', '||$2||'!'",
-            ('Hello', u'w\xf6rld')).getresult(), [('Hello, w\xf6rld!',)])
-        self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
-            ('Hello', u'\u043c\u0438\u0440'))
-        query('set client_encoding = iso_8859_1')
-        self.assertEqual(query("select $1||', '||$2||'!'",
-            ('Hello', u'w\xf6rld')).getresult(), [('Hello, w\xf6rld!',)])
-        self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
-            ('Hello', u'\u043c\u0438\u0440'))
-        query('set client_encoding = iso_8859_5')
-        self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
-            ('Hello', u'w\xf6rld'))
-        self.assertEqual(query("select $1||', '||$2||'!'",
-            ('Hello', u'\u043c\u0438\u0440')).getresult(),
-            [('Hello, \xdc\xd8\xe0!',)])
-        query('set client_encoding = sql_ascii')
-        self.assertRaises(UnicodeError, query, "select $1||', '||$2||'!'",
-            ('Hello', u'w\xf6rld'))
-
-    def testQueryWithMixedParams(self):
-        self.assertEqual(self.c.query("select $1+2,$2||', world!'",
-            (1, 'Hello'),).getresult(), [(3, 'Hello, world!')])
-        self.assertEqual(self.c.query("select $1::integer,$2::date,$3::text",
-            (4711, None, 'Hello!'),).getresult(), [(4711, None, 'Hello!')])
-
-    def testQueryWithDuplicateParams(self):
-        self.assertRaises(pg.ProgrammingError,
-            self.c.query, "select $1+$1", (1,))
-        self.assertRaises(pg.ProgrammingError,
-            self.c.query, "select $1+$1", (1, 2))
-
-    def testQueryWithZeroParams(self):
-        self.assertEqual(self.c.query("select 1+1", []
-            ).getresult(), [(2,)])
-
-    def testQueryWithGarbage(self):
-        garbage = r"'\{}+()-#[]oo324"
-        self.assertEqual(self.c.query("select $1::text AS garbage", (garbage,)
-            ).dictresult(), [{'garbage': garbage}])
-
-    def testUnicodeQuery(self):
-        query = self.c.query
-        self.assertEqual(query(u"select 1+1").getresult(), [(2,)])
-        self.assertRaises(TypeError, query, u"select 'Hello, w\xf6rld!'")
-
-
-class TestInserttable(unittest.TestCase):
-    """"Test inserttable method."""
-
-    @classmethod
-    def setUpClass(cls):
-        c = connect()
-        c.query("drop table if exists test cascade")
-        c.query("create table test ("
-            "i2 smallint, i4 integer, i8 bigint, b boolean, dt date, ti time,"
-            "d numeric, f4 real, f8 double precision, m money,"
-            "c char(1), v4 varchar(4), c4 char(4), t text)")
-        c.close()
-
-    @classmethod
-    def tearDownClass(cls):
-        c = connect()
-        c.query("drop table test cascade")
-        c.close()
-
-    def setUp(self):
-        self.c = connect()
-        self.c.query("set datestyle='ISO,YMD'")
+            del large_object
 
-    def tearDown(self):
-        self.c.query("truncate table test")
-        self.c.close()
-
-    data = [
-        (-1, -1, -1L, True, '1492-10-12', '08:30:00',
-            -1.2345, -1.75, -1.875, '-1.25', '-', 'r?', '!u', 'xyz'),
-        (0, 0, 0L, False, '1607-04-14', '09:00:00',
-            0.0, 0.0, 0.0, '0.0', ' ', '0123', '4567', '890'),
-        (1, 1, 1L, True, '1801-03-04', '03:45:00',
-            1.23456, 1.75, 1.875, '1.25', 'x', 'bc', 'cdef', 'g'),
-        (2, 2, 2L, False, '1903-12-17', '11:22:00',
-            2.345678, 2.25, 2.125, '2.75', 'y', 'q', 'ijk', 'mnop\nstux!')]
-
-    def get_back(self):
-        """Convert boolean and decimal values back."""
-        data = []
-        for row in self.c.query("select * from test order by 1").getresult():
-            self.assertIsInstance(row, tuple)
-            row = list(row)
-            if row[0] is not None:  # smallint
-                self.assertIsInstance(row[0], int)
-            if row[1] is not None:  # integer
-                self.assertIsInstance(row[1], int)
-            if row[2] is not None:  # bigint
-                self.assertIsInstance(row[2], long)
-            if row[3] is not None:  # boolean
-                self.assertIsInstance(row[3], str)
-                row[3] = {'f': False, 't': True}.get(row[3])
-            if row[4] is not None:  # date
-                self.assertIsInstance(row[4], str)
-                self.assertTrue(row[4].replace('-', '').isdigit())
-            if row[5] is not None:  # time
-                self.assertIsInstance(row[5], str)
-                self.assertTrue(row[5].replace(':', '').isdigit())
-            if row[6] is not None:  # numeric
-                self.assertIsInstance(row[6], Decimal)
-                row[6] = float(row[6])
-            if row[7] is not None:  # real
-                self.assertIsInstance(row[7], float)
-            if row[8] is not None:  # double precision
-                self.assertIsInstance(row[8], float)
-                row[8] = float(row[8])
-            if row[9] is not None:  # money
-                self.assertIsInstance(row[9], Decimal)
-                row[9] = str(float(row[9]))
-            if row[10] is not None:  # char(1)
-                self.assertIsInstance(row[10], str)
-                self.assertEqual(len(row[10]), 1)
-            if row[11] is not None:  # varchar(4)
-                self.assertIsInstance(row[11], str)
-                self.assertLessEqual(len(row[11]), 4)
-            if row[12] is not None:  # char(4)
-                self.assertIsInstance(row[12], str)
-                self.assertEqual(len(row[12]), 4)
-                row[12] = row[12].rstrip()
-            if row[13] is not None:  # text
-                self.assertIsInstance(row[13], str)
-            row = tuple(row)
-            data.append(row)
-        return data
-
-    def testInserttable1Row(self):
-        data = self.data[2:3]
-        self.c.inserttable("test", data)
-        self.assertEqual(self.get_back(), data)
-
-    def testInserttable4Rows(self):
-        data = self.data
-        self.c.inserttable("test", data)
-        self.assertEqual(self.get_back(), data)
-
-    def testInserttableMultipleRows(self):
-        num_rows = 100
-        data = self.data[2:3] * num_rows
-        self.c.inserttable("test", data)
-        r = self.c.query("select count(*) from test").getresult()[0][0]
-        self.assertEqual(r, num_rows)
-
-    def testInserttableMultipleCalls(self):
-        num_rows = 10
-        data = self.data[2:3]
-        for _i in range(num_rows):
-            self.c.inserttable("test", data)
-        r = self.c.query("select count(*) from test").getresult()[0][0]
-        self.assertEqual(r, num_rows)
-
-    def testInserttableNullValues(self):
-        data = [(None,) * 14] * 100
-        self.c.inserttable("test", data)
-        self.assertEqual(self.get_back(), data)
-
-    def testInserttableMaxValues(self):
-        data = [(2 ** 15 - 1, int(2 ** 31 - 1), long(2 ** 31 - 1),
-            True, '2999-12-31', '11:59:59', 1e99,
-            1.0 + 1.0 / 32, 1.0 + 1.0 / 32, None,
-            "1", "1234", "1234", "1234" * 100)]
-        self.c.inserttable("test", data)
-        self.assertEqual(self.get_back(), data)
-
-
-class TestDirectSocketAccess(unittest.TestCase):
-    """"Test copy command with direct socket access."""
-
-    @classmethod
-    def setUpClass(cls):
-        c = connect()
-        c.query("drop table if exists test cascade")
-        c.query("create table test (i int, v varchar(16))")
-        c.close()
-
-    @classmethod
-    def tearDownClass(cls):
-        c = connect()
-        c.query("drop table test cascade")
-        c.close()
-
-    def setUp(self):
-        self.c = connect()
-        self.c.query("set datestyle='ISO,YMD'")
-
-    def tearDown(self):
-        self.c.query("truncate table test")
-        self.c.close()
-
-    def testPutline(self):
-        putline = self.c.putline
-        query = self.c.query
-        data = list(enumerate("apple pear plum cherry banana".split()))
-        query("copy test from stdin")
+    def testGetLo(self):
+        large_object = self.c.locreate(pg.INV_READ | pg.INV_WRITE)
         try:
-            for i, v in data:
-                putline("%d\t%s\n" % (i, v))
-            putline("\\.\n")
+            self.assertIsLargeObject(large_object)
+            oid = large_object.oid
         finally:
-            self.c.endcopy()
-        r = query("select * from test").getresult()
-        self.assertEqual(r, data)
-
-    def testPutline(self):
-        getline = self.c.getline
-        query = self.c.query
-        data = list(enumerate("apple banana pear plum strawberry".split()))
-        n = len(data)
-        self.c.inserttable('test', data)
-        query("copy test to stdout")
-        try:
-            for i in range(n + 2):
-                v = getline()
-                if i < n:
-                    self.assertEqual(v, '%d\t%s' % data[i])
-                elif i == n:
-                    self.assertEqual(v, '\\.')
-                else:
-                    self.assertIsNone(v)
+            del large_object
+        data = 'some data to be shared'
+        large_object = self.c.getlo(oid)
+        try:
+            self.assertIsLargeObject(large_object)
+            self.assertEqual(large_object.oid, oid)
+            large_object.open(pg.INV_WRITE)
+            large_object.write(data)
+            large_object.close()
         finally:
-            try:
-                self.c.endcopy()
-            except IOError:
-                pass
-
-    def testParameterChecks(self):
-        self.assertRaises(TypeError, self.c.putline)
-        self.assertRaises(TypeError, self.c.getline, 'invalid')
-        self.assertRaises(TypeError, self.c.endcopy, 'invalid')
-
-
-class TestNotificatons(unittest.TestCase):
-    """"Test notification support."""
-
-    def setUp(self):
-        self.c = connect()
-
-    def tearDown(self):
-        self.c.close()
-
-    def testGetNotify(self):
-        getnotify = self.c.getnotify
-        query = self.c.query
-        self.assertIsNone(getnotify())
-        query('listen test_notify')
+            del large_object
+        large_object = self.c.getlo(oid)
         try:
-            self.assertIsNone(self.c.getnotify())
-            query("notify test_notify")
-            r = getnotify()
-            self.assertIsInstance(r, tuple)
-            self.assertEqual(len(r), 3)
-            self.assertIsInstance(r[0], str)
-            self.assertIsInstance(r[1], int)
-            self.assertIsInstance(r[2], str)
-            self.assertEqual(r[0], 'test_notify')
-            self.assertEqual(r[2], '')
-            self.assertIsNone(self.c.getnotify())
-            try:
-                query("notify test_notify, 'test_payload'")
-            except pg.ProgrammingError:  # PostgreSQL < 9.0
-                pass
-            else:
-                r = getnotify()
-                self.assertTrue(isinstance(r, tuple))
-                self.assertEqual(len(r), 3)
-                self.assertIsInstance(r[0], str)
-                self.assertIsInstance(r[1], int)
-                self.assertIsInstance(r[2], str)
-                self.assertEqual(r[0], 'test_notify')
-                self.assertEqual(r[2], 'test_payload')
-                self.assertIsNone(getnotify())
+            self.assertIsLargeObject(large_object)
+            self.assertEqual(large_object.oid, oid)
+            large_object.open(pg.INV_READ)
+            r = large_object.read(80)
+            large_object.close()
+            large_object.unlink()
         finally:
-            query('unlisten test_notify')
-
-    def testGetNoticeReceiver(self):
-        self.assertIsNone(self.c.get_notice_receiver())
-
-    def testSetNoticeReceiver(self):
-        self.assertRaises(TypeError, self.c.set_notice_receiver, None)
-        self.assertRaises(TypeError, self.c.set_notice_receiver, 42)
-        self.assertIsNone(self.c.set_notice_receiver(lambda notice: None))
-
-    def testSetAndGetNoticeReceiver(self):
-        r = lambda notice: None
-        self.assertIsNone(self.c.set_notice_receiver(r))
-        self.assertIs(self.c.get_notice_receiver(), r)
-
-    def testNoticeReceiver(self):
-        self.c.query('''create function bilbo_notice() returns void AS $$
-            begin
-                raise warning 'Bilbo was here!';
-            end;
-            $$ language plpgsql''')
-        try:
-            received = {}
+            del large_object
+        self.assertEqual(r, data)
 
-            def notice_receiver(notice):
-                for attr in dir(notice):
-                    value = getattr(notice, attr)
-                    if isinstance(value, str):
-                        value = value.replace('WARNUNG', 'WARNING')
-                    received[attr] = value
-
-            self.c.set_notice_receiver(notice_receiver)
-            self.c.query('''select bilbo_notice()''')
-            self.assertEqual(received, dict(
-                pgcnx=self.c, message='WARNING:  Bilbo was here!\n',
-                severity='WARNING', primary='Bilbo was here!',
-                detail=None, hint=None))
+    def testLoImport(self):
+        f = tempfile.NamedTemporaryFile()
+        data = 'some data to be imported'
+        f.write(data)
+        f.flush()
+        f.seek(0)
+        large_object = self.c.loimport(f.name)
+        try:
+            f.close()
+            self.assertIsLargeObject(large_object)
+            large_object.open(pg.INV_READ)
+            large_object.seek(0, pg.SEEK_SET)
+            r = large_object.size()
+            self.assertIsInstance(r, int)
+            self.assertEqual(r, len(data))
+            r = large_object.read(80)
+            self.assertIsInstance(r, str)
+            self.assertEqual(r, data)
+            large_object.close()
+            large_object.unlink()
         finally:
-            self.c.query('''drop function bilbo_notice();''')
-
-
-class TestConfigFunctions(unittest.TestCase):
-    """Test the functions for changing default settings.
+            del large_object
 
-    To test the effect of most of these functions, we need a database
-    connection.  That's why they are covered in this test module.
 
-    """
+class TestLargeObjects(unittest.TestCase):
+    """Test the large object methods."""
 
     def setUp(self):
-        self.c = connect()
+        self.pgcnx = connect()
+        self.pgcnx.query('begin')
+        self.obj = self.pgcnx.locreate(pg.INV_READ | pg.INV_WRITE)
 
     def tearDown(self):
-        self.c.close()
-
-    def testGetDecimalPoint(self):
-        point = pg.get_decimal_point()
-        self.assertIsInstance(point, str)
-        self.assertEqual(point, '.')
-
-    def testSetDecimalPoint(self):
-        d = pg.Decimal
-        point = pg.get_decimal_point()
-        query = self.c.query
-        # check that money values can be interpreted correctly
-        # if and only if the decimal point is set appropriately
-        # for the current lc_monetary setting
-        query("set lc_monetary='en_US'")
-        pg.set_decimal_point('.')
-        r = query("select '34.25'::money").getresult()[0][0]
-        self.assertIsInstance(r, d)
-        self.assertEqual(r, d('34.25'))
-        pg.set_decimal_point(',')
-        r = query("select '34.25'::money").getresult()[0][0]
-        self.assertNotEqual(r, d('34.25'))
-        query("set lc_monetary='de_DE'")
-        pg.set_decimal_point(',')
-        r = query("select '34,25'::money").getresult()[0][0]
-        self.assertIsInstance(r, d)
-        self.assertEqual(r, d('34.25'))
-        pg.set_decimal_point('.')
-        r = query("select '34,25'::money").getresult()[0][0]
-        self.assertNotEqual(r, d('34.25'))
-        pg.set_decimal_point(point)
-
-    def testSetDecimal(self):
-        d = pg.Decimal
-        query = self.c.query
-        r = query("select 3425::numeric").getresult()[0][0]
-        self.assertIsInstance(r, d)
-        self.assertEqual(r, d('3425'))
-        pg.set_decimal(long)
-        r = query("select 3425::numeric").getresult()[0][0]
-        self.assertNotIsInstance(r, d)
-        self.assertIsInstance(r, long)
-        self.assertEqual(r, 3425L)
-        pg.set_decimal(d)
-
-    @unittest.skipUnless(namedtuple, 'Named tuples not available')
-    def testSetNamedresult(self):
-        query = self.c.query
-
-        r = query("select 1 as x, 2 as y").namedresult()[0]
-        self.assertIsInstance(r, tuple)
-        self.assertEqual(r, (1, 2))
-        self.assertIsNot(type(r), tuple)
-        self.assertEqual(r._fields, ('x', 'y'))
-        self.assertEqual(r._asdict(), {'x': 1, 'y': 2})
-        self.assertEqual(r.__class__.__name__, 'Row')
-
-        _namedresult = pg._namedresult
-        self.assertTrue(callable(_namedresult))
-        pg.set_namedresult(_namedresult)
-
-        r = query("select 1 as x, 2 as y").namedresult()[0]
-        self.assertIsInstance(r, tuple)
-        self.assertEqual(r, (1, 2))
-        self.assertIsNot(type(r), tuple)
-        self.assertEqual(r._fields, ('x', 'y'))
-        self.assertEqual(r._asdict(), {'x': 1, 'y': 2})
-        self.assertEqual(r.__class__.__name__, 'Row')
+        if self.obj.oid:
+            try:
+                self.obj.close()
+            except IOError:
+                pass
+            try:
+                self.obj.unlink()
+            except IOError:
+                pass
+        del self.obj
+        self.pgcnx.query('end')
+        self.pgcnx.close()
+
+    def testOid(self):
+        self.assertIsInstance(self.obj.oid, int)
+        self.assertNotEqual(self.obj.oid, 0)
+
+    def testPgcn(self):
+        self.assertIs(self.obj.pgcnx, self.pgcnx)
+
+    def testError(self):
+        self.assertIsInstance(self.obj.error, str)
+        self.assertEqual(self.obj.error, '')
+
+    def testOpen(self):
+        open = self.obj.open
+        # testing with invalid parameters
+        self.assertRaises(TypeError, open)
+        self.assertRaises(TypeError, open, pg.INV_READ, pg.INV_WRITE)
+        open(pg.INV_READ)
+        # object is already open
+        self.assertRaises(IOError, open, pg.INV_READ)
+
+    def testClose(self):
+        close = self.obj.close
+        # testing with invalid parameters
+        self.assertRaises(TypeError, close, pg.INV_READ)
+        # object is not yet open
+        self.assertRaises(IOError, close)
+        self.obj.open(pg.INV_READ)
+        close()
+        self.assertRaises(IOError, close)
+
+    def testRead(self):
+        read = self.obj.read
+        # testing with invalid parameters
+        self.assertRaises(TypeError, read)
+        self.assertRaises(ValueError, read, -1)
+        self.assertRaises(TypeError, read, 'invalid')
+        self.assertRaises(TypeError, read, 80, 'invalid')
+        # reading when object is not yet open
+        self.assertRaises(IOError, read, 80)
+        data = 'some data to be read'
+        self.obj.open(pg.INV_WRITE)
+        self.obj.write(data)
+        self.obj.close()
+        self.obj.open(pg.INV_READ)
+        r = read(80)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, data)
+        self.obj.close()
+        self.obj.open(pg.INV_READ)
+        r = read(8)
+        self.assertIsInstance(r, str)
+        self.assertEqual(r, data[:8])
+        self.obj.close()
 
-        def _listresult(q):
-            return map(list, q.getresult())
+    def testWrite(self):
+        write = self.obj.write
+        # testing with invalid parameters
+        self.assertRaises(TypeError, write)
+        self.assertRaises(TypeError, write, -1)
+        self.assertRaises(TypeError, write, '', 'invalid')
+        # writing when object is not yet open
+        self.assertRaises(IOError, write, 'invalid')
+        data = 'some data to be written'
+        self.obj.open(pg.INV_WRITE)
+        write(data)
+        self.obj.close()
+        self.obj.open(pg.INV_READ)
+        r = self.obj.read(80)
+        self.assertEqual(r, data)
 
-        pg.set_namedresult(_listresult)
+    def testSeek(self):
+        seek = self.obj.seek
+        # testing with invalid parameters
+        self.assertRaises(TypeError, seek)
+        self.assertRaises(TypeError, seek, 0)
+        self.assertRaises(TypeError, seek, 0, pg.SEEK_SET, pg.SEEK_END)
+        self.assertRaises(TypeError, seek, 'invalid', pg.SEEK_SET)
+        self.assertRaises(TypeError, seek, 0, 'invalid')
+        # seeking when object is not yet open
+        self.assertRaises(IOError, seek, 0, pg.SEEK_SET)
+        data = 'some data to be seeked'
+        self.obj.open(pg.INV_WRITE)
+        self.obj.write(data)
+        self.obj.close()
+        self.obj.open(pg.INV_READ)
+        seek(0, pg.SEEK_SET)
+        r = self.obj.read(9)
+        self.assertEqual(r, 'some data')
+        seek(4, pg.SEEK_CUR)
+        r = self.obj.read(2)
+        self.assertEqual(r, 'be')
+        seek(-10, pg.SEEK_CUR)
+        r = self.obj.read(4)
+        self.assertEqual(r, 'data')
+        seek(0, pg.SEEK_SET)
+        r = self.obj.read(4)
+        self.assertEqual(r, 'some')
+        seek(-6, pg.SEEK_END)
+        r = self.obj.read(4)
+        self.assertEqual(r, 'seek')
+
+    def testTell(self):
+        tell = self.obj.tell
+        # testing with invalid parameters
+        self.assertRaises(TypeError, tell, 0)
+        # telling when object is not yet open
+        self.assertRaises(IOError, tell)
+        data = 'some story to be told'
+        self.obj.open(pg.INV_WRITE)
+        self.obj.write(data)
+        r = tell()
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, len(data))
+        self.obj.close()
+        self.obj.open(pg.INV_READ)
+        r = tell()
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, 0)
+        self.obj.seek(5, pg.SEEK_SET)
+        r = tell()
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, 5)
 
-        try:
-            r = query("select 1 as x, 2 as y").namedresult()[0]
-            self.assertIsInstance(r, list)
-            self.assertEqual(r, [1, 2])
-            self.assertIsNot(type(r), tuple)
-            self.assertFalse(hasattr(r, '_fields'))
-            self.assertNotEqual(r.__class__.__name__, 'Row')
+    def testUnlink(self):
+        unlink = self.obj.unlink
+        # testing with invalid parameters
+        self.assertRaises(TypeError, unlink, 0)
+        # unlinking when object is still open
+        self.obj.open(pg.INV_WRITE)
+        self.assertRaises(IOError, unlink)
+        data = 'some data to be sold'
+        self.obj.write(data)
+        self.obj.close()
+        oid = self.obj.oid
+        self.assertIsInstance(oid, int)
+        self.assertNotEqual(oid, 0)
+        obj = self.pgcnx.getlo(oid)
+        try:
+            self.assertIsNot(obj, self.obj)
+            self.assertEqual(obj.oid, oid)
+            obj.open(pg.INV_READ)
+            r = obj.read(80)
+            obj.close()
+            self.assertEqual(r, data)
         finally:
-            pg.set_namedresult(_namedresult)
+            del obj
+        unlink()
+        self.assertIsNone(self.obj.oid)
+
+    def testSize(self):
+        size = self.obj.size
+        # testing with invalid parameters
+        self.assertRaises(TypeError, size, 0)
+        # sizing when object is not yet open
+        self.assertRaises(IOError, size)
+        # sizing an empty object
+        self.obj.open(pg.INV_READ)
+        r = size()
+        self.obj.close()
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, 0)
+        # sizing after adding some data
+        data = 'some data to be sized'
+        self.obj.open(pg.INV_WRITE)
+        self.obj.write(data)
+        self.obj.close()
+        # sizing when current position is zero
+        self.obj.open(pg.INV_READ)
+        r = size()
+        self.obj.close()
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, len(data))
+        self.obj.open(pg.INV_READ)
+        # sizing when current position is not zero
+        self.obj.seek(5, pg.SEEK_SET)
+        r = size()
+        self.obj.close()
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, len(data))
+        # sizing after adding more data
+        data += ' and more data'
+        self.obj.open(pg.INV_WRITE)
+        self.obj.write(data)
+        self.obj.close()
+        self.obj.open(pg.INV_READ)
+        r = size()
+        self.obj.close()
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, len(data))
+
+    def testExport(self):
+        export = self.obj.export
+        # testing with invalid parameters
+        self.assertRaises(TypeError, export)
+        self.assertRaises(TypeError, export, 0)
+        self.assertRaises(TypeError, export, 'invalid', 0)
+        f = tempfile.NamedTemporaryFile()
+        data = 'some data to be exported'
+        self.obj.open(pg.INV_WRITE)
+        self.obj.write(data)
+        # exporting when object is not yet closed
+        self.assertRaises(IOError, export, f.name)
+        self.obj.close()
+        export(f.name)
+        r = f.read()
+        f.close()
+        self.assertEqual(r, data)
 
 
 if __name__ == '__main__':

Modified: branches/4.x/module/pgmodule.c
==============================================================================
--- branches/4.x/module/pgmodule.c      Thu Nov 19 10:39:13 2015        (r548)
+++ branches/4.x/module/pgmodule.c      Thu Nov 19 13:49:11 2015        (r549)
@@ -1249,10 +1249,6 @@
        int                     mode,
                                fd;
 
-       /* check validity */
-       if (!check_lo_obj(self, CHECK_CLOSE))
-               return NULL;
-
        /* gets arguments */
        if (!PyArg_ParseTuple(args, "i", &mode))
        {
@@ -1260,6 +1256,10 @@
                return NULL;
        }
 
+       /* check validity */
+       if (!check_lo_obj(self, CHECK_CLOSE))
+               return NULL;
+
        /* opens large object */
        if ((fd = lo_open(self->pgcnx->cnx, self->lo_oid, mode)) < 0)
        {
@@ -1316,10 +1316,6 @@
        int                     size;
        PyObject   *buffer;
 
-       /* checks validity */
-       if (!check_lo_obj(self, CHECK_OPEN))
-               return NULL;
-
        /* gets arguments */
        if (!PyArg_ParseTuple(args, "i", &size))
        {
@@ -1333,6 +1329,10 @@
                return NULL;
        }
 
+       /* checks validity */
+       if (!check_lo_obj(self, CHECK_OPEN))
+               return NULL;
+
        /* allocate buffer and runs read */
        buffer = PyString_FromStringAndSize((char *) NULL, size);
 
@@ -1360,10 +1360,6 @@
        int                     size,
                                bufsize;
 
-       /* checks validity */
-       if (!check_lo_obj(self, CHECK_OPEN))
-               return NULL;
-
        /* gets arguments */
        if (!PyArg_ParseTuple(args, "s#", &buffer, &bufsize))
        {
@@ -1372,6 +1368,10 @@
                return NULL;
        }
 
+       /* checks validity */
+       if (!check_lo_obj(self, CHECK_OPEN))
+               return NULL;
+
        /* sends query */
        if ((size = lo_write(self->pgcnx->cnx, self->lo_fd, buffer,
                                                 bufsize)) < bufsize)
@@ -1399,10 +1399,6 @@
                                offset = 0,
                                whence = 0;
 
-       /* checks validity */
-       if (!check_lo_obj(self, CHECK_OPEN))
-               return NULL;
-
        /* gets arguments */
        if (!PyArg_ParseTuple(args, "ii", &offset, &whence))
        {
@@ -1411,6 +1407,10 @@
                return NULL;
        }
 
+       /* checks validity */
+       if (!check_lo_obj(self, CHECK_OPEN))
+               return NULL;
+
        /* sends query */
        if ((ret = lo_lseek(self->pgcnx->cnx, self->lo_fd, offset, whence)) == -1)
        {
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to