Repository: airavata
Updated Branches:
  refs/heads/lahiru/AIRAVATA-2065 ce26536fc -> 6786f8ee0 (forced update)


http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/pikaproducer.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/pikaproducer.py b/modules/simstream/simstream/pikaproducer.py
new file mode 100755
index 0000000..6ffaf3d
--- /dev/null
+++ b/modules/simstream/simstream/pikaproducer.py
@@ -0,0 +1,202 @@
+"""
+Utilities for sending data.
+
+Author: Jeff Kinnison (jkinn...@nd.edu)
+"""
+
+import json
+import pika
+
+
+class PikaProducer(object):
+    """
+    Utility for sending job data to a set of endpoints.
+    """
+
+    def __init__(self, rabbitmq_url, exchange, exchange_type="direct", routing_keys=None):
+        """
+        Instantiate a new PikaProducer.
+
+        Arguments:
+        rabbitmq_url -- the url of the RabbitMQ server to send to
+        exchange -- the name of the exchange to send to
+
+        Keyword Arguments:
+        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
+                         (default 'direct')
+        routing_keys -- the routing keys to the endpoints for this producer
+                        (default [])
+        """
+        self._url = rabbitmq_url
+        self._exchange = exchange
+        self._exchange_type = exchange_type
+        self._routing_keys = routing_keys if routing_keys is not None else []
+
+        self._connection = None # RabbitMQ connection object
+        self._channel = None    # RabbitMQ channel object
+
+        import random
+        self._name = random.randint(0,100)
+
+    def __call__(self, data):
+        """
+        Publish data to the RabbitMQ server.
+
+        Arguments:
+        data -- JSON serializable data to send
+        """
+        if self._connection is None: # Start the connection if it is inactive
+            self.start()
+        # Serialize and send the data
+        message = self.pack_data(data)
+        self.send_data(message)
+
+    def add_routing_key(self, key):
+        """
+        Add a new endpoint that will receive this data.
+
+        Arguments:
+        key -- the routing key for the new endpoint
+        """
+        if key not in self._routing_keys:
+            #print("Adding key %s to %s" % (key, self._name))
+            self._routing_keys.append(key)
+            #print(self._routing_keys)
+
+    def remove_routing_key(self, key):
+        """
+        Stop sending data to an existing endpoint.
+
+        Arguments:
+        key -- the routing key for the existing endpoint
+        """
+        try:
+            self._routing_keys.remove(key)
+        except ValueError:
+            pass
+
+    def pack_data(self, data):
+        """
+        JSON-serialize the data for transport.
+
+        Arguments:
+        data -- JSON-serializable data
+        """
+        try: # Generate a JSON string from the data
+            msg = json.dumps(data)
+        except TypeError as e: # Generate and return an error if serialization fails
+            msg = json.dumps({"err": str(e)})
+        finally:
+            return msg
+
+    def send_data(self, data):
+        """
+        Send the data to all active endpoints.
+
+        Arguments:
+        data -- the message to send
+        """
+        if self._channel is not None: # Make sure the connection is active
+            for key in self._routing_keys: # Send to all endpoints
+                #print(self._exchange, key, self._name)
+                self._channel.basic_publish(exchange=self._exchange,
+                                            routing_key=key,
+                                            body=data)
+
+    def start(self):
+        """
+        Open a connection if one does not exist.
+        """
+        print("Starting new connection")
+        if self._connection is None:
+            print("Creating connection object")
+            self._connection = pika.BlockingConnection(pika.URLParameters(self._url))
+            self._channel = self._connection.channel()
+            self._channel.exchange_declare(exchange=self._exchange,
+                                           type=self._exchange_type)
+
+    def shutdown(self):
+        """
+        Close an existing connection.
+        """
+        if self._channel is not None:
+            self._channel.close()
+
+    def _on_connection_open(self, unused_connection):
+        """
+        Create a new channel if the connection opens successfully.
+
+        Arguments:
+        unused_connection -- a reference to self._connection
+        """
+        print("Connection is open")
+        self._connection.channel(on_open_callback=self._on_channel_open)
+
+    def _on_connection_close(self, connection, code, text):
+        """
+        Actions to take when the connection is closed for any reason.
+
+        Arguments:
+        connection -- the connection that was closed (same as self._connection)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        print("Connection is closed")
+        self._channel = None
+        self._connection = None
+
+    def _on_channel_open(self, channel):
+        """
+        Actions to take when the channel opens.
+
+        Arguments:
+        channel -- the newly opened channel
+        """
+        print("Channel is open")
+        self._channel = channel
+        self._channel.add_on_close_callback(self._on_channel_close)
+        self._declare_exchange()
+
+    def _on_channel_close(self, channel, code, text):
+        """
+        Actions to take when the channel closes for any reason.
+
+        Arguments:
+        channel -- the channel that was closed (same as self._channel)
+        code -- response code from the RabbitMQ server
+        text -- response body from the RabbitMQ server
+        """
+        print("Channel is closed")
+        self._connection.close()
+
+    def _declare_exchange(self):
+        """
+        Set up the exchange to publish to even if it already exists.
+        """
+        print("Exchange is declared")
+        self._channel.exchange_declare(exchange=self._exchange,
+                                       type=self._exchange_type)
+
+if __name__ == "__main__":
+    import time
+
+    config = {
+        "url": "amqp://guest:guest@localhost:5672",
+        "exchange": "simstream",
+        "routing_key": "test_consumer",
+        "exchange_type": "topic"
+    }
+
+    producer = PikaProducer(config["url"],
+                            config["exchange"],
+                            exchange_type=config["exchange_type"],
+                            routing_keys=[config["routing_key"]])
+    producer.start()
+
+    try:
+        while True:
+            time.sleep(5)
+            data = str(time.time()) + ": Hello SimStream"
+            producer.send_data(data)
+    except KeyboardInterrupt:
+        producer.shutdown()
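For reference, a minimal sketch of a consumer that would receive what the demo block above publishes. It reuses the demo's URL, the "simstream" exchange, the "topic" exchange type, and the "test_consumer" routing key; the server-named exclusive queue is an assumption, and the calls assume the same pika release pikaproducer.py targets (exchange_declare taking type=; newer pika uses exchange_type=).

import pika

# Connect with the same URL as the producer demo above.
connection = pika.BlockingConnection(
    pika.URLParameters("amqp://guest:guest@localhost:5672"))
channel = connection.channel()

# Declare the same exchange the producer publishes to.
channel.exchange_declare(exchange="simstream", type="topic")

# Bind a server-named, exclusive queue to the producer's routing key.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange="simstream", queue=queue_name,
                   routing_key="test_consumer")

def on_message(channel, method, properties, body):
    # body is the string handed to producer.send_data()
    print(body)

channel.basic_consume(on_message, queue=queue_name, no_ack=True)
channel.start_consuming()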

http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/simstream.py
----------------------------------------------------------------------
diff --git a/modules/simstream/simstream/simstream.py b/modules/simstream/simstream/simstream.py
new file mode 100755
index 0000000..499a8c3
--- /dev/null
+++ b/modules/simstream/simstream/simstream.py
@@ -0,0 +1,167 @@
+import pika
+
+from .pikaasyncconsumer import PikaAsyncConsumer
+from .datacollector import DataCollector
+from .datareporter import DataReporter
+from .eventhandler import EventHandler
+from .eventmonitor import EventMonitor
+
+
+class ReporterExistsException(Exception):
+    """Raised when attempting to add a DataReporter with a conflicting name."""
+    pass
+
+
+class SimStream(object):
+    """
+    Manager for routing messages to their correct reporter.
+    """
+
+    DEFAULT_CONFIG_PATH="simstream.cnf"
+
+
+    class MessageParser(object):
+        """
+        Internal message parsing facilities.
+        """
+
+        def __init__(self):
+            self.parsed = None
+
+        def __call__(self, message):
+            pass
+
+
+    def __init__(self, reporters=None, config=None):
+        self.reporters = reporters if reporters is not None else {}
+        self.consumer = None
+        self.config = config if config is not None else {}
+
+    def add_data_reporter(self, reporter):
+        """
+        Add a new DataReporter object.
+
+        Arguments:
+        reporter -- the DataReporter to add
+        """
+        if reporter.name in self.reporters:
+            raise ReporterExistsException
+        self.reporters[reporter.name] = reporter
+
+    def parse_config(self):
+        """
+        Read the config file and set up the specified data collection and
+        event handling resources.
+        """
+        # TODO: Read in config
+        # TODO: Set up configuration dict
+        pass
+
+    def route_message(self, message):
+        """
+        Send a message to the correct reporter.
+        """
+        # TODO: Create new MessageParser
+        # TODO: Run message through MessageParser
+        # TODO: Route message to the correct DataReporter/EventMonitor
+        parser = self.MessageParser()
+        parser(message)
+        if parser.reporter_name in self.reporters:
+            self.reporters[parser.reporter_name].start_streaming(
+                    parser.collector_name,
+                    parser.routing_key
+                )
+
+    def start_collecting(self):
+        """
+        Begin collecting data and monitoring for events.
+        """
+        for reporter in self.reporters:
+            self.reporters[reporter].start_collecting()
+
+    def setup(self):
+        """
+        Set up the SimStream instance: create DataCollectors, create
+        EventMonitors, configure AMQP consumer.
+        """
+        self.parse_config()
+        #self.setup_consumer()
+        self.setup_data_collection()
+        self.setup_event_monitoring()
+
+    def setup_data_collection(self):
+        """
+        Set up all DataReporters and DataCollectors.
+        """
+        # TODO: Create and configure all DataReporters
+        # TODO: Create and configure all DataCollectors
+        # TODO: Assign each DataCollector to the correct DataReporter
+        if "reporters" in self.config:
+            for reporter in self.config["reporters"]:
+                pass
+            for collector in self.config["collectors"]:
+                pass
+
+    def setup_event_monitoring(self):
+        #TODO: Create and configure all EventMonitors
+        #TODO: Create and configure all EventHandlers
+        #TODO: Assign each EventHandler to the correct EventMonitor
+        #TODO: Assign each EventMonitor to the correct DataCollector
+        pass
+
+    def setup_consumer(self):
+        """
+        Set up and configure the consumer.
+        """
+        if len(self.config) > 0 and self.consumer is None:
+            if "message_handler" in self.config:
+                message_handler = self.config["message_handler"]
+            else:
+                message_handler = self.route_message
+            self.consumer = PikaAsyncConsumer(self.config["url"],
+                                              self.config["exchange"],
+                                              self.config["queue"],
+                                              message_handler,
+                                              exchange_type=self.config["exchange_type"],
+                                              routing_key=self.config["routing_key"]
+                                             )
+
+    def start(self):
+        """
+        Configure and start SimStream.
+        """
+        if self.consumer is None:
+            self.setup()
+        self.start_collecting()
+        #self.consumer.start()
+
+    def stop(self):
+        """
+        Stop all data collection, event monitoring, and message consumption.
+        """
+        self.consumer.stop()
+        self.stop_collecting()
+
+
+if __name__ == "__main__":
+    def print_message(message):
+        with open("test.out", "w") as f:
+            f.write("%s\n" % message)
+
+    print(SimStream.DEFAULT_CONFIG_PATH)
+
+    config = {
+        "url": "amqp://guest:guest@localhost:5672",
+        "exchange": "simstream",
+        "queue": "simstream_test",
+        "message_handler": print_message,
+        "routing_key": "test_consumer",
+        "exchange_type": "topic"
+    }
+
+    streamer = SimStream(config=config)
+
+    try:
+        streamer.start()
+    except KeyboardInterrupt:
+        streamer.stop()
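The route_message method above expects the parsed message to expose reporter_name, collector_name, and routing_key, but the nested MessageParser stub leaves __call__ unimplemented. A minimal sketch of what such a parser might look like, assuming a JSON wire format carrying those three fields (the actual message format is not specified in this commit):

import json

class MessageParser(object):
    # Sketch of the parser interface route_message relies on; field names are assumptions.

    def __init__(self):
        self.parsed = None
        self.reporter_name = None
        self.collector_name = None
        self.routing_key = None

    def __call__(self, message):
        # Assumed wire format, e.g.:
        # {"reporter_name": "cpu", "collector_name": "load", "routing_key": "test_consumer"}
        self.parsed = json.loads(message)
        self.reporter_name = self.parsed.get("reporter_name")
        self.collector_name = self.parsed.get("collector_name")
        self.routing_key = self.parsed.get("routing_key")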
