Author: cito
Date: Thu Nov 19 09:19:43 2015
New Revision: 547

Log:
Add test for canceling a long-running query

Modified:
   branches/4.x/module/TEST_PyGreSQL_classic_connection.py

Modified: branches/4.x/module/TEST_PyGreSQL_classic_connection.py
==============================================================================
--- branches/4.x/module/TEST_PyGreSQL_classic_connection.py     Thu Nov 19 
06:33:52 2015        (r546)
+++ branches/4.x/module/TEST_PyGreSQL_classic_connection.py     Thu Nov 19 
09:19:43 2015        (r547)
@@ -16,6 +16,8 @@
 except ImportError:
     import unittest
 import sys
+import threading
+import time
 
 import pg  # the module under test
 
@@ -170,6 +172,57 @@
             self.fail('Query should give an error for a closed connection')
         self.connection = connect()
 
+    def testMethodReset(self):
+        query = self.connection.query
+        # check that client encoding gets reset
+        encoding = query('show client_encoding').getresult()[0][0].upper()
+        changed_encoding = 'LATIN1' if encoding == 'UTF8' else 'UTF8'
+        self.assertNotEqual(encoding, changed_encoding)
+        self.connection.query("set client_encoding=%s" % changed_encoding)
+        new_encoding = query('show client_encoding').getresult()[0][0].upper()
+        self.assertEqual(new_encoding, changed_encoding)
+        self.connection.reset()
+        new_encoding = query('show client_encoding').getresult()[0][0].upper()
+        self.assertNotEqual(new_encoding, changed_encoding)
+        self.assertEqual(new_encoding, encoding)
+
+    def testMethodCancel(self):
+        r = self.connection.cancel()
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, 1)
+
+    def testCancelLongRunningThread(self):
+        errors = []
+
+        def sleep():
+            try:
+                self.connection.query('select pg_sleep(5)').getresult()
+            except pg.ProgrammingError, error:
+                errors.append(str(error))
+
+        thread = threading.Thread(target=sleep)
+        t1 = time.time()
+        thread.start()  # run the query
+        while 1:  # make sure the query is really running
+            time.sleep(0.1)
+            if thread.is_alive() or time.time() - t1 > 5:
+                break
+        r = self.connection.cancel()  # cancel the running query
+        thread.join()  # wait for the thread to end
+        t2 = time.time()
+
+        self.assertIsInstance(r, int)
+        self.assertEqual(r, 1)  # return code should be 1
+        self.assertLessEqual(t2 - t1, 3)  # time should be under 3 seconds
+        self.assertTrue(errors)
+
+    def testMethodFileNo(self):
+        r = self.connection.fileno()
+        self.assertIsInstance(r, int)
+        self.assertGreaterEqual(r, 0)
+
+
+
 
 class TestSimpleQueries(unittest.TestCase):
     """"Test simple queries via a basic pg connection."""
@@ -438,38 +491,6 @@
             '2|xyz  |uvw  \n'
             '(2 rows)\n')
 
-    def testGetNotify(self):
-        self.assertIsNone(self.c.getnotify())
-        self.c.query('listen test_notify')
-        try:
-            self.assertIsNone(self.c.getnotify())
-            self.c.query("notify test_notify")
-            r = self.c.getnotify()
-            self.assertIsInstance(r, tuple)
-            self.assertEqual(len(r), 3)
-            self.assertIsInstance(r[0], str)
-            self.assertIsInstance(r[1], int)
-            self.assertIsInstance(r[2], str)
-            self.assertEqual(r[0], 'test_notify')
-            self.assertEqual(r[2], '')
-            self.assertIsNone(self.c.getnotify())
-            try:
-                self.c.query("notify test_notify, 'test_payload'")
-            except pg.ProgrammingError:  # PostgreSQL < 9.0
-                pass
-            else:
-                r = self.c.getnotify()
-                self.assertTrue(isinstance(r, tuple))
-                self.assertEqual(len(r), 3)
-                self.assertIsInstance(r[0], str)
-                self.assertIsInstance(r[1], int)
-                self.assertIsInstance(r[2], str)
-                self.assertEqual(r[0], 'test_notify')
-                self.assertEqual(r[2], 'test_payload')
-                self.assertIsNone(self.c.getnotify())
-        finally:
-            self.c.query('unlisten test_notify')
-
 
 class TestParamQueries(unittest.TestCase):
     """"Test queries with parameters via a basic pg connection."""
@@ -726,9 +747,7 @@
 
 
 class TestNoticeReceiver(unittest.TestCase):
-    """"Test notice receiver support."""
-
-    # Test database needed: must be run as a DBTestSuite.
+    """"Test notification support."""
 
     def setUp(self):
         self.c = connect()
@@ -736,6 +755,40 @@
     def tearDown(self):
         self.c.close()
 
+    def testGetNotify(self):
+        getnotify = self.connection.getnotify
+        query = self.c.query
+        self.assertIsNone(getnotify())
+        query('listen test_notify')
+        try:
+            self.assertIsNone(self.c.getnotify())
+            query("notify test_notify")
+            r = getnotify()
+            self.assertIsInstance(r, tuple)
+            self.assertEqual(len(r), 3)
+            self.assertIsInstance(r[0], str)
+            self.assertIsInstance(r[1], int)
+            self.assertIsInstance(r[2], str)
+            self.assertEqual(r[0], 'test_notify')
+            self.assertEqual(r[2], '')
+            self.assertIsNone(self.c.getnotify())
+            try:
+                query("notify test_notify, 'test_payload'")
+            except pg.ProgrammingError:  # PostgreSQL < 9.0
+                pass
+            else:
+                r = getnotify()
+                self.assertTrue(isinstance(r, tuple))
+                self.assertEqual(len(r), 3)
+                self.assertIsInstance(r[0], str)
+                self.assertIsInstance(r[1], int)
+                self.assertIsInstance(r[2], str)
+                self.assertEqual(r[0], 'test_notify')
+                self.assertEqual(r[2], 'test_payload')
+                self.assertIsNone(getnotify())
+        finally:
+            query('unlisten test_notify')
+
     def testGetNoticeReceiver(self):
         self.assertIsNone(self.c.get_notice_receiver())
 
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to