Author: cito
Date: Wed Nov 25 07:09:06 2015
New Revision: 621
Log:
Notification handler dropped concurrent messages
Add fix and test contributed by Patrick TJ McPhee
as suggested on the mailing list.
Modified:
trunk/module/TEST_PyGreSQL_classic.py
trunk/module/pg.py
Modified: trunk/module/TEST_PyGreSQL_classic.py
==============================================================================
--- trunk/module/TEST_PyGreSQL_classic.py Tue Nov 24 15:59:27 2015
(r620)
+++ trunk/module/TEST_PyGreSQL_classic.py Wed Nov 25 07:09:06 2015
(r621)
@@ -248,67 +248,89 @@
else:
self.notify_timeout = True
- def test_notify(self):
+ def test_notify(self, options=None):
+ if not options:
+ options = {}
+ run_as_method = options.get('run_as_method')
+ call_notify = options.get('call_notify')
+ two_payloads = options.get('two_payloads')
+ db = opendb()
+ # Get function under test, can be standalone or DB method.
+ fut = db.notification_handler if run_as_method else partial(
+ NotificationHandler, db)
+ arg_dict = dict(event=None, called=False)
+ self.notify_timeout = False
+ # Listen for 'event_1'.
+ target = fut('event_1', self.notify_callback, arg_dict)
+ thread = Thread(None, target)
+ thread.start()
+ # Wait until the thread has started.
+ for n in range(500):
+ if target.listening:
+ break
+ sleep(0.01)
+ self.assertTrue(target.listening)
+ self.assertTrue(thread.isAlive())
+ # Open another connection for sending notifications.
+ db2 = opendb()
+ # Generate notification from the other connection.
+ if two_payloads:
+ db2.begin()
+ if call_notify:
+ if two_payloads:
+ target.notify(db2, payload='payload 0')
+ target.notify(db2, payload='payload 1')
+ else:
+ if two_payloads:
+ db2.query("notify event_1, 'payload 0'")
+ db2.query("notify event_1, 'payload 1'")
+ if two_payloads:
+ db2.commit()
+ # Wait until the notification has been caught.
+ for n in range(500):
+ if arg_dict['called'] or self.notify_timeout:
+ break
+ sleep(0.01)
+ # Check that callback has been invoked.
+ self.assertTrue(arg_dict['called'])
+ self.assertEqual(arg_dict['event'], 'event_1')
+ self.assertEqual(arg_dict['extra'], 'payload 1')
+ self.assertTrue(isinstance(arg_dict['pid'], int))
+ self.assertFalse(self.notify_timeout)
+ arg_dict['called'] = False
+ self.assertTrue(thread.isAlive())
+ # Generate stop notification.
+ if call_notify:
+ target.notify(db2, stop=True, payload='payload 2')
+ else:
+ db2.query("notify stop_event_1, 'payload 2'")
+ db2.close()
+ # Wait until the notification has been caught.
+ for n in range(500):
+ if arg_dict['called'] or self.notify_timeout:
+ break
+ sleep(0.01)
+ # Check that callback has been invoked.
+ self.assertTrue(arg_dict['called'])
+ self.assertEqual(arg_dict['event'], 'stop_event_1')
+ self.assertEqual(arg_dict['extra'], 'payload 2')
+ self.assertTrue(isinstance(arg_dict['pid'], int))
+ self.assertFalse(self.notify_timeout)
+ thread.join(5)
+ self.assertFalse(thread.isAlive())
+ self.assertFalse(target.listening)
+ target.close()
+
+ def test_notify_other_options(self):
for run_as_method in False, True:
for call_notify in False, True:
- db = opendb()
- # Get function under test, can be standalone or DB method.
- fut = db.notification_handler if run_as_method else partial(
- NotificationHandler, db)
- arg_dict = dict(event=None, called=False)
- self.notify_timeout = False
- # Listen for 'event_1'.
- target = fut('event_1', self.notify_callback, arg_dict)
- thread = Thread(None, target)
- thread.start()
- # Wait until the thread has started.
- for n in range(500):
- if target.listening:
- break
- sleep(0.01)
- self.assertTrue(target.listening)
- self.assertTrue(thread.isAlive())
- # Open another connection for sending notifications.
- db2 = opendb()
- # Generate notification from the other connection.
- if call_notify:
- target.notify(db2, payload='payload 1')
- else:
- db2.query("notify event_1, 'payload 1'")
- # Wait until the notification has been caught.
- for n in range(500):
- if arg_dict['called'] or self.notify_timeout:
- break
- sleep(0.01)
- # Check that callback has been invoked.
- self.assertTrue(arg_dict['called'])
- self.assertEqual(arg_dict['event'], 'event_1')
- self.assertEqual(arg_dict['extra'], 'payload 1')
- self.assertTrue(isinstance(arg_dict['pid'], int))
- self.assertFalse(self.notify_timeout)
- arg_dict['called'] = False
- self.assertTrue(thread.isAlive())
- # Generate stop notification.
- if call_notify:
- target.notify(db2, stop=True, payload='payload 2')
- else:
- db2.query("notify stop_event_1, 'payload 2'")
- db2.close()
- # Wait until the notification has been caught.
- for n in range(500):
- if arg_dict['called'] or self.notify_timeout:
- break
- sleep(0.01)
- # Check that callback has been invoked.
- self.assertTrue(arg_dict['called'])
- self.assertEqual(arg_dict['event'], 'stop_event_1')
- self.assertEqual(arg_dict['extra'], 'payload 2')
- self.assertTrue(isinstance(arg_dict['pid'], int))
- self.assertFalse(self.notify_timeout)
- thread.join(5)
- self.assertFalse(thread.isAlive())
- self.assertFalse(target.listening)
- target.close()
+ for two_payloads in False, True:
+ options = dict(
+ run_as_method=run_as_method,
+ call_notify=call_notify,
+ two_payloads=two_payloads)
+ if any(options.values()):
+ self.test_notify(options)
def test_notify_timeout(self):
for run_as_method in False, True:
Modified: trunk/module/pg.py
==============================================================================
--- trunk/module/pg.py Tue Nov 24 15:59:27 2015 (r620)
+++ trunk/module/pg.py Wed Nov 25 07:09:06 2015 (r621)
@@ -220,30 +220,28 @@
self.listen()
_ilist = [self.db.fileno()]
- while True:
+ while self.listening:
ilist, _olist, _elist = select.select(_ilist, [], [], self.timeout)
- if ilist == []: # we timed out
- self.unlisten()
- self.callback(None)
- break
- else:
- notice = self.db.getnotify()
- if notice is None:
- continue
- event, pid, extra = notice
- if event in (self.event, self.stop_event):
+ if ilist:
+ while self.listening:
+ notice = self.db.getnotify()
+ if not notice: # no more messages
+ break
+ event, pid, extra = notice
+ if event not in (self.event, self.stop_event):
+ self.unlisten()
+ raise _db_error(
+ 'listening for "%s" and "%s", but notified of "%s"'
+ % (self.event, self.stop_event, event))
+ if event == self.stop_event:
+ self.unlisten()
self.arg_dict['pid'] = pid
self.arg_dict['event'] = event
self.arg_dict['extra'] = extra
self.callback(self.arg_dict)
- if event == self.stop_event:
- self.unlisten()
- break
- else:
- self.unlisten()
- raise _db_error(
- 'listening for "%s" and "%s", but notified of "%s"'
- % (self.event, self.stop_event, event))
+ else: # we timed out
+ self.unlisten()
+ self.callback(None)
def pgnotify(*args, **kw):
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql