Author: cito
Date: Wed Nov 25 07:09:06 2015
New Revision: 621

Log:
Notification handler dropped concurrent messages

Add fix and test contributed by Patrick TJ McPhee
as suggested on the mailing list.

Modified:
   trunk/module/TEST_PyGreSQL_classic.py
   trunk/module/pg.py

Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py       Tue Nov 24 15:59:27 2015        (r620)
+++ trunk/module/TEST_PyGreSQL_classic.py       Wed Nov 25 07:09:06 2015        (r621)
@@ -248,67 +248,89 @@
         else:
             self.notify_timeout = True
 
-    def test_notify(self):
+    def test_notify(self, options=None):
+        if not options:
+            options = {}
+        run_as_method = options.get('run_as_method')
+        call_notify = options.get('call_notify')
+        two_payloads = options.get('two_payloads')
+        db = opendb()
+        # Get function under test, can be standalone or DB method.
+        fut = db.notification_handler if run_as_method else partial(
+            NotificationHandler, db)
+        arg_dict = dict(event=None, called=False)
+        self.notify_timeout = False
+        # Listen for 'event_1'.
+        target = fut('event_1', self.notify_callback, arg_dict)
+        thread = Thread(None, target)
+        thread.start()
+        # Wait until the thread has started.
+        for n in range(500):
+            if target.listening:
+                break
+            sleep(0.01)
+        self.assertTrue(target.listening)
+        self.assertTrue(thread.isAlive())
+        # Open another connection for sending notifications.
+        db2 = opendb()
+        # Generate notification from the other connection.
+        if two_payloads:
+            db2.begin()
+        if call_notify:
+            if two_payloads:
+                target.notify(db2, payload='payload 0')
+            target.notify(db2, payload='payload 1')
+        else:
+            if two_payloads:
+                db2.query("notify event_1, 'payload 0'")
+            db2.query("notify event_1, 'payload 1'")
+        if two_payloads:
+            db2.commit()
+        # Wait until the notification has been caught.
+        for n in range(500):
+            if arg_dict['called'] or self.notify_timeout:
+                break
+            sleep(0.01)
+        # Check that callback has been invoked.
+        self.assertTrue(arg_dict['called'])
+        self.assertEqual(arg_dict['event'], 'event_1')
+        self.assertEqual(arg_dict['extra'], 'payload 1')
+        self.assertTrue(isinstance(arg_dict['pid'], int))
+        self.assertFalse(self.notify_timeout)
+        arg_dict['called'] = False
+        self.assertTrue(thread.isAlive())
+        # Generate stop notification.
+        if call_notify:
+            target.notify(db2, stop=True, payload='payload 2')
+        else:
+            db2.query("notify stop_event_1, 'payload 2'")
+        db2.close()
+        # Wait until the notification has been caught.
+        for n in range(500):
+            if arg_dict['called'] or self.notify_timeout:
+                break
+            sleep(0.01)
+        # Check that callback has been invoked.
+        self.assertTrue(arg_dict['called'])
+        self.assertEqual(arg_dict['event'], 'stop_event_1')
+        self.assertEqual(arg_dict['extra'], 'payload 2')
+        self.assertTrue(isinstance(arg_dict['pid'], int))
+        self.assertFalse(self.notify_timeout)
+        thread.join(5)
+        self.assertFalse(thread.isAlive())
+        self.assertFalse(target.listening)
+        target.close()
+
+    def test_notify_other_options(self):
         for run_as_method in False, True:
             for call_notify in False, True:
-                db = opendb()
-                # Get function under test, can be standalone or DB method.
-                fut = db.notification_handler if run_as_method else partial(
-                    NotificationHandler, db)
-                arg_dict = dict(event=None, called=False)
-                self.notify_timeout = False
-                # Listen for 'event_1'.
-                target = fut('event_1', self.notify_callback, arg_dict)
-                thread = Thread(None, target)
-                thread.start()
-                # Wait until the thread has started.
-                for n in range(500):
-                    if target.listening:
-                        break
-                    sleep(0.01)
-                self.assertTrue(target.listening)
-                self.assertTrue(thread.isAlive())
-                # Open another connection for sending notifications.
-                db2 = opendb()
-                # Generate notification from the other connection.
-                if call_notify:
-                    target.notify(db2, payload='payload 1')
-                else:
-                    db2.query("notify event_1, 'payload 1'")
-                # Wait until the notification has been caught.
-                for n in range(500):
-                    if arg_dict['called'] or self.notify_timeout:
-                        break
-                    sleep(0.01)
-                # Check that callback has been invoked.
-                self.assertTrue(arg_dict['called'])
-                self.assertEqual(arg_dict['event'], 'event_1')
-                self.assertEqual(arg_dict['extra'], 'payload 1')
-                self.assertTrue(isinstance(arg_dict['pid'], int))
-                self.assertFalse(self.notify_timeout)
-                arg_dict['called'] = False
-                self.assertTrue(thread.isAlive())
-                # Generate stop notification.
-                if call_notify:
-                    target.notify(db2, stop=True, payload='payload 2')
-                else:
-                    db2.query("notify stop_event_1, 'payload 2'")
-                db2.close()
-                # Wait until the notification has been caught.
-                for n in range(500):
-                    if arg_dict['called'] or self.notify_timeout:
-                        break
-                    sleep(0.01)
-                # Check that callback has been invoked.
-                self.assertTrue(arg_dict['called'])
-                self.assertEqual(arg_dict['event'], 'stop_event_1')
-                self.assertEqual(arg_dict['extra'], 'payload 2')
-                self.assertTrue(isinstance(arg_dict['pid'], int))
-                self.assertFalse(self.notify_timeout)
-                thread.join(5)
-                self.assertFalse(thread.isAlive())
-                self.assertFalse(target.listening)
-                target.close()
+                for two_payloads in False, True:
+                    options = dict(
+                        run_as_method=run_as_method,
+                        call_notify=call_notify,
+                        two_payloads=two_payloads)
+                    if any(options.values()):
+                        self.test_notify(options)
 
     def test_notify_timeout(self):
         for run_as_method in False, True:

Modified: trunk/module/pg.py
==============================================================================
--- trunk/module/pg.py  Tue Nov 24 15:59:27 2015        (r620)
+++ trunk/module/pg.py  Wed Nov 25 07:09:06 2015        (r621)
@@ -220,30 +220,28 @@
         self.listen()
         _ilist = [self.db.fileno()]
 
-        while True:
+        while self.listening:
             ilist, _olist, _elist = select.select(_ilist, [], [], self.timeout)
-            if ilist == []:  # we timed out
-                self.unlisten()
-                self.callback(None)
-                break
-            else:
-                notice = self.db.getnotify()
-                if notice is None:
-                    continue
-                event, pid, extra = notice
-                if event in (self.event, self.stop_event):
+            if ilist:
+                while self.listening:
+                    notice = self.db.getnotify()
+                    if not notice:  # no more messages
+                        break
+                    event, pid, extra = notice
+                    if event not in (self.event, self.stop_event):
+                        self.unlisten()
+                        raise _db_error(
+                            'listening for "%s" and "%s", but notified of "%s"'
+                            % (self.event, self.stop_event, event))
+                    if event == self.stop_event:
+                        self.unlisten()
                     self.arg_dict['pid'] = pid
                     self.arg_dict['event'] = event
                     self.arg_dict['extra'] = extra
                     self.callback(self.arg_dict)
-                    if event == self.stop_event:
-                        self.unlisten()
-                        break
-                else:
-                    self.unlisten()
-                    raise _db_error(
-                        'listening for "%s" and "%s", but notified of "%s"'
-                        % (self.event, self.stop_event, event))
+            else:   # we timed out
+                self.unlisten()
+                self.callback(None)
 
 
 def pgnotify(*args, **kw):
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to