add simstream module to airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/24ced800 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/24ced800 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/24ced800 Branch: refs/heads/lahiru/AIRAVATA-2065 Commit: 24ced80054f084a78d86fff36672c834e144d3e9 Parents: 2c7da5e Author: Jeff Kinnison <jeffdkinni...@gmail.com> Authored: Fri Jun 10 16:46:02 2016 -0400 Committer: Jeff Kinnison <jeffdkinni...@gmail.com> Committed: Fri Jun 10 16:46:02 2016 -0400 ---------------------------------------------------------------------- modules/simstream/README.md | 18 + modules/simstream/example/README.md | 9 + .../simstream/example/logfile_checker/README.md | 23 + .../example/logfile_checker/generate_logs.sh | 22 + .../example/logfile_checker/log_consumer.py | 43 ++ .../example/logfile_checker/log_streamer.py | 111 ++++ .../example/logfile_checker/remote_log.slurm | 21 + .../simstream/example/logfile_checker/test.txt | 657 +++++++++++++++++++ .../simstream/example/mem_streamer/README.md | 17 + .../example/mem_streamer/memory_consumption.py | 83 +++ .../example/mem_streamer/memory_streamer.py | 46 ++ .../simstream/example/openmm_example/README.md | 33 + .../application/alanine_dipeptide.py | 55 ++ .../openmm_example/application/input.pdb | 24 + .../openmm_example/application/trajectory.dcd | 0 .../example/openmm_example/openmm_consumer.py | 8 + .../openmm_example/openmm_log_consumer.py | 32 + .../openmm_example/openmm_rmsd_consumer.py | 36 + .../example/openmm_example/openmm_stream.slurm | 19 + .../example/openmm_example/openmm_streamer.py | 130 ++++ .../simstream/example/openmm_example/test.txt | 0 modules/simstream/example/settings.json | 6 + modules/simstream/setup.py | 19 + modules/simstream/simstream/__init__.py | 11 + modules/simstream/simstream/datacollector.py | 110 ++++ modules/simstream/simstream/datareporter.py | 169 +++++ modules/simstream/simstream/eventhandler.py | 
17 + modules/simstream/simstream/eventmonitor.py | 46 ++ .../simstream/simstream/pikaasyncconsumer.py | 203 ++++++ modules/simstream/simstream/pikaproducer.py | 202 ++++++ modules/simstream/simstream/simstream.py | 167 +++++ 31 files changed, 2337 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/README.md ---------------------------------------------------------------------- diff --git a/modules/simstream/README.md b/modules/simstream/README.md new file mode 100755 index 0000000..9ab1379 --- /dev/null +++ b/modules/simstream/README.md @@ -0,0 +1,18 @@ +# simstream +A utility for user-defined remote system and simulation data monitoring. + +## Dependencies +* pika >= 0.10.0 (`pip install pika`) +* A running, accessible instance of RabbitMQ server + +## Installation +1. Clone this repository +2. `python setup.py install` + +## Running the Example +The example runs a simple collector that records the maximum memory used by the server (MB) and a timestamp. It also generates a plot of the results. + +1. Edit `example/mem_streamer/memory_consumption.py` and `example/mem_streamer/memory_streamer.py` with the correct RabbitMQ settings +2. From the repository root, run `python example/mem_streamer/memory_consumption.py` +3. Open a new terminal session and run `python example/mem_streamer/memory_streamer.py` +4. Memory usage information should now be collected in the current terminal and received in the original terminal http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/README.md ---------------------------------------------------------------------- diff --git a/modules/simstream/example/README.md b/modules/simstream/example/README.md new file mode 100755 index 0000000..23f36d5 --- /dev/null +++ b/modules/simstream/example/README.md @@ -0,0 +1,9 @@ +# SimStream Examples + +This directory contains several examples showcasing the functionality of SimStream. 
To run them, download and install Python 2.7/3.5, install SimStream using setup.py, and modify the settings.json file to match your RabbitMQ server settings. + +## The Examples + +* mem_streamer: Stream max RSS memory consumed by a basic SimStream utility +* logfile_checker: Collect, filter, and stream tagged log file entries +* openmm_example: Run a molecular dynamics simulation and return log information and system state measured by root mean squared deviation \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/README.md ---------------------------------------------------------------------- diff --git a/modules/simstream/example/logfile_checker/README.md b/modules/simstream/example/logfile_checker/README.md new file mode 100755 index 0000000..30ed071 --- /dev/null +++ b/modules/simstream/example/logfile_checker/README.md @@ -0,0 +1,23 @@ +# SimStream Example: Logfile Streaming + +This example filters log file entries by starting tag and sends them to a remote listener. The listener prints the logs it receives to the terminal. + +## Instructions + +### Start the Publisher +1. Open a terminal +2. `cd path/to/simstream/example/logfile_checker` +3. `python log_streamer.py test.txt` + +### Start the Consumer +1. Open a terminal +2. `cd path/to/simstream/example/logfile_checker` +3. `python log_consumer.py` + +### Write Some Logs +1. Open a terminal +2. `cd path/to/simstream/example/logfile_checker` +3. `chmod 700 generate_logs.sh` +4. `./generate_logs.sh` + +This will write logs to `test.txt`. The Publisher will continuously check for new logs, filter based on the [STATUS] and [ERROR] tags, and send the filtered results to the RabbitMQ server. The Consumer will receive the filtered log entries and print them to the terminal. 
http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/generate_logs.sh ---------------------------------------------------------------------- diff --git a/modules/simstream/example/logfile_checker/generate_logs.sh b/modules/simstream/example/logfile_checker/generate_logs.sh new file mode 100755 index 0000000..5fb7aa0 --- /dev/null +++ b/modules/simstream/example/logfile_checker/generate_logs.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +outfile="test.txt" + +echo "[STATUS] Starting logfile generator" >> $outfile + +sleep 2 + +echo "[STATUS] Doing stuff" >> $outfile +echo "Stuff that doesn't need to be reported" >> $outfile +echo "Stuff that also doesn't need to be reported" >> $outfile +echo "[DATA] 7.267" >> $outfile + +sleep 2 + +echo "[STATUS] Doing more stuff" >> $outfile +echo "Yet more stuff that doesn't need to be reported" >> $outfile +echo "[ERROR] Some non-fatal error that the user should know about" >> $outfile + +sleep 2 + +echo "[STATUS] Finished generating logs" >> $outfile \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/log_consumer.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/logfile_checker/log_consumer.py b/modules/simstream/example/logfile_checker/log_consumer.py new file mode 100755 index 0000000..bf3beac --- /dev/null +++ b/modules/simstream/example/logfile_checker/log_consumer.py @@ -0,0 +1,43 @@ +import json +from simstream import PikaAsyncConsumer + +#settings = { +# "url": "amqp://guest:guest@localhost:5672", +# "exchange": "simstream", +# "queue": "test", +# "routing_key": "logfile", +# "exchange_type": "topic" +#} + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + settings["routing_key"] = "memory" + +def print_log_line(body): + try: + lines = json.loads(body.decode()) + if lines is not None: + for line 
in lines: + print(line) + except json.decoder.JSONDecodeError as e: + print("[Error]: Could not decode %s" % (body)) + except UnicodeError as e: + print("[Error]: Could not decode from bytes to string: %s" % (e.reason)) + + +consumer = PikaAsyncConsumer( + settings["url"], + settings["exchange"], + settings["queue"], + print_log_line, + exchange_type=settings["exchange_type"], + routing_key=settings["routing_key"] + ) + +if __name__ == "__main__": + try: + consumer.start() + except KeyboardInterrupt: + consumer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/log_streamer.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/logfile_checker/log_streamer.py b/modules/simstream/example/logfile_checker/log_streamer.py new file mode 100755 index 0000000..65f84f0 --- /dev/null +++ b/modules/simstream/example/logfile_checker/log_streamer.py @@ -0,0 +1,111 @@ +from simstream import SimStream, DataReporter + +import sys, json + +class LogMonitor(object): + """ + A callable class that returns unprocessed lines in an open logfile. + + Instance Variables: + logfile -- the path to the logfile to monitor + """ + + def __init__(self, logfile): + """ + Set up a monitor for a logfile. + + Arguments: + logfile -- the path to the logfile to monitor + """ + self.logfile = logfile + self._generator = None + self._version = sys.version_info[0] + + def __call__(self): + """ + Get the next line from the logfile. + """ + if not self._generator: + self._generator = self._monitor_logfile() + + lines = [] + + line = self._next() + while line is not None: + lines.append(line) + line = self._next() + + return lines + + def _monitor_logfile(self): + """ + Yield the next set of lines from the logfile. 
+ """ + try: + # Make the file persistent for the lifetime of the generator + with open(self.logfile) as f: + f.seek(0,2) # Move to the end of the file + while True: + # Get the next line or indicate the end of the file + line = f.readline() + if line: + yield line.strip() + else: + yield None + + except EnvironmentError as e: + # Handle I/O exceptions in an OS-agnostic way + print("Error: Could not open file %s: %s" % (self.logfile, e)) + + def _next(self): + """ + Python 2/3 agnostic retrieval of generator values. + """ + return self._generator.__next__() if self._version == 3 else self._generator.next() + + +def get_relevant_log_lines(log_lines): + import re + relevant_lines = [] + pattern = r'^\[(STATUS|ERROR)\]' + for line in log_lines: + if re.match(pattern, line) is not None: + relevant_lines.append(line) + return relevant_lines + + +#settings = { +# "url": "amqp://guest:guest@localhost:5672", +# "exchange": "simstream", +# "queue": "test", +# "routing_key": "logfile", +# "exchange_type": "topic" +#} + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + settings["routing_key"] = "memory" + +if __name__ == "__main__": + logfile = sys.argv[1] + log_reporter = DataReporter() + log_reporter.add_collector("logger", + LogMonitor(logfile), + settings["url"], + settings["exchange"], + limit=10, + interval=2, + exchange_type=settings["exchange_type"], + postprocessor=get_relevant_log_lines) + + log_reporter.start_streaming("logger", settings["routing_key"]) + + streamer = SimStream(config=settings, reporters={"log_reporter": log_reporter}) + streamer.setup() + + try: + streamer.start() + except KeyboardInterrupt: + streamer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/remote_log.slurm ---------------------------------------------------------------------- diff --git a/modules/simstream/example/logfile_checker/remote_log.slurm 
b/modules/simstream/example/logfile_checker/remote_log.slurm new file mode 100644 index 0000000..55834e9 --- /dev/null +++ b/modules/simstream/example/logfile_checker/remote_log.slurm @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +#SBATCH -J remote_logger # Job name +#SBATCH -o remote_logger.o%j # Name of stdout output file(%j expands to jobId) +#SBATCH -e remote_logger.o%j # Name of stderr output file(%j expands to jobId) +#SBATCH -p development # Queue (partition) name +#SBATCH -t 00:10:00 # Run time (hh:mm:ss) - 10 minutes +#SBATCH -n 1 # Number of tasks + +module use "/home1/03947/tg832463/modulefiles" +module load openmm + +touch test.txt + +python log_streamer.py test.txt & + +while true; do + bash generate_logs.sh + sleep 5 +done + http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/logfile_checker/test.txt ---------------------------------------------------------------------- diff --git a/modules/simstream/example/logfile_checker/test.txt b/modules/simstream/example/logfile_checker/test.txt new file mode 100755 index 0000000..2ffb48c --- /dev/null +++ b/modules/simstream/example/logfile_checker/test.txt @@ -0,0 +1,657 @@ +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 
7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 
+[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] 
Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more 
stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet 
more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff 
that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't 
need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be 
reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported 
+[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some 
non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal 
error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that 
the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/mem_streamer/README.md ---------------------------------------------------------------------- diff --git a/modules/simstream/example/mem_streamer/README.md b/modules/simstream/example/mem_streamer/README.md new file mode 100755 index 0000000..897b77a --- /dev/null +++ b/modules/simstream/example/mem_streamer/README.md @@ -0,0 +1,17 @@ +# SimStream Example: Memory Usage Streamer + +This example collects data on the memory used by the Publisher and sends that data to the Consumer. 
+ +## Instructions + +### Start the Consumer +1. Open a terminal +2. `cd path/to/simstream/example/mem_streamer` +3. `python memory_consumption.py` + +### Start the Publisher +1. Open a new terminal +2. `cd path/to/simstream/example/mem_streamer` +3. `python memory_streamer.py` + +The Consumer should receive the memory used by the Publisher (KB) and the time that the data was collected (ms since UNIX epoch) at a 2-second interval. http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/mem_streamer/memory_consumption.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/mem_streamer/memory_consumption.py b/modules/simstream/example/mem_streamer/memory_consumption.py new file mode 100755 index 0000000..b67e975 --- /dev/null +++ b/modules/simstream/example/mem_streamer/memory_consumption.py @@ -0,0 +1,83 @@ +import tornado.ioloop +import tornado.web +import tornado.websocket + +import json + +from simstream import PikaAsyncConsumer, PikaProducer + +#settings = { +# "url": "amqp://localhost:5672", +# "exchange": "simstream", +# "queue": "remote_node", +# "routing_key": "test", +# "exchange_type": "topic" +#} + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + settings["routing_key"] = "memory" + + +def print_result(body): + try: + data = json.loads(body.decode()) + print("%s: %s" % (data["x"], data["y"])) + except json.decoder.JSONDecodeError as e: + print("[ERROR] Could not decode JSON %s: %s", (body, e)) + except UnicodeError as e: + print("[ERROR] Could not decode message %s: %s" % (body, e.reason)) + +consumer = PikaAsyncConsumer(settings['url'], + settings['exchange'], + settings['queue'], + print_result, + exchange_type=settings['exchange_type'], + routing_key=settings['routing_key']) + +consumer.start() + +# class PlotHandler(tornado.web.RequestHandler): + +# def get(self): +# pass + + +# class 
StreamingHandler(tornado.websocket.WebSocketHandler): + +# def open(self): +# self.consumer = PikaAsyncConsumer(settings.url, +# settings.exchange, +# settings.queue, +# self.send_data, +# routing_keys=settings.routing_key, +# exchange_type=settings.exchange_type +# ) +# self.producer = PikaProducer("", +# remote_settings.url, +# remote_settings.exchange, +# remote_settings.queue, +# remote_settings.routing_key) + +# def on_message(self, message): +# if hasattr(self, producer) and producer is not None: +# self.producer.send_data(message) + +# def on_close(self): +# self.consumer.stop() +# self.producer.shutdown() +# self.consumer = None +# self.producer = None + +# def send_data(self, ch, method, properties, body): +# self.write_message(body) + +# if __name__ == "__main__": +# app = tornado.web.Application([ +# (r"/plot/(.*)", ) +# (r"/stream/(.*)", StreamingHandler) +# ]) +# app.listen(8888) +# tornado.ioloop.IOLoop.current().start() http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/mem_streamer/memory_streamer.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/mem_streamer/memory_streamer.py b/modules/simstream/example/mem_streamer/memory_streamer.py new file mode 100755 index 0000000..88f0d9a --- /dev/null +++ b/modules/simstream/example/mem_streamer/memory_streamer.py @@ -0,0 +1,46 @@ +import resource +import time +import json + +from simstream import SimStream, DataReporter, DataCollector + +#settings = { +# "url": "amqp://localhost:5672", +# "exchange": "simstream", +# "queue": "remote_node", +# "routing_key": "stream_sender", +# "exchange_type": "topic" +#} + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + settings["routing_key"] = "memory" + +def mem_callback(): + return {'x': time.time() * 1000, + 'y': resource.getrusage(resource.RUSAGE_SELF).ru_maxrss} + + +def mem_postprocessor(rss): + rss.y = rss.y / 1000000 + return 
rss + +mem_reporter = DataReporter() +mem_reporter.add_collector("rss", + mem_callback, + settings["url"], + settings["exchange"], + limit=100, + interval=2, + postprocessor=mem_postprocessor, + ) + +mem_reporter.start_streaming("rss", "test") + +if __name__ == "__main__": + resource_streamer = SimStream(reporters={"memory": mem_reporter}, + config=settings) + resource_streamer.setup() + resource_streamer.start() http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/README.md ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/README.md b/modules/simstream/example/openmm_example/README.md new file mode 100644 index 0000000..59a0588 --- /dev/null +++ b/modules/simstream/example/openmm_example/README.md @@ -0,0 +1,33 @@ +# SimStream Example: Simulating Alanine Dipeptide + +This example runs a simulation of the small molecule Alanine Dipeptide and streams logs and RMSD. RMSD is a metric for judging how similar two molecular states are for the same model. + +## Instructions + +### Installing OpenMM +The easiest way to install OpenMM is to use the Anaconda distribution of Python and run +`conda install -c https://conda.anaconda.org/omnia openmm` + +If you do not wish to use Anaconda, install OpenMM from source by following the instructions in the [OpenMM docs](http://docs.openmm.org/7.0.0/userguide/application.html#installing-openmm "OpenMM documentation") + +### Start the Logfile Consumer +1. Open a terminal +2. `cd path/to/simstream/examples/openmm_example` +3. `python openmm_log_consumer.py` + +### Start the RMSD Consumer +1. Open a terminal +2. `cd path/to/simstream/examples/openmm_example` +3. `python openmm_rmsd_consumer.py` + +### Starting the Producer +1. Open a new terminal +2. `cd path/to/simstream/examples/openmm_example` +3. 
`python openmm_streamer.py application/sim.out application/trajectory.dcd application/input.pdb application/input.pdb` + +### Starting the Simulation +1. Open a new terminal +2. `cd path/to/simstream/examples/openmm_example/application` +3. `python alanine_dipeptide.py > sim.out` + +The Logfile Consumer should now be printing tagged log entries to the screen; the RMSD Consumer should be printing the calculated RMSD each time the trajectory file is written. http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/application/alanine_dipeptide.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/application/alanine_dipeptide.py b/modules/simstream/example/openmm_example/application/alanine_dipeptide.py new file mode 100644 index 0000000..8b22b16 --- /dev/null +++ b/modules/simstream/example/openmm_example/application/alanine_dipeptide.py @@ -0,0 +1,55 @@ +########################################################################## +# this script was generated by openmm-builder. to customize it further, +# you can save the file to disk and edit it with your favorite editor. 
+########################################################################## + +from __future__ import print_function +from simtk.openmm import app +import simtk.openmm as mm +from simtk import unit +from sys import stdout + +print("[START] Application is now running") + +pdb = app.PDBFile('input.pdb') +print("[STATUS] Loaded model") +forcefield = app.ForceField('amber03.xml', 'amber03_obc.xml') +print("[STATUS] Loaded force field") + +system = forcefield.createSystem(pdb.topology, nonbondedMethod=app.NoCutoff, + constraints=None, rigidWater=False) +print("[STATUS] Created system") +integrator = mm.LangevinIntegrator(300*unit.kelvin, 91/unit.picoseconds, + 1.0*unit.femtoseconds) +print("[STATUS] Created integrator") + +try: + platform = mm.Platform.getPlatformByName('CPU') +except Exception as e: + print("[ERROR] Could not load platform CPU. Running Reference") + platform = mm.Platform.getPlatformByName("Reference") + +simulation = app.Simulation(pdb.topology, system, integrator, platform) +print("[STATUS] Set up compute platform") +simulation.context.setPositions(pdb.positions) +print("[STATUS] Set atomic positions") + +print('[STATUS] Minimizing...') +simulation.minimizeEnergy() +print('[STATUS] Equilibrating...') +simulation.step(100) + +simulation.reporters.append(app.DCDReporter('trajectory.dcd', 1000)) +simulation.reporters.append(app.StateDataReporter(stdout, 1000, step=True, + potentialEnergy=True, totalEnergy=True, temperature=True, separator='\t')) +print("[STATUS] Set up reporters") + +print('[STATUS] Running Production...') + +increment = 1000 + +for i in range(0,100000,increment): + print("[STATUS] Step %s" % (i)) + simulation.step(increment) + +print('[END] Done!') http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/application/input.pdb ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/application/input.pdb 
b/modules/simstream/example/openmm_example/application/input.pdb new file mode 100644 index 0000000..a47f196 --- /dev/null +++ b/modules/simstream/example/openmm_example/application/input.pdb @@ -0,0 +1,24 @@ +ATOM 1 1HH3 ACE 1 4.300 13.100 8.600 1.00 0.00 +ATOM 2 CH3 ACE 1 5.200 13.600 8.800 1.00 0.00 +ATOM 3 2HH3 ACE 1 4.900 14.300 9.600 1.00 0.00 +ATOM 4 3HH3 ACE 1 5.600 14.200 7.900 1.00 0.00 +ATOM 5 C ACE 1 6.100 12.500 9.400 1.00 0.00 +ATOM 6 O ACE 1 6.400 12.500 10.600 1.00 0.00 +ATOM 7 N ALA 2 6.600 11.600 8.500 1.00 0.00 +ATOM 8 H ALA 2 6.500 11.600 7.500 1.00 0.00 +ATOM 9 CA ALA 2 7.300 10.400 9.100 1.00 0.00 +ATOM 10 HA ALA 2 7.900 10.700 10.000 1.00 0.00 +ATOM 11 CB ALA 2 6.200 9.500 9.600 1.00 0.00 +ATOM 12 HB1 ALA 2 5.700 9.100 8.800 1.00 0.00 +ATOM 13 HB2 ALA 2 6.600 8.700 10.200 1.00 0.00 +ATOM 14 HB3 ALA 2 5.400 10.000 10.200 1.00 0.00 +ATOM 15 C ALA 2 8.400 9.800 8.200 1.00 0.00 +ATOM 16 O ALA 2 8.400 9.900 7.000 1.00 0.00 +ATOM 17 N NME 3 9.300 9.000 8.800 1.00 0.00 +ATOM 18 H NME 3 9.100 9.000 9.800 1.00 0.00 +ATOM 19 CH3 NME 3 10.500 8.400 8.300 1.00 0.00 +ATOM 20 1HH3 NME 3 10.700 7.700 9.100 1.00 0.00 +ATOM 21 2HH3 NME 3 10.400 8.000 7.300 1.00 0.00 +ATOM 22 3HH3 NME 3 11.300 9.100 8.300 1.00 0.00 +TER +ENDMDL http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/application/trajectory.dcd ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/application/trajectory.dcd b/modules/simstream/example/openmm_example/application/trajectory.dcd new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_consumer.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/openmm_consumer.py b/modules/simstream/example/openmm_example/openmm_consumer.py new file mode 100644 
index 0000000..4ba2763 --- /dev/null +++ b/modules/simstream/example/openmm_example/openmm_consumer.py @@ -0,0 +1,8 @@ +import json +from simstream import PikaAsyncConsumer + +def recv_log(body): + try: + logs = json.loads(body.decode()) + for log in logs: + print(log) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_log_consumer.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/openmm_log_consumer.py b/modules/simstream/example/openmm_example/openmm_log_consumer.py new file mode 100644 index 0000000..e28043f --- /dev/null +++ b/modules/simstream/example/openmm_example/openmm_log_consumer.py @@ -0,0 +1,32 @@ +import json +from simstream import PikaAsyncConsumer + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) +settings["routing_key"] = "openmm.log" + +def print_log_line(body): + try: + lines = json.loads(body.decode()) + if lines is not None: + for line in lines: + print(line) + except json.decoder.JSONDecodeError as e: + print("[Error]: Could not decode %s" % (body)) + except UnicodeError as e: + print("[Error]: Could not decode from bytes to string: %s" % (e.reason)) + +consumer = PikaAsyncConsumer(settings["url"], + settings["exchange"], + "openmm.log", # settings["queue"], + message_handler=print_log_line, + routing_key=settings["routing_key"], + exchange_type=settings["exchange_type"]) + +if __name__ == "__main__": + try: + consumer.start() + except KeyboardInterrupt: + consumer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_rmsd_consumer.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/openmm_rmsd_consumer.py b/modules/simstream/example/openmm_example/openmm_rmsd_consumer.py new file mode 100644 index 
0000000..f5d87c6 --- /dev/null +++ b/modules/simstream/example/openmm_example/openmm_rmsd_consumer.py @@ -0,0 +1,36 @@ +import json +from simstream import PikaAsyncConsumer + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) +settings["routing_key"] = "openmm.rmsd" + +def print_rmsd(body): + try: + lines = json.loads(body.decode()) + if lines is not None: + for line in lines: + print(line[0]) + except json.decoder.JSONDecodeError as e: + print("[Error]: Could not decode %s" % (body)) + except UnicodeError as e: + print("[Error]: Could not decode from bytes to string: %s" % (e.reason)) + except IndexError as e: + print("[Error]: List is empty") + except KeyError: + print(lines) + +consumer = PikaAsyncConsumer(settings["url"], + settings["exchange"], + "openmm.rmsd", # settings["queue"], + message_handler=print_rmsd, + routing_key=settings["routing_key"], + exchange_type=settings["exchange_type"]) + +if __name__ == "__main__": + try: + consumer.start() + except KeyboardInterrupt: + consumer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_stream.slurm ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/openmm_stream.slurm b/modules/simstream/example/openmm_example/openmm_stream.slurm new file mode 100644 index 0000000..837e4d4 --- /dev/null +++ b/modules/simstream/example/openmm_example/openmm_stream.slurm @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +#SBATCH -J remote_logger # Job name +#SBATCH -o remote_logger.o%j # Name of stdout output file(%j expands to jobId) +#SBATCH -e remote_logger.o%j # Name of stderr output file(%j expands to jobId) +#SBATCH -p development # Queue (partition) to submit to +#SBATCH -t 00:10:00 # Run time (hh:mm:ss) - 10 minutes +#SBATCH -n 1 # Number of tasks + +#module use "/home1/03947/tg832463/modulefiles" +#module load openmm + +touch test.txt + +python 
openmm_streamer.py ./application/sim.out ./application/trajectory.dcd ./application/input.pdb ./application/input.pdb & + +cd application +python alanine_dipeptide.py > sim.out +sleep 5 http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/openmm_streamer.py ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/openmm_streamer.py b/modules/simstream/example/openmm_example/openmm_streamer.py new file mode 100644 index 0000000..da95614 --- /dev/null +++ b/modules/simstream/example/openmm_example/openmm_streamer.py @@ -0,0 +1,130 @@ +from simstream import SimStream, DataReporter + +import sys, json + +class LogMonitor(object): + """ + A callable class that returns unprocessed lines in an open logfile. + + Instance Variables: + logfile -- the path to the logfile to monitor + """ + + def __init__(self, logfile): + """ + Set up a monitor for a logfile. + + Arguments: + logfile -- the path to the logfile to monitor + """ + self.logfile = logfile + self._generator = None + self._version = sys.version_info[0] + + def __call__(self): + """ + Get the next line from the logfile. + """ + if not self._generator: + self._generator = self._monitor_logfile() + + lines = [] + + line = self._next() + while line is not None: + lines.append(line) + line = self._next() + print(lines) + return lines + + def _monitor_logfile(self): + """ + Yield the next set of lines from the logfile. 
+ """ + try: + # Make the file persistent for the lifetime of the generator + with open(self.logfile) as f: + f.seek(0,2) # Move to the end of the file + while True: + # Get the next line or indicate the end of the file + line = f.readline() + if line: + yield line.strip() + else: + yield None + + except EnvironmentError as e: + # Handle I/O exceptions in an OS-agnostic way + print("Error: Could not open file %s: %s" % (self.logfile, e)) + + def _next(self): + """ + Python 2/3 agnostic retrieval of generator values. + """ + return self._generator.__next__() if self._version == 3 else self._generator.next() + + +def get_relevant_log_lines(log_lines): + import re + relevant_lines = [] + pattern = r'^\[.+\]' + for line in log_lines: + if re.match(pattern, line) is not None: + relevant_lines.append(line) + return relevant_lines + + +def calculate_rmsd(trajectory, topology, reference): + import mdtraj + traj = mdtraj.load(trajectory, top=topology) + ref = mdtraj.load(reference) + rmsd = mdtraj.rmsd(traj, ref) + data = {"step": str(traj.n_frames), "rmsd": str(rmsd[-1])} + return data + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + + +if __name__ == "__main__": + logfile = sys.argv[1] + trajectory = sys.argv[2] + topology = sys.argv[3] + reference = sys.argv[4] + + open(logfile, 'a').close() + open(trajectory, 'a').close() + + log_reporter = DataReporter() + log_reporter.add_collector("logger", + LogMonitor(logfile), + settings["url"], + settings["exchange"], + limit=10, + interval=2, + exchange_type="direct", # settings["exchange_type"], + postprocessor=get_relevant_log_lines) + + log_reporter.start_streaming("logger", "openmm.log") + + rmsd_reporter = DataReporter() + rmsd_reporter.add_collector("rmsd", + calculate_rmsd, + settings["url"], + settings["exchange"], + limit=1, + interval=2, + exchange_type="direct", # settings["exchange_type"], + callback_args=[trajectory, topology, reference]) + + 
rmsd_reporter.start_streaming("rmsd", "openmm.rmsd") + + streamer = SimStream(config=settings, reporters={"log_reporter": log_reporter, "rmsd_reporter": rmsd_reporter}) + streamer.setup() + + try: + streamer.start() + except KeyboardInterrupt: + streamer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/openmm_example/test.txt ---------------------------------------------------------------------- diff --git a/modules/simstream/example/openmm_example/test.txt b/modules/simstream/example/openmm_example/test.txt new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/example/settings.json ---------------------------------------------------------------------- diff --git a/modules/simstream/example/settings.json b/modules/simstream/example/settings.json new file mode 100644 index 0000000..d354d46 --- /dev/null +++ b/modules/simstream/example/settings.json @@ -0,0 +1,6 @@ +{ + "url": "amqp://guest:guest@localhost:5672", + "exchange": "simstream", + "queue": "test", + "exchange_type": "topic" +} http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/setup.py ---------------------------------------------------------------------- diff --git a/modules/simstream/setup.py b/modules/simstream/setup.py new file mode 100755 index 0000000..2f3b3fd --- /dev/null +++ b/modules/simstream/setup.py @@ -0,0 +1,19 @@ +""" + Setup for simstream module. 
+ + Author: Jeff Kinnison (jkinn...@nd.edu) +""" + +from setuptools import setup, find_packages + +setup( + name="simstream", + version="0.1dev", + author="Jeff Kinnison", + author_email="jkinn...@nd.edu", + packages=find_packages(), + description="", + install_requires=[ + "pika >= 0.10.0" + ], +) http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/__init__.py ---------------------------------------------------------------------- diff --git a/modules/simstream/simstream/__init__.py b/modules/simstream/simstream/__init__.py new file mode 100755 index 0000000..9d403cb --- /dev/null +++ b/modules/simstream/simstream/__init__.py @@ -0,0 +1,11 @@ +""" +Utilties for collecting and distributing system data. + +Author: Jeff Kinnison (jkinn...@nd.edu) +""" + +from .simstream import SimStream +from .datareporter import DataReporter, CollectorExistsException, CollectorDoesNotExistException +from .datacollector import DataCollector +from .pikaasyncconsumer import PikaAsyncConsumer +from .pikaproducer import PikaProducer http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/datacollector.py ---------------------------------------------------------------------- diff --git a/modules/simstream/simstream/datacollector.py b/modules/simstream/simstream/datacollector.py new file mode 100755 index 0000000..f7f99c1 --- /dev/null +++ b/modules/simstream/simstream/datacollector.py @@ -0,0 +1,110 @@ +""" +Utilties for collecting system data. + +Author: Jeff Kinnison (jkinn...@nd.edu) +""" + +from .pikaproducer import PikaProducer + +from threading import Thread, Lock, Event + +import copy + +# TODO: Refactor into subclass of Thread + +class DataCollector(Thread): + """Collects data by running user-specified routines. 
+ + Inherits from: threading.Thread + + Instance variables: + name -- the name of the collector + limit -- the maximum number of maintained data points + interval -- the interval (in seconds) at which data collection is performed + + Public methods: + activate -- start collecting data + add_routing_key -- add a new streaming endpoint + deactivate -- stop further data collection + remove_routing_key -- remove a streaming endpoint + run -- collect data if active + """ + def __init__(self, name, callback, rabbitmq_url, exchange, exchange_type="direct", limit=250, interval=10, + postprocessor=None, callback_args=[], postprocessor_args=[]): + """ + Arguments: + name -- the name of the collector + callback -- the data collection function to run + + Keyword arguments: + limit -- the maximum number of maintained data points (default 250) + interval -- the time interval in seconds at which to collect data + (default: 10) + postprocessor -- a function to run on the return value of callback + (default None) + callback_args -- the list of arguments to pass to the callback + (default []) + postprocessor_args -- the list of arguments to pass to the + postprocessor (default []) + """ + super(DataCollector, self).__init__() + self.name = name if name else "Unknown Resource" + self.limit = limit + self.interval = interval + self._callback = callback + self._callback_args = callback_args + self._postprocessor = postprocessor + self._postprocessor_args = postprocessor_args + self._data = [] + self._data_lock = Lock() + self._active = False + self._producer = PikaProducer(rabbitmq_url, exchange, exchange_type=exchange_type, routing_keys=[]) + + def activate(self): + """ + Start collecting data. + """ + self._active = True + + def add_routing_key(self, key): + """ + Add a new producer endpoint. + """ + self._producer.add_routing_key(key) + + + def deactivate(self): + """ + Stop collecting data. 
+ """ + self._active = False + + def remove_routing_key(self, key): + self._producer.remove_routing_key(key) + if len(self._producer.endpoints) == 0: + self._producer.shutdown() + + def run(self): + """ + Run the callback and postprocessing subroutines and record result. + + Catches generic exceptions because the function being run is not + known beforehand. + """ + self._collection_event = Event() + while self._active and not self._collection_event.wait(timeout=self.interval): + try: + result = self._callback(*self._callback_args) + result = self._postprocessor(result, *self._postprocessor_args) if self._postprocessor else result + #print("Found the value ", result, " in ", self.name) + self._data.append(result) + if len(self._data) > self.limit: + self._data.pop(0) + self._producer(copy.copy(self._data)) + + except Exception as e: + print("[ERROR] %s" % (e)) + + def stop(self): + for key in self.producer.routing_keys: + self.remove_routing_key(key) http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/datareporter.py ---------------------------------------------------------------------- diff --git a/modules/simstream/simstream/datareporter.py b/modules/simstream/simstream/datareporter.py new file mode 100755 index 0000000..156cc08 --- /dev/null +++ b/modules/simstream/simstream/datareporter.py @@ -0,0 +1,169 @@ +""" +Utilties for collecting system data. + +Author: Jeff Kinnison (jkinn...@nd.edu) +""" + +# TODO: Refactor to iterate over producers, not collectors. Collectors should +# execute concurrently. 
# TODO: Add method to deactivate reporter

from threading import Thread, Event

from .datacollector import DataCollector


class CollectorExistsException(Exception):
    """Thrown when attempting to add a collector with a conflicting name."""
    pass


class CollectorDoesNotExistException(Exception):
    """Thrown when attempting to access a collector that does not exist."""
    pass


class DataReporter(object):
    """Manages a named set of DataCollectors.

    This is a plain object (it does not subclass threading.Thread); each
    registered collector is activated/started and deactivated/joined
    individually through the methods below.

    Instance variables:
    collectors -- a dict of DataCollectors indexed by name

    Public methods:
    add_collector -- create and register a new DataCollector
    start_collecting -- begin data collection for all collectors
    start_collector -- begin data collection for a specific collector
    stop_collecting -- stop all data collection
    stop_collector -- stop a running DataCollector
    start_streaming -- add a routing key to a collector
    stop_streaming -- placeholder, currently does nothing
    """

    def __init__(self, collectors=None):
        """Create a reporter, optionally seeded with existing collectors.

        Keyword arguments:
        collectors -- a dict mapping names to collector-like objects, or an
                      iterable of (name, collector) pairs (default None,
                      meaning no collectors)

        Raises:
        CollectorExistsException if the same name appears more than once
        """
        super(DataReporter, self).__init__()
        self.collectors = {}
        if not collectors:
            return

        # A dict must be walked through items(); iterating it directly only
        # yields keys, which cannot be unpacked into (name, collector).
        if hasattr(collectors, "items"):
            pairs = collectors.items()
        else:
            pairs = collectors

        for key, value in pairs:
            # Pass everything after the required arguments by keyword so the
            # values cannot land in the wrong add_collector parameter.
            # NOTE(review): the attribute names read from `value` are the ones
            # the original code used; confirm they match DataCollector.
            self.add_collector(
                key,
                value.callback,
                value.url,
                value.exchange,
                limit=value.limit,
                postprocessor=value.postprocessor,
                callback_args=value.callback_args,
                postprocessor_args=value.postprocessor_args
            )

    def add_collector(self, name, callback, rabbitmq_url, exchange, limit=250, interval=10, postprocessor=None,
                      exchange_type="direct", callback_args=None, postprocessor_args=None):
        """Add a new collector.

        Arguments:
        name -- name of the new DataCollector
        callback -- the data collection callback to run
        rabbitmq_url -- forwarded to DataCollector as its RabbitMQ URL
        exchange -- forwarded to DataCollector as its exchange

        Keyword arguments:
        limit -- the number of data points to store (default 250)
        interval -- forwarded to DataCollector as interval (default 10)
        postprocessor -- a postprocessing function to run on each data point
                         (default None)
        exchange_type -- forwarded to DataCollector (default "direct")
        callback_args -- a list of arguments to pass to the callback
                         (default None, meaning a new empty list)
        postprocessor_args -- a list of arguments to pass to the postprocessor
                              (default None, meaning a new empty list)

        Raises:
        CollectorExistsException if a collector named name already exists
        """
        if name in self.collectors:
            raise CollectorExistsException

        # Fresh lists per call: a shared `[]` default would leak state between
        # collectors if any of them mutated its argument list.
        if callback_args is None:
            callback_args = []
        if postprocessor_args is None:
            postprocessor_args = []

        self.collectors[name] = DataCollector(
            name,
            callback,
            rabbitmq_url,
            exchange,
            limit=limit,
            interval=interval,
            postprocessor=postprocessor,
            exchange_type=exchange_type,
            callback_args=callback_args,
            postprocessor_args=postprocessor_args
        )

    def start_collecting(self):
        """
        Start data collection for all associated collectors.
        """
        for collector in self.collectors:
            self.start_collector(collector)

    def start_collector(self, name):
        """
        Activate the specified collector.

        Calls activate() and then start() on the collector. A RuntimeError
        raised by either call is caught and printed, not propagated.

        Arguments:
        name -- the name of the collector to start

        Raises:
        KeyError if no collector named name exists (the lookup is unguarded)
        """
        try:
            self.collectors[name].activate()
            self.collectors[name].start()
        except RuntimeError as e:
            print("Error starting collector ", name)
            print(e)

    def stop_collecting(self):
        """
        Stop all collectors.
        """
        for collector in self.collectors:
            self.stop_collector(collector)

    def stop_collector(self, name):
        """Deactivate the specified collector.

        Calls deactivate() and then join() on the collector. A RuntimeError
        raised by either call is caught and printed, not propagated.

        Arguments:
        name -- the name of the collector to stop

        Raises:
        CollectorDoesNotExistException if no collector named name exists
        """
        if name not in self.collectors:
            raise CollectorDoesNotExistException

        try:
            self.collectors[name].deactivate()
            self.collectors[name].join()
        except RuntimeError as e:  # Catch deadlock
            print(e)

    def start_streaming(self, collector_name, routing_key):
        """
        Begin streaming data from a collector to a particular recipient.

        Arguments:
        collector_name -- the name of the collector to stream from
        routing_key -- the routing key to reach the intended recipient

        Raises:
        CollectorDoesNotExistException if no collector named collector_name
        exists
        """
        if collector_name not in self.collectors:  # Make sure collector exists
            raise CollectorDoesNotExistException
        self.collectors[collector_name].add_routing_key(routing_key)

    def stop_streaming(self, collector_name, routing_key):
        """
        Stop a particular stream.

        Not implemented yet: this method currently does nothing and raises
        nothing.

        Arguments:
        collector_name -- the collector associated with the stream to stop
        routing_key -- the routing key to reach the intended recipient
        """
        # TODO: remove routing_key from the named collector once DataCollector
        # exposes a way to do so.
        pass


# ----------------------------------------------------------------------
# http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/eventhandler.py
# diff --git a/modules/simstream/simstream/eventhandler.py b/modules/simstream/simstream/eventhandler.py
# new file mode 100755
# index 0000000..9f4f3f2
# --- /dev/null
# +++ b/modules/simstream/simstream/eventhandler.py
# @@ -0,0 +1,17 @@
#
# A utility for responding to user-defined events.
# Author: Jeff Kinnison (jkinniso)


class EventHandler(object):
    """A named wrapper around a handler callable and its arguments.

    Instance variables:
    name -- the name this handler is registered under
    """
    def __init__(self, name, handler, handler_args=None):
        """Store the handler and the arguments it will be called with.

        Arguments:
        name -- the name of this handler
        handler -- the callable to invoke when the handler is called

        Keyword arguments:
        handler_args -- positional arguments to pass to handler
                        (default None, meaning no arguments)
        """
        self.name = name
        self._handler = handler
        # Copy into a fresh list so instances never share a default and later
        # changes to the caller's sequence do not alter this handler.
        self._handler_args = [] if handler_args is None else list(handler_args)

    def __call__(self):
        """Invoke the stored handler with the stored arguments.

        Returns whatever the handler returns.
        """
        return self._handler(*self._handler_args)


# ----------------------------------------------------------------------
# http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/eventmonitor.py
# diff --git a/modules/simstream/simstream/eventmonitor.py b/modules/simstream/simstream/eventmonitor.py
# new file mode 100755
# index 0000000..d8c79f4
# --- /dev/null
# +++ b/modules/simstream/simstream/eventmonitor.py
# @@ -0,0 +1,46 @@
#
# Utility for monitoring collected data.
#
# Author: Jeff Kinnison (jkinn...@nd.edu)

# TODO: Add method to add handlers
# TODO: Add method to create PikaProducer
# TODO: Add method to use PikaProducer to respond to events
# TODO: Add method to deactivate monitor


class EventCheckerNotCallableException(Exception):
    """Raised when an EventMonitor's event check is not callable."""
    pass


class EventHandlerNotCallableException(Exception):
    """Raised when a registered handler is not callable."""
    pass


class EventHandlerDoesNotExistException(Exception):
    """Raised when the event check names a handler that is not registered."""
    pass


class EventMonitor(object):
    """Checks data for user-defined bounds violations.

    Instance variables:
    handlers -- a dict of EventHandler objects indexed by name
    """
    def __init__(self, event_check, handlers=None):
        """Store the event check and the handlers it can trigger.

        Arguments:
        event_check -- a callable that takes a value and returns an iterable
                       of handler names to run

        Keyword arguments:
        handlers -- a dict of callables indexed by name (default None,
                    meaning a new empty dict)
        """
        self._event_check = event_check
        # A dict passed by the caller is kept by reference; only the default
        # is replaced so that monitors do not share one module-level dict.
        self.handlers = {} if handlers is None else handlers

    def __call__(self, val):
        """Run the event check on val and call every handler it names.

        Arguments:
        val -- the value to check

        Raises:
        EventCheckerNotCallableException if the event check is not callable
        EventHandlerDoesNotExistException if a named handler is not registered
        EventHandlerNotCallableException if a named handler is not callable
        """
        if not callable(self._event_check):
            raise EventCheckerNotCallableException
        self._run_handler(self._event_check(val))

    def _run_handler(self, handler_names):
        """Call each named handler in order, validating it first."""
        for name in handler_names:
            if name not in self.handlers:
                raise EventHandlerDoesNotExistException
            if not callable(self.handlers[name]):
                raise EventHandlerNotCallableException
            self.handlers[name]()


# ----------------------------------------------------------------------
# http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/pikaasyncconsumer.py
# diff --git a/modules/simstream/simstream/pikaasyncconsumer.py b/modules/simstream/simstream/pikaasyncconsumer.py
# new file mode 100755
# index 0000000..1c58687
# --- /dev/null
# +++ b/modules/simstream/simstream/pikaasyncconsumer.py
# @@ -0,0 +1,203 @@
#
# Streaming utility for system and simulation data.
#
# author: Jeff Kinnison (jkinn...@nd.edu)

import json
import pika


class PikaAsyncConsumer(object):
    """
    The primary entry point for routing incoming messages to the proper handler.
    """

    def __init__(self, rabbitmq_url, exchange_name, queue_name, message_handler,
                 exchange_type="direct", routing_key="#"):
        """
        Create a new instance of Streamer.

        Arguments:
        rabbitmq_url -- URL to RabbitMQ server
        exchange_name -- name of RabbitMQ exchange to join
        queue_name -- name of RabbitMQ queue to join
        message_handler -- callable invoked with the body of each received
                           message

        Keyword Arguments:
        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
                         (default 'direct')
        routing_key -- the routing key that this consumer listens for
                       (default '#', receives all messages)
        """
        self._connection = None
        self._channel = None
        self._shut_down = False
        self._consumer_tag = None
        self._url = rabbitmq_url
        self._message_handler = message_handler

        # The following are necessary to guarantee that both the RabbitMQ
        # server and Streamer know where to look for messages. These names will
        # be decided before dispatch and should be recorded in a config file or
        # else on a per-job basis.
        self._exchange = exchange_name
        self._exchange_type = exchange_type
        self._queue = queue_name
        self._routing_key = routing_key

    def connect(self):
        """
        Create an asynchronous connection to the RabbitMQ server at URL.

        Returns the new pika.SelectConnection; the caller stores it.
        """
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     on_open_callback=self.on_connection_open,
                                     on_close_callback=self.on_connection_close,
                                     stop_ioloop_on_close=False)

    def on_connection_open(self, unused_connection):
        """
        Actions to perform when the connection opens. This may not happen
        immediately, so defer action to this callback.

        Arguments:
        unused_connection -- the created connection (by this point already
                             available as self._connection)
        """
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_close(self, connection, code, text):
        """
        Actions to perform when the connection is closed. If the consumer was
        shut down deliberately the ioloop is stopped; otherwise a reconnect is
        scheduled after 5 seconds.

        Arguments:
        connection -- the connection that was closed (same as self._connection)
        code -- response code from the RabbitMQ server
        text -- response body from the RabbitMQ server
        """
        self._channel = None
        if self._shut_down:
            self._connection.ioloop.stop()
        else:
            self._connection.add_timeout(5, self.reconnect)

    def reconnect(self):
        """
        Attempt to reestablish a connection with the RabbitMQ server.
        """
        self._connection.ioloop.stop()  # Stop the ioloop to completely close

        if not self._shut_down:  # Connect and restart the ioloop
            self._connection = self.connect()
            self._connection.ioloop.start()

    def on_channel_open(self, channel):
        """
        Store the opened channel for future use and set up the exchange and
        queue to be used.

        Arguments:
        channel -- the Channel instance opened by the Channel.Open RPC
        """
        self._channel = channel
        self._channel.add_on_close_callback(self.on_channel_close)
        self.declare_exchange()

    def on_channel_close(self, channel, code, text):
        """
        Actions to perform when the channel is closed: close the connection.

        Arguments:
        channel -- the channel that was closed
        code -- response code from the RabbitMQ server
        text -- response body from the RabbitMQ server
        """
        self._connection.close()

    def declare_exchange(self):
        """
        Set up the exchange that will route messages to this consumer. Each
        RabbitMQ exchange is uniquely identified by its name, so it does not
        matter if the exchange has already been declared.
        """
        self._channel.exchange_declare(self.declare_exchange_success,
                                       self._exchange,
                                       self._exchange_type)

    def declare_exchange_success(self, unused_connection):
        """
        Actions to perform on successful exchange declaration.

        Arguments:
        unused_connection -- the value passed to the callback (ignored)
        """
        self.declare_queue()

    def declare_queue(self):
        """
        Set up the queue that will route messages to this consumer. Each
        RabbitMQ queue can be defined with routing keys to use only one
        queue for multiple jobs.
        """
        self._channel.queue_declare(self.declare_queue_success,
                                    self._queue)

    def declare_queue_success(self, method_frame):
        """
        Actions to perform on successful queue declaration: bind the queue to
        the exchange with this consumer's routing key.

        Arguments:
        method_frame -- the value passed to the callback (ignored)
        """
        self._channel.queue_bind(self.munch,
                                 self._queue,
                                 self._exchange,
                                 self._routing_key
                                 )

    def munch(self, unused):
        """
        Begin consuming messages from the Airavata API server.

        Arguments:
        unused -- the value passed to the queue_bind callback (ignored)
        """
        self._channel.add_on_cancel_callback(self.cancel_channel)
        # Name the queue explicitly instead of relying on the broker's
        # "most recently declared queue on this channel" fallback.
        # NOTE(review): the basic_ack in _process_message is commented out and
        # no_ack is not set here, so deliveries are presumably never
        # acknowledged -- confirm that is intended.
        self._consumer_tag = self._channel.basic_consume(self._process_message,
                                                         queue=self._queue)

    def cancel_channel(self, method_frame):
        """
        Close the channel when the server cancels the consumer.

        Arguments:
        method_frame -- the value passed to the cancel callback (ignored)
        """
        if self._channel is not None:
            self._channel.close()

    def _process_message(self, ch, method, properties, body):
        """
        Receive a message, print it, and pass its body to the message handler.

        Arguments:
        ch -- the channel that routed the message
        method -- delivery information
        properties -- message properties
        body -- the message
        """
        print("Received Message: %s" % body)
        self._message_handler(body)
        #self._channel.basic_ack(delivery_tag=method.delivery_tag)

    def stop_consuming(self):
        """
        Stop the consumer if active.
        """
        if self._channel:
            self._channel.basic_cancel(self.close_channel, self._consumer_tag)

    def close_channel(self, unused_frame=None):
        """
        Close the channel to shut down the consumer and connection.

        Keyword arguments:
        unused_frame -- the value passed when this runs as the basic_cancel
                        callback (ignored; default None so direct calls with
                        no argument keep working)
        """
        self._channel.close()

    def start(self):
        """
        Start a connection with the RabbitMQ server.
        """
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """
        Stop an active connection with the RabbitMQ server.
        """
        # on_connection_close and reconnect both test _shut_down; setting any
        # other flag would make a deliberate stop look like a dropped
        # connection and trigger a reconnect.
        self._shut_down = True
        self.stop_consuming()