Hello community,

here is the log from the commit of package python-moksha-hub for 
openSUSE:Factory checked in at 2019-05-22 15:41:16
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-moksha-hub (Old)
 and      /work/SRC/openSUSE:Factory/.python-moksha-hub.new.5148 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-moksha-hub"

Wed May 22 15:41:16 2019 rev:3 rq:704706 version:1.5.17

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-moksha-hub/python-moksha-hub.changes      
2018-12-24 11:39:52.801532605 +0100
+++ 
/work/SRC/openSUSE:Factory/.python-moksha-hub.new.5148/python-moksha-hub.changes
    2019-05-22 15:42:04.978427044 +0200
@@ -1,0 +2,7 @@
+Wed May 22 05:37:00 UTC 2019 - [email protected]
+
+- version update to 1.5.17
+  * no upstream changelog
+- run tests
+
+-------------------------------------------------------------------

Old:
----
  moksha.hub-1.5.3.tar.gz

New:
----
  moksha.hub-1.5.17.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-moksha-hub.spec ++++++
--- /var/tmp/diff_new_pack.RVlACr/_old  2019-05-22 15:42:06.646427036 +0200
+++ /var/tmp/diff_new_pack.RVlACr/_new  2019-05-22 15:42:06.650427036 +0200
@@ -1,7 +1,7 @@
 #
 # spec file for package python-moksha-hub
 #
-# Copyright (c) 2018 SUSE LINUX GmbH, Nuernberg, Germany.
+# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany.
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -17,16 +17,23 @@
 
 
 %{?!python_module:%define python_module() python-%{**} python3-%{**}}
-%bcond_with test
 Name:           python-moksha-hub
-Version:        1.5.3
+Version:        1.5.17
 Release:        0
 Summary:        Hub components for Moksha
 License:        Apache-2.0
 Group:          Development/Languages/Python
 URL:            https://mokshaproject.net
 Source:         
https://files.pythonhosted.org/packages/source/m/moksha.hub/moksha.hub-%{version}.tar.gz
+BuildRequires:  %{python_module Twisted}
+BuildRequires:  %{python_module mock}
+BuildRequires:  %{python_module moksha-common >= 1.0.6}
+BuildRequires:  %{python_module nose}
+BuildRequires:  %{python_module pyzmq}
 BuildRequires:  %{python_module setuptools}
+BuildRequires:  %{python_module txWS}
+BuildRequires:  %{python_module txZMQ}
+BuildRequires:  %{python_module websocket-client}
 BuildRequires:  fdupes
 BuildRequires:  python-rpm-macros
 Requires:       python-Twisted
@@ -35,16 +42,6 @@
 Requires:       python-txWS
 Requires:       python-txZMQ
 BuildArch:      noarch
-%if %{with test}
-BuildRequires:  %{python_module Twisted}
-BuildRequires:  %{python_module mock}
-BuildRequires:  %{python_module moksha-common >= 1.0.6}
-BuildRequires:  %{python_module nose}
-BuildRequires:  %{python_module pyzmq}
-BuildRequires:  %{python_module txWS}
-BuildRequires:  %{python_module txZMQ}
-BuildRequires:  %{python_module websocket-client}
-%endif
 %python_subpackages
 
 %description
@@ -59,10 +56,9 @@
 %install
 %python_install
 %python_expand %fdupes %{buildroot}%{$python_sitelib}
-%if %{with test}
+
 %check
 %python_exec setup.py test
-%endif
 
 %files %{python_files}
 %license COPYING

++++++ moksha.hub-1.5.3.tar.gz -> moksha.hub-1.5.17.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/PKG-INFO 
new/moksha.hub-1.5.17/PKG-INFO
--- old/moksha.hub-1.5.3/PKG-INFO       2017-07-11 17:08:34.000000000 +0200
+++ new/moksha.hub-1.5.17/PKG-INFO      2019-02-12 06:20:18.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: moksha.hub
-Version: 1.5.3
+Version: 1.5.17
 Summary: Hub components for Moksha.
 Home-page: https://mokshaproject.net
 Author: Luke Macken, John (J5) Palmieri, Mairin Duffy, and Ralph Bean
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha/hub/api/consumer.py 
new/moksha.hub-1.5.17/moksha/hub/api/consumer.py
--- old/moksha.hub-1.5.3/moksha/hub/api/consumer.py     2017-07-11 
17:08:16.000000000 +0200
+++ new/moksha.hub-1.5.17/moksha/hub/api/consumer.py    2018-09-24 
18:14:16.000000000 +0200
@@ -33,6 +33,7 @@
 log = logging.getLogger('moksha.hub')
 
 import six.moves.queue as queue
+from collections import deque
 
 from kitchen.iterutils import iterate
 from moksha.common.lib.helpers import create_app_engine
@@ -60,7 +61,7 @@
         # the queue to do "consume" work.
         self.incoming = queue.Queue()
         self.headcount_in = self.headcount_out = 0
-        self._times = []
+        self._times = deque(maxlen=1024)
 
         callback = self._consume
         if self.jsonify:
@@ -99,7 +100,7 @@
             backlog = self.incoming.qsize()
             headcount_out = self.headcount_out
             headcount_in = self.headcount_in
-            times = self._times
+            times = list(self._times)
         else:
             backlog = None
             headcount_out = headcount_in = 0
@@ -120,7 +121,7 @@
         # Reset these counters before returning.
         self.headcount_out = self.headcount_in = 0
         self._exception_count = 0
-        self._times = []
+        self._times.clear()
         return results
 
     def debug(self, message):
@@ -218,7 +219,7 @@
         # Record how long it took to process this message (for stats)
         self._times.append(time.time() - start)
 
-        self.debug("Going back to waiting on the incoming queue.")
+        self.debug("Going back to waiting on the incoming queue.  Message 
handled: %r" % handled)
         return handled
 
     def validate(self, message):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha/hub/api/producer.py 
new/moksha.hub-1.5.17/moksha/hub/api/producer.py
--- old/moksha.hub-1.5.3/moksha/hub/api/producer.py     2014-10-20 
18:51:21.000000000 +0200
+++ new/moksha.hub-1.5.17/moksha/hub/api/producer.py    2019-01-10 
19:15:08.000000000 +0100
@@ -92,6 +92,10 @@
                 (self.frequency.microseconds / 1000000.0)
 
         self._last_ran = None
+        # This is used to determine if the configured frequency has been meet,
+        # and the poller should run. This allows the poller to check if
+        # self.die is True more frequently.
+        self._until_next_poll = self.frequency
 
         log.debug("Setting a %s second timer" % self.frequency)
         moksha.hub.reactor.reactor.callInThread(self._work)
@@ -125,8 +129,27 @@
             self._poll()
 
         while not self.die:
-            time.sleep(self.frequency)
-            self._poll()
+            if not self.frequency:
+                # If no frequency is set, just continuously poll
+                self._poll()
+                continue
+
+            # If _until_next_poll is less than or equal to 0, that means
+            # the frequency has been met and the poller needs to run again
+            if self._until_next_poll <= 0:
+                self._poll()
+                # Reset _until_next_poll so that polling doesn't happen
+                # until the frequency is met again
+                self._until_next_poll = self.frequency
+            else:
+                # Only sleep 5 seconds (or _until_next_poll if it's shorter) at
+                # a time since we want to check if the poller should die
+                # frequently. Otherwise, you'll end up with Moksha Hub in a
+                # state where the hub is stopped but the poller is still
+                # sleeping until the frequency is met
+                sleep_time = min((5, self._until_next_poll))
+                time.sleep(sleep_time)
+                self._until_next_poll -= sleep_time
 
     def stop(self):
         super(PollingProducer, self).stop()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha/hub/hub.py 
new/moksha.hub-1.5.17/moksha/hub/hub.py
--- old/moksha.hub-1.5.3/moksha/hub/hub.py      2017-06-26 18:02:14.000000000 
+0200
+++ new/moksha.hub-1.5.17/moksha/hub/hub.py     2018-09-24 18:14:16.000000000 
+0200
@@ -223,9 +223,12 @@
 
         # FIXME: only do this if the consumer wants it `jsonified`
         try:
-            body = JSON.loads(message['body'])
+            if message['body']:
+                body = JSON.loads(message['body'])
+            else:
+                body = {}
         except Exception as e:
-            log.warning('Cannot decode message from JSON: %s' % e)
+            log.warning('Cannot decode body from JSON: %s -> %r' % (e, 
message))
             #body = {}
             body = message['body']
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha/hub/monitoring.py 
new/moksha.hub-1.5.17/moksha/hub/monitoring.py
--- old/moksha.hub-1.5.3/moksha/hub/monitoring.py       2014-10-20 
18:51:21.000000000 +0200
+++ new/moksha.hub-1.5.17/moksha/hub/monitoring.py      2018-09-24 
18:14:16.000000000 +0200
@@ -51,7 +51,7 @@
         # to us.
         mode = hub.config.get('moksha.monitoring.socket.mode')
         if endpoint.startswith("ipc://") and mode:
-            mode = string.atoi(mode, base=8)
+            mode = int(mode, base=8)
             path = endpoint.split("ipc://")[-1]
             os.chmod(path, mode)
 
@@ -72,7 +72,7 @@
             "producers": self.serialize(self.hub.producers),
         }
         if self.socket:
-            self.socket.send(json.dumps(data))
+            self.socket.send_string(json.dumps(data))
 
     def stop(self):
         super(MonitoringProducer, self).stop()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha/hub/stomp/protocol.py 
new/moksha.hub-1.5.17/moksha/hub/stomp/protocol.py
--- old/moksha.hub-1.5.3/moksha/hub/stomp/protocol.py   2017-06-26 
17:56:51.000000000 +0200
+++ new/moksha.hub-1.5.17/moksha/hub/stomp/protocol.py  2019-02-12 
06:19:48.000000000 +0100
@@ -21,6 +21,10 @@
 
 import logging
 
+from distutils.version import LooseVersion
+
+from moksha.common.lib.converters import asbool
+from moksha.hub.reactor import reactor
 
 try:
     # stomper is not ready for py3
@@ -78,7 +82,7 @@
         f.headers.update(headers)
         cmd = f.pack()
         log.debug(cmd)
-        self.transport.write(cmd)
+        self.transport.write(cmd.encode('utf-8'))
 
     def connectionMade(self):
         """ Register with stomp server """
@@ -91,8 +95,14 @@
         else:
             cmd = stomper.connect(self.username, self.password)
         log.debug(cmd)
-        self.transport.write(cmd)
+        self.transport.write(cmd.encode('utf-8'))
 
+    def error(self, msg):
+        """ Extend stomper's own error method to kill the hub. """
+        super(StompProtocol, self).error(msg)
+        log.error("Requesting shutdown of hub for STOMP error.")
+        reactor.callLater(0, self.client.hub.close)
+        reactor.callLater(0, reactor.stop)
 
     def ack(self, msg):
         """ Override stomper's own ack to be smarter, based on mode. """
@@ -108,7 +118,7 @@
 
     def dataReceived(self, data):
         """Data received, react to it and respond if needed """
-        self.buffer.appendData(data)
+        self.buffer.appendData(data.decode('utf-8', errors='replace'))
         while True:
            msg = self.buffer.getOneMessage()
            if msg is None:
@@ -127,7 +137,15 @@
            # Otherwise, see if we need to turn a naive 'ack' from stomper into
            # a 'nack' if our consumers failed to do their jobs.
            if handled is False and response.startswith("ACK\n"):
-               if stomper.STOMP_VERSION != '1.1':
+
+               send_nacks = 
asbool(self.client.hub.config.get('stomp_send_explicit_nacks', True))
+               if not send_nacks:
+                   log.warn("Message handling failed.  
stomp_send_explicit_nacks=%r.  "
+                            "Sending no reply to the broker.", send_nacks)
+                   # Return, so as not to send an erroneous ack.
+                   return
+
+               if LooseVersion(stomper.STOMP_VERSION) < LooseVersion('1.1'):
                    log.error("Unable to NACK stomp %r" % stomper.STOMP_VERSION)
                    # Also, not sending an erroneous ack.
                    return
@@ -135,8 +153,11 @@
                message_id = msg['headers']['message-id']
                subscription = msg['headers']['subscription']
                transaction_id = msg['headers'].get('transaction-id')
-               response = stomper.stomp_11.nack(message_id, subscription, 
transaction_id)
+               response = stomper.nack(message_id, subscription, 
transaction_id)
 
            # Finally, send our response (ACK or NACK) back to the broker.
-           log.debug(response)
-           self.transport.write(response)
+           if not handled:
+               log.warn("handled=%r.  Responding with %s" % (handled, 
response))
+           else:
+               log.debug("handled=%r.  Responding with %s" % (handled, 
response))
+           self.transport.write(response.encode('utf-8'))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha/hub/stomp/stomp.py 
new/moksha.hub-1.5.17/moksha/hub/stomp/stomp.py
--- old/moksha.hub-1.5.3/moksha/hub/stomp/stomp.py      2016-11-10 
15:43:34.000000000 +0100
+++ new/moksha.hub-1.5.17/moksha/hub/stomp/stomp.py     2019-02-06 
23:08:37.000000000 +0100
@@ -28,6 +28,7 @@
 
 import logging
 
+import six
 from twisted.internet.protocol import ClientFactory
 
 from moksha.hub.stomp.protocol import StompProtocol
@@ -46,7 +47,7 @@
     def __init__(self, hub, config):
         self.config = config
         self.hub = hub
-        self._topics = hub.topics.keys()
+        self._topics = list(hub.topics.keys())
         self._frames = []
 
         uri = self.config.get('stomp_uri', None)
@@ -55,16 +56,6 @@
             host = self.config.get('stomp_broker')
             uri = "%s:%i" % (host, port)
 
-        # Sometimes, a stomp consumer may wish to be subscribed to a queue
-        # which is composed of messages from many different topics.  In this
-        # case, the hub hands dispatching messages to the right consumers.
-        # This extension is only concerned with the queue, and negotiating that
-        # with the broker.
-        stomp_queue = self.config.get('stomp_queue', None)
-        if stomp_queue:
-            # Overwrite the declarations of all of our consumers.
-            self._topics = [stomp_queue]
-
         # A list of addresses over which we emulate failover()
         self.addresses = [pair.split(":") for pair in uri.split(',')]
         self.address_index = 0
@@ -96,7 +87,15 @@
                     client_cert = ssl.PrivateCertificate.loadPEM(
                         key_file.read() + cert_file.read())
 
-            ssl_context = client_cert.options()
+            try:
+                import service_identity
+                # connect SSL/TLS with SNI support 
(https://twistedmatrix.com/trac/ticket/5190)
+                # This requires service_identity module: 
https://pypi.python.org/pypi/service_identity
+                ssl_context = ssl.optionsForClientTLS(six.text_type(host), 
clientCertificate=client_cert)
+            except ImportError:
+                log.warn("Connecting without SNI support due to absence of 
service_identity module.")
+                ssl_context = client_cert.options()
+
             reactor.connectSSL(host, int(port), self, ssl_context)
         else:
             log.info("connecting unencrypted to %r %r %r" % (
@@ -114,17 +113,35 @@
             interval = max(self.client_heartbeat, server_heartbeat)
             log.debug("Heartbeat of %ims negotiated from (%i,%i); starting." % 
(
                 interval, self.client_heartbeat, server_heartbeat))
-            self.start_heartbeat(interval)
+            # According to STOMP documentation, we have negotiated a heartbeat
+            # of `interval` milliseconds and heartbeats must be sent *at least*
+            # that often.  Here, we'll send them twice as often to give plenty
+            # of room for latency.
+            # 
https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
+            fudge_factor = 0.5
+            self.start_heartbeat(interval=(interval * fudge_factor))
         else:
             log.debug("Skipping heartbeat initialization")
 
+        # Sometimes, a stomp consumer may wish to be subscribed to a queue
+        # which is composed of messages from many different topics.  In this
+        # case, the hub hands dispatching messages to the right consumers.
+        # This extension is only concerned with the queue, and negotiating that
+        # with the broker.
+        stomp_queue = self.config.get('stomp_queue', None)
+        if stomp_queue and self._topics and self._topics != [stomp_queue]:
+            log.info('Discarding consumer-specified topics in favor of '
+                     'stomp_queue=%s: %r' % (stomp_queue, self._topics))
+            # Overwrite the declarations of all of our consumers.
+            self._topics = [stomp_queue]
+
         for topic in self._topics:
             log.info('Subscribing to %s topic' % topic)
             self.subscribe(topic, callback=lambda msg: None)
 
         for frame in self._frames:
             log.debug('Flushing queued frame')
-            self.proto.transport.write(frame.pack())
+            self.proto.transport.write(frame.pack().encode('utf-8'))
         self._frames = []
 
     def clientConnectionLost(self, connector, reason):
@@ -140,7 +157,7 @@
     def failover(self):
         self.address_index = (self.address_index + 1) % len(self.addresses)
         args = (self.addresses[self.address_index], self.key, self.crt,)
-        self._delay = self._delay * (1 + (2.0 / len(self.addresses)))
+        self._delay = min(60.0, self._delay * (1 + (2.0 / 
len(self.addresses))))
         log.info('(failover) reconnecting in %f seconds.' % self._delay)
         reactor.callLater(self._delay, self.connect, *args)
 
@@ -150,7 +167,7 @@
 
     def heartbeat(self, interval):
         if self._heartbeat_enabled:
-            self.proto.transport.write(chr(0x0A))  # Lub-dub
+            self.proto.transport.write(chr(0x0A).encode('utf-8'))  # Lub-dub
             reactor.callLater(interval / 1000.0, self.heartbeat, interval)
         else:
             log.debug("(heartbeat stopped)")
@@ -160,13 +177,16 @@
         self._heartbeat_enabled = False
 
     def send_message(self, topic, message, **headers):
+        # Convert any utf-8 encoded payloads back to unicode.  stomper can't 
handle bytestrings
+        topic = topic.decode('utf-8') if isinstance(topic, six.binary_type) 
else topic
+        message = message.decode('utf-8') if isinstance(message, 
six.binary_type) else message
         f = stomper.Frame()
         f.unpack(stomper.send(topic, message))
         if not self.proto:
             log.info("Queueing stomp frame for later delivery")
             self._frames.append(f)
         else:
-            self.proto.transport.write(f.pack())
+            self.proto.transport.write(f.pack().encode('utf-8'))
 
         super(StompHubExtension, self).send_message(topic, message, **headers)
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha/hub/tests/test_hub.py 
new/moksha.hub-1.5.17/moksha/hub/tests/test_hub.py
--- old/moksha.hub-1.5.3/moksha/hub/tests/test_hub.py   2014-10-22 
15:57:40.000000000 +0200
+++ new/moksha.hub-1.5.17/moksha/hub/tests/test_hub.py  2018-09-24 
18:14:16.000000000 +0200
@@ -26,12 +26,19 @@
 from time import sleep, time
 from uuid import uuid4
 from kitchen.iterutils import iterate
+import tempfile
+import shutil
+import os
+import stat
+import zmq
+import json
 
 import moksha.common.testtools.utils as testutils
 
 import moksha.hub.api
 from moksha.hub.hub import MokshaHub, CentralMokshaHub
 from moksha.hub.reactor import reactor as _reactor
+from moksha.hub.monitoring import MonitoringProducer
 from nose.tools import (eq_, assert_true, assert_false)
 
 
@@ -49,7 +56,7 @@
         _reactor.runUntilCurrent()
 
 
-class TestHub(unittest.TestCase):
+class TestHub:
 
     def _setUp(self):
         def kernel(config):
@@ -124,13 +131,15 @@
         I'm not sure how to do that, so we're going to fake it and manually
         add this consumer to the list of consumers of which the Hub is aware.
         """
-        consume = cons(self.hub).consume
+        consumer = cons(self.hub)
+        consume = consumer.consume
         for topic in iterate(cons.topic):
             self.hub.topics[topic] = self.hub.topics.get(topic, [])
             if consume not in self.hub.topics[topic]:
                 print('registering fake topic %r' % topic)
                 self.hub.topics[topic].append(consume)
         sleep(sleep_duration)
+        return consumer
 
     @testutils.crosstest
     def test_abstract(self):
@@ -388,6 +397,102 @@
         central = CentralMokshaHub(config, [TestConsumer], [])
         central.close()
 
+    @testutils.crosstest
+    def test_consumer_stats_queued(self):
+        """ Verify that message processing stats are set for queued messages. 
"""
+
+        class TestConsumer(moksha.hub.api.consumer.Consumer):
+            topic = self.a_topic
+
+            def consume(self, message):
+                pass
+
+        cons = self.fake_register_consumer(TestConsumer)
+
+        for i in range(5):
+            self.hub.send_message(topic=self.a_topic, message=secret)
+
+        simulate_reactor(sleep_duration)
+        sleep(sleep_duration)
+
+        eq_(cons.headcount_in, 5)
+        eq_(cons.headcount_out, 0)
+        eq_(cons._exception_count, 0)
+        eq_(len(cons._times), 0)
+
+    @testutils.crosstest
+    def test_consumer_stats_processed(self):
+        """ Verify that message processing stats are set for processed 
messages. """
+
+        class TestConsumer(moksha.hub.api.consumer.Consumer):
+            topic = self.a_topic
+
+            def consume(self, message):
+                pass
+
+        self.hub.config['moksha.blocking_mode'] = True
+        cons = self.fake_register_consumer(TestConsumer)
+
+        for i in range(5):
+            self.hub.send_message(topic=self.a_topic, message=secret)
+
+        simulate_reactor(sleep_duration)
+        sleep(sleep_duration)
+
+        eq_(cons.headcount_in, 5)
+        eq_(cons.headcount_out, 5)
+        eq_(cons._exception_count, 0)
+        eq_(len(cons._times), 5)
+
+    @testutils.crosstest
+    def test_consumer_stats_exceptions(self):
+        """ Verify that message processing stats are set for messages that 
generate exceptions. """
+
+        class TestConsumer(moksha.hub.api.consumer.Consumer):
+            topic = self.a_topic
+
+            def consume(self, message):
+                if message['body'] % 2:
+                    raise RuntimeError()
+
+        self.hub.config['moksha.blocking_mode'] = True
+        cons = self.fake_register_consumer(TestConsumer)
+
+        for i in range(5):
+            self.hub.send_message(topic=self.a_topic, message=i)
+
+        simulate_reactor(sleep_duration)
+        sleep(sleep_duration)
+
+        eq_(cons.headcount_in, 5)
+        eq_(cons.headcount_out, 5)
+        eq_(cons._exception_count, 2)
+        eq_(len(cons._times), 5)
+
+    @testutils.crosstest
+    def test_consumer_stats_overflow(self):
+        """ Verify that Consumer._times doesn't grow beyond a maximum size. """
+
+        class TestConsumer(moksha.hub.api.consumer.Consumer):
+            topic = self.a_topic
+
+            def consume(self, message):
+                pass
+
+        self.hub.config['moksha.blocking_mode'] = True
+        cons = self.fake_register_consumer(TestConsumer)
+
+        for i in range(1500):
+            self.hub.send_message(topic=self.a_topic, message=secret)
+
+        simulate_reactor(sleep_duration)
+        sleep(sleep_duration)
+
+        eq_(cons.headcount_in, 1500)
+        eq_(cons.headcount_out, 1500)
+        eq_(cons._exception_count, 0)
+        eq_(len(cons._times), 1024)
+
 
 class TestProducer:
     def _setUp(self):
@@ -443,3 +548,49 @@
     def test_idempotence(self):
         """ Test that running the same test twice still works. """
         return self.test_produce_ten_strs()
+
+
+class TestMonitoring:
+    def _setUp(self):
+        def kernel(config):
+            self.hub = CentralMokshaHub(config=config)
+            self.a_topic = a_topic = str(uuid4())
+
+        for __setup, name in testutils.make_setup_functions(kernel):
+            yield __setup, name
+
+    def _tearDown(self):
+        self.hub.close()
+
+    @testutils.crosstest
+    def test_monitoring(self):
+        """ Test that the MonitoringProducer works as expected. """
+        tmpdir = tempfile.mkdtemp()
+        try:
+            zmq_file = tmpdir + '/socket'
+            zmq_socket = 'ipc://' + zmq_file
+            self.hub.config['moksha.monitoring.socket'] = zmq_socket
+            self.hub.config['moksha.monitoring.socket.mode'] = '777'
+            mon = MonitoringProducer(self.hub)
+            assert_true(os.path.exists(zmq_file))
+            assert_true(stat.S_IMODE(os.stat(zmq_file).st_mode) == 0o777)
+            ctx = zmq.Context()
+            sub = ctx.socket(zmq.SUB)
+            sub.setsockopt(zmq.RCVTIMEO, 10000)
+            sub.setsockopt_string(zmq.SUBSCRIBE, u'')
+            sub.connect(zmq_socket)
+            data = []
+            def recv():
+                data.append(sub.recv())
+            thread = threading.Thread(target=recv)
+            thread.start()
+            sleep(sleep_duration)
+            mon.poll()
+            thread.join()
+            eq_(len(data), 1)
+            d = json.loads(data[0])
+            eq_(len(d['consumers']), 0)
+            eq_(len(d['producers']), 1)
+            eq_(d['producers'][0]['name'], 'MonitoringProducer')
+        finally:
+            shutil.rmtree(tmpdir)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha.hub.egg-info/PKG-INFO 
new/moksha.hub-1.5.17/moksha.hub.egg-info/PKG-INFO
--- old/moksha.hub-1.5.3/moksha.hub.egg-info/PKG-INFO   2017-07-11 
17:08:34.000000000 +0200
+++ new/moksha.hub-1.5.17/moksha.hub.egg-info/PKG-INFO  2019-02-12 
06:20:18.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: moksha.hub
-Version: 1.5.3
+Version: 1.5.17
 Summary: Hub components for Moksha.
 Home-page: https://mokshaproject.net
 Author: Luke Macken, John (J5) Palmieri, Mairin Duffy, and Ralph Bean
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha.hub.egg-info/SOURCES.txt 
new/moksha.hub-1.5.17/moksha.hub.egg-info/SOURCES.txt
--- old/moksha.hub-1.5.3/moksha.hub.egg-info/SOURCES.txt        2017-07-11 
17:08:34.000000000 +0200
+++ new/moksha.hub-1.5.17/moksha.hub.egg-info/SOURCES.txt       2019-02-12 
06:20:18.000000000 +0100
@@ -10,7 +10,6 @@
 moksha.hub.egg-info/dependency_links.txt
 moksha.hub.egg-info/entry_points.txt
 moksha.hub.egg-info/namespace_packages.txt
-moksha.hub.egg-info/pbr.json
 moksha.hub.egg-info/requires.txt
 moksha.hub.egg-info/top_level.txt
 moksha/hub/__init__.py
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/moksha.hub.egg-info/pbr.json 
new/moksha.hub-1.5.17/moksha.hub.egg-info/pbr.json
--- old/moksha.hub-1.5.3/moksha.hub.egg-info/pbr.json   2015-07-27 
20:55:33.000000000 +0200
+++ new/moksha.hub-1.5.17/moksha.hub.egg-info/pbr.json  1970-01-01 
01:00:00.000000000 +0100
@@ -1 +0,0 @@
-{"is_release": false, "git_version": "9928086"}
\ No newline at end of file
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/setup.cfg 
new/moksha.hub-1.5.17/setup.cfg
--- old/moksha.hub-1.5.3/setup.cfg      2017-07-11 17:08:34.000000000 +0200
+++ new/moksha.hub-1.5.17/setup.cfg     2019-02-12 06:20:18.000000000 +0100
@@ -1,5 +1,4 @@
 [egg_info]
 tag_build = 
 tag_date = 0
-tag_svn_revision = 0
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/moksha.hub-1.5.3/setup.py 
new/moksha.hub-1.5.17/setup.py
--- old/moksha.hub-1.5.3/setup.py       2017-07-11 17:08:29.000000000 +0200
+++ new/moksha.hub-1.5.17/setup.py      2019-02-12 06:20:04.000000000 +0100
@@ -31,6 +31,10 @@
     "txZMQ",
     "txWS",
     #"python-daemon",
+
+    # Optional
+    #"service_identity",
+    #"pyasn1",
 ]
 
 tests_require = [
@@ -49,7 +53,7 @@
 
 setup(
     name='moksha.hub',
-    version='1.5.3',
+    version='1.5.17',
     description='Hub components for Moksha.',
     author='Luke Macken, John (J5) Palmieri, Mairin Duffy, and Ralph Bean',
     author_email='',


Reply via email to