Author: cito
Date: Thu Nov 19 09:19:43 2015
New Revision: 547
Log:
Add test for canceling a long-running query
Modified:
branches/4.x/module/TEST_PyGreSQL_classic_connection.py
Modified: branches/4.x/module/TEST_PyGreSQL_classic_connection.py
==============================================================================
--- branches/4.x/module/TEST_PyGreSQL_classic_connection.py Thu Nov 19 06:33:52 2015 (r546)
+++ branches/4.x/module/TEST_PyGreSQL_classic_connection.py Thu Nov 19 09:19:43 2015 (r547)
@@ -16,6 +16,8 @@
except ImportError:
import unittest
import sys
+import threading
+import time
import pg # the module under test
@@ -170,6 +172,57 @@
self.fail('Query should give an error for a closed connection')
self.connection = connect()
+ def testMethodReset(self):
+     # reset() must bring the client encoding back to the value the
+     # session had before it was changed with SET
+     con = self.connection
+
+     def client_encoding():
+         # ask the server for the current client encoding, upper-cased
+         return con.query('show client_encoding').getresult()[0][0].upper()
+
+     original = client_encoding()
+     # pick any encoding that differs from the current one
+     if original == 'UTF8':
+         switched = 'LATIN1'
+     else:
+         switched = 'UTF8'
+     self.assertNotEqual(original, switched)
+     con.query("set client_encoding=%s" % switched)
+     self.assertEqual(client_encoding(), switched)
+     con.reset()
+     restored = client_encoding()
+     self.assertNotEqual(restored, switched)
+     self.assertEqual(restored, original)
+
+ def testMethodCancel(self):
+     # cancel() is called here without this test having started a query;
+     # the expected return value is the integer 1
+     status = self.connection.cancel()
+     self.assertIsInstance(status, int)
+     self.assertEqual(status, 1)
+
+ def testCancelLongRunningThread(self):
+     # A worker thread runs a query that sleeps on the server while the
+     # main thread cancels it through the same connection.
+     caught = []
+
+     def run_sleep_query():
+         # executed in the worker thread; records the error text, if any
+         try:
+             self.connection.query('select pg_sleep(5)').getresult()
+         except pg.ProgrammingError, error:
+             caught.append(str(error))
+
+     worker = threading.Thread(target=run_sleep_query)
+     started = time.time()
+     worker.start()  # run the query
+     # poll in 0.1 second steps until the worker is alive, giving up
+     # once more than 5 seconds have passed since the start
+     time.sleep(0.1)
+     while not worker.is_alive() and time.time() - started <= 5:
+         time.sleep(0.1)
+     status = self.connection.cancel()  # cancel the running query
+     worker.join()  # wait for the thread to end
+     elapsed = time.time() - started
+
+     self.assertIsInstance(status, int)
+     self.assertEqual(status, 1)  # return code should be 1
+     self.assertLessEqual(elapsed, 3)  # time should be under 3 seconds
+     self.assertTrue(caught)
+
+ def testMethodFileNo(self):
+     # fileno() is expected to return a non-negative integer
+     descriptor = self.connection.fileno()
+     self.assertIsInstance(descriptor, int)
+     self.assertGreaterEqual(descriptor, 0)
+
+
+
class TestSimpleQueries(unittest.TestCase):
""""Test simple queries via a basic pg connection."""
@@ -438,38 +491,6 @@
'2|xyz |uvw \n'
'(2 rows)\n')
- def testGetNotify(self):
- self.assertIsNone(self.c.getnotify())
- self.c.query('listen test_notify')
- try:
- self.assertIsNone(self.c.getnotify())
- self.c.query("notify test_notify")
- r = self.c.getnotify()
- self.assertIsInstance(r, tuple)
- self.assertEqual(len(r), 3)
- self.assertIsInstance(r[0], str)
- self.assertIsInstance(r[1], int)
- self.assertIsInstance(r[2], str)
- self.assertEqual(r[0], 'test_notify')
- self.assertEqual(r[2], '')
- self.assertIsNone(self.c.getnotify())
- try:
- self.c.query("notify test_notify, 'test_payload'")
- except pg.ProgrammingError: # PostgreSQL < 9.0
- pass
- else:
- r = self.c.getnotify()
- self.assertTrue(isinstance(r, tuple))
- self.assertEqual(len(r), 3)
- self.assertIsInstance(r[0], str)
- self.assertIsInstance(r[1], int)
- self.assertIsInstance(r[2], str)
- self.assertEqual(r[0], 'test_notify')
- self.assertEqual(r[2], 'test_payload')
- self.assertIsNone(self.c.getnotify())
- finally:
- self.c.query('unlisten test_notify')
-
class TestParamQueries(unittest.TestCase):
""""Test queries with parameters via a basic pg connection."""
@@ -726,9 +747,7 @@
class TestNoticeReceiver(unittest.TestCase):
- """"Test notice receiver support."""
-
- # Test database needed: must be run as a DBTestSuite.
+ """"Test notification support."""
def setUp(self):
self.c = connect()
@@ -736,6 +755,40 @@
# Close the connection held in self.c (the one assigned in setUp).
def tearDown(self):
self.c.close()
+ def testGetNotify(self):
+     # Exercise getnotify() on the connection of this test case, which is
+     # stored in self.c (there is no self.connection in this class).
+     # getnotify() is expected to return None while nothing is pending
+     # and otherwise a 3-tuple (str, int, str) whose first item is the
+     # channel name and whose last item is the payload.
+     getnotify = self.c.getnotify
+     query = self.c.query
+     self.assertIsNone(getnotify())
+     query('listen test_notify')
+     try:
+         self.assertIsNone(getnotify())
+         query("notify test_notify")
+         r = getnotify()
+         self.assertIsInstance(r, tuple)
+         self.assertEqual(len(r), 3)
+         self.assertIsInstance(r[0], str)
+         self.assertIsInstance(r[1], int)
+         self.assertIsInstance(r[2], str)
+         self.assertEqual(r[0], 'test_notify')
+         self.assertEqual(r[2], '')
+         # the notification must have been consumed by the call above
+         self.assertIsNone(getnotify())
+         try:
+             query("notify test_notify, 'test_payload'")
+         except pg.ProgrammingError:  # PostgreSQL < 9.0
+             pass
+         else:
+             r = getnotify()
+             self.assertIsInstance(r, tuple)
+             self.assertEqual(len(r), 3)
+             self.assertIsInstance(r[0], str)
+             self.assertIsInstance(r[1], int)
+             self.assertIsInstance(r[2], str)
+             self.assertEqual(r[0], 'test_notify')
+             self.assertEqual(r[2], 'test_payload')
+             self.assertIsNone(getnotify())
+     finally:
+         # always stop listening, even when an assertion failed
+         query('unlisten test_notify')
+
def testGetNoticeReceiver(self):
self.assertIsNone(self.c.get_notice_receiver())
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql