Author: cito
Date: Wed Nov 25 08:40:18 2015
New Revision: 625

Log:
Backport notification handler fix to 4.x

The notification handler had been fixed in trunk so that it
will not drop concurrent messages any more.

Modified:
   branches/4.x/module/TEST_PyGreSQL_classic.py
   branches/4.x/module/pg.py

Modified: branches/4.x/module/TEST_PyGreSQL_classic.py
==============================================================================
--- branches/4.x/module/TEST_PyGreSQL_classic.py        Wed Nov 25 08:37:33 2015        (r624)
+++ branches/4.x/module/TEST_PyGreSQL_classic.py        Wed Nov 25 08:40:18 2015        (r625)
@@ -234,67 +234,93 @@
         else:
             self.notify_timeout = True
 
-    def test_notify(self):
+    def test_notify(self, options=None):
+        if not options:
+            options = {}
+        run_as_method = options.get('run_as_method')
+        call_notify = options.get('call_notify')
+        two_payloads = options.get('two_payloads')
+        db = opendb()
+        # Get function under test, can be standalone or DB method.
+        fut = db.notification_handler if run_as_method else partial(
+            NotificationHandler, db)
+        arg_dict = dict(event=None, called=False)
+        self.notify_timeout = False
+        # Listen for 'event_1'.
+        target = fut('event_1', self.notify_callback, arg_dict, 5)
+        thread = Thread(None, target)
+        thread.start()
+        try:
+            # Wait until the thread has started.
+            for n in range(500):
+                if target.listening:
+                    break
+                sleep(0.01)
+            self.assertTrue(target.listening)
+            self.assertTrue(thread.isAlive())
+            # Open another connection for sending notifications.
+            db2 = opendb()
+            # Generate notification from the other connection.
+            if two_payloads:
+                db2.begin()
+            if call_notify:
+                if two_payloads:
+                    target.notify(db2, payload='payload 0')
+                target.notify(db2, payload='payload 1')
+            else:
+                if two_payloads:
+                    db2.query("notify event_1, 'payload 0'")
+                db2.query("notify event_1, 'payload 1'")
+            if two_payloads:
+                db2.commit()
+            # Wait until the notification has been caught.
+            for n in range(500):
+                if arg_dict['called'] or self.notify_timeout:
+                    break
+                sleep(0.01)
+            # Check that callback has been invoked.
+            self.assertTrue(arg_dict['called'])
+            self.assertEqual(arg_dict['event'], 'event_1')
+            self.assertEqual(arg_dict['extra'], 'payload 1')
+            self.assertTrue(isinstance(arg_dict['pid'], int))
+            self.assertFalse(self.notify_timeout)
+            arg_dict['called'] = False
+            self.assertTrue(thread.isAlive())
+            # Generate stop notification.
+            if call_notify:
+                target.notify(db2, stop=True, payload='payload 2')
+            else:
+                db2.query("notify stop_event_1, 'payload 2'")
+            db2.close()
+            # Wait until the notification has been caught.
+            for n in range(500):
+                if arg_dict['called'] or self.notify_timeout:
+                    break
+                sleep(0.01)
+            # Check that callback has been invoked.
+            self.assertTrue(arg_dict['called'])
+            self.assertEqual(arg_dict['event'], 'stop_event_1')
+            self.assertEqual(arg_dict['extra'], 'payload 2')
+            self.assertTrue(isinstance(arg_dict['pid'], int))
+            self.assertFalse(self.notify_timeout)
+            thread.join(5)
+            self.assertFalse(thread.isAlive())
+            self.assertFalse(target.listening)
+        finally:
+            target.close()
+            if thread.is_alive():
+                thread.join(5)
+
+    def test_notify_other_options(self):
         for run_as_method in False, True:
             for call_notify in False, True:
-                db = opendb()
-                # Get function under test, can be standalone or DB method.
-                fut = db.notification_handler if run_as_method else partial(
-                    NotificationHandler, db)
-                arg_dict = dict(event=None, called=False)
-                self.notify_timeout = False
-                # Listen for 'event_1'.
-                target = fut('event_1', self.notify_callback, arg_dict)
-                thread = Thread(None, target)
-                thread.start()
-                # Wait until the thread has started.
-                for n in xrange(500):
-                    if target.listening:
-                        break
-                    sleep(0.01)
-                self.assertTrue(target.listening)
-                self.assertTrue(thread.isAlive())
-                # Open another connection for sending notifications.
-                db2 = opendb()
-                # Generate notification from the other connection.
-                if call_notify:
-                    target.notify(db2, payload='payload 1')
-                else:
-                    db2.query("notify event_1, 'payload 1'")
-                # Wait until the notification has been caught.
-                for n in xrange(500):
-                    if arg_dict['called'] or self.notify_timeout:
-                        break
-                    sleep(0.01)
-                # Check that callback has been invoked.
-                self.assertTrue(arg_dict['called'])
-                self.assertEqual(arg_dict['event'], 'event_1')
-                self.assertEqual(arg_dict['extra'], 'payload 1')
-                self.assertTrue(isinstance(arg_dict['pid'], int))
-                self.assertFalse(self.notify_timeout)
-                arg_dict['called'] = False
-                self.assertTrue(thread.isAlive())
-                # Generate stop notification.
-                if call_notify:
-                    target.notify(db2, stop=True, payload='payload 2')
-                else:
-                    db2.query("notify stop_event_1, 'payload 2'")
-                db2.close()
-                # Wait until the notification has been caught.
-                for n in xrange(500):
-                    if arg_dict['called'] or self.notify_timeout:
-                        break
-                    sleep(0.01)
-                # Check that callback has been invoked.
-                self.assertTrue(arg_dict['called'])
-                self.assertEqual(arg_dict['event'], 'stop_event_1')
-                self.assertEqual(arg_dict['extra'], 'payload 2')
-                self.assertTrue(isinstance(arg_dict['pid'], int))
-                self.assertFalse(self.notify_timeout)
-                thread.join(5)
-                self.assertFalse(thread.isAlive())
-                self.assertFalse(target.listening)
-                target.close()
+                for two_payloads in False, True:
+                    options = dict(
+                        run_as_method=run_as_method,
+                        call_notify=call_notify,
+                        two_payloads=two_payloads)
+                    if True in options.values():
+                        self.test_notify(options)
 
     def test_notify_timeout(self):
         for run_as_method in False, True:

Modified: branches/4.x/module/pg.py
==============================================================================
--- branches/4.x/module/pg.py   Wed Nov 25 08:37:33 2015        (r624)
+++ branches/4.x/module/pg.py   Wed Nov 25 08:40:18 2015        (r625)
@@ -225,30 +225,28 @@
         self.listen()
         _ilist = [self.db.fileno()]
 
-        while True:
+        while self.listening:
             ilist, _olist, _elist = select.select(_ilist, [], [], self.timeout)
-            if ilist == []:  # we timed out
-                self.unlisten()
-                self.callback(None)
-                break
-            else:
-                notice = self.db.getnotify()
-                if notice is None:
-                    continue
-                event, pid, extra = notice
-                if event in (self.event, self.stop_event):
+            if ilist:
+                while self.listening:
+                    notice = self.db.getnotify()
+                    if not notice:  # no more messages
+                        break
+                    event, pid, extra = notice
+                    if event not in (self.event, self.stop_event):
+                        self.unlisten()
+                        raise _db_error(
+                            'listening for "%s" and "%s", but notified of "%s"'
+                            % (self.event, self.stop_event, event))
+                    if event == self.stop_event:
+                        self.unlisten()
                     self.arg_dict['pid'] = pid
                     self.arg_dict['event'] = event
                     self.arg_dict['extra'] = extra
                     self.callback(self.arg_dict)
-                    if event == self.stop_event:
-                        self.unlisten()
-                        break
-                else:
-                    self.unlisten()
-                    raise _db_error(
-                        'listening for "%s" and "%s", but notified of "%s"'
-                        % (self.event, self.stop_event, event))
+            else:   # we timed out
+                self.unlisten()
+                self.callback(None)
 
 
 def pgnotify(*args, **kw):
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to