diff --git a/aria/from_cloudify/workflows/ 
new file mode 100644
index 0000000..b8faa1b
--- /dev/null
+++ b/aria/from_cloudify/workflows/
@@ -0,0 +1,197 @@
+# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    * See the License for the specific language governing permissions and
+#    * limitations under the License.
+from cloudify import logs
+from cloudify.exceptions import OperationRetry
+from cloudify.workflows import tasks as tasks_api
+class Monitor(object):
+    """Monitor with handlers for different celery events"""
+    def __init__(self, tasks_graph):
+        """
+        :param tasks_graph: The task graph. Used to extract tasks based on the
+                            events task id.
+        """
+        self.tasks_graph = tasks_graph
+        self._receiver = None
+        self._should_stop = False
+    def task_sent(self, event):
+        pass
+    def task_received(self, event):
+        pass
+    def task_started(self, event):
+        self._handle(tasks_api.TASK_STARTED, event, send_event=True)
+    def task_succeeded(self, event):
+        self._handle(tasks_api.TASK_SUCCEEDED, event, send_event=True)
+    def task_failed(self, event):
+        if event.get('exception', '').startswith(OperationRetry.__name__):
+            state = tasks_api.TASK_RESCHEDULED
+        else:
+            state = tasks_api.TASK_FAILED
+        self._handle(state, event, send_event=False)
+    def task_revoked(self, event):
+        pass
+    def task_retried(self, event):
+        pass
+    def _handle(self, state, event, send_event):
+        task_id = event['uuid']
+        task = self.tasks_graph.get_task(task_id)
+        if task is not None:
+            if send_event:
+                send_task_event(state, task, send_task_event_func_remote,
+                                event)
+            task.set_state(state)
+    def capture(self):
+        # Only called when running within an agent, so import here
+        from import app
+        with app.connection() as connection:
+            self._receiver =, handlers={
+                'task-sent': self.task_sent,
+                'task-received': self.task_received,
+                'task-started': self.task_started,
+                'task-succeeded': self.task_succeeded,
+                'task-failed': self.task_failed,
+                'task-revoked': self.task_revoked,
+                'task-retried': self.task_retried
+            })
+            for _ in self._receiver.itercapture(limit=None,
+                                                timeout=None,
+                                                wakeup=True):
+                if self._should_stop:
+                    return
+    def stop(self):
+        self._should_stop = True
+        self._receiver.should_stop = True
+def send_task_event_func_remote(task, event_type, message,
+                                additional_context=None):
+    _send_task_event_func(task, event_type, message,
+                          out_func=logs.amqp_event_out,
+                          additional_context=additional_context)
+def send_task_event_func_local(task, event_type, message,
+                               additional_context=None):
+    _send_task_event_func(task, event_type, message,
+                          out_func=logs.stdout_event_out,
+                          additional_context=additional_context)
+def _send_task_event_func(task, event_type, message, out_func,
+                          additional_context):
+    if task.cloudify_context is None:
+        logs.send_workflow_event(ctx=task.workflow_context,
+                                 event_type=event_type,
+                                 message=message,
+                                 out_func=out_func,
+                                 additional_context=additional_context)
+    else:
+        logs.send_task_event(cloudify_context=task.cloudify_context,
+                             event_type=event_type,
+                             message=message,
+                             out_func=out_func,
+                             additional_context=additional_context)
+def _filter_task(task, state):
+    return state != tasks_api.TASK_FAILED and not task.send_task_events
+def send_task_event(state, task, send_event_func, event):
+    """
+    Send a task event delegating to 'send_event_func'
+    which will send events to RabbitMQ or use the workflow context logger
+    in local context
+    :param state: the task state (valid: ['sending', 'started', 'rescheduled',
+                  'succeeded', 'failed'])
+    :param task: a WorkflowTask instance to send the event for
+    :param send_event_func: function for actually sending the event somewhere
+    :param event: a dict with either a result field or an exception fields
+                  follows celery event structure but used by local tasks as
+                  well
+    """
+    if _filter_task(task, state):
+        return
+    if state in (tasks_api.TASK_FAILED, tasks_api.TASK_RESCHEDULED,
+                 tasks_api.TASK_SUCCEEDED) and event is None:
+        raise RuntimeError('Event for task {0} is None'.format(
+    if event and event.get('exception'):
+        exception_str = str(event.get('exception'))
+    else:
+        exception_str = None
+    if state == tasks_api.TASK_SENDING:
+        message = "Sending task '{0}'".format(
+        event_type = 'sending_task'
+    elif state == tasks_api.TASK_STARTED:
+        message = "Task started '{0}'".format(
+        event_type = 'task_started'
+    elif state == tasks_api.TASK_SUCCEEDED:
+        result = str(event.get('result'))
+        suffix = ' ({0})'.format(result) if result not in ("'None'",
+                                                           'None') else ''
+        message = "Task succeeded '{0}{1}'".format(, suffix)
+        event_type = 'task_succeeded'
+    elif state == tasks_api.TASK_RESCHEDULED:
+        message = "Task rescheduled '{0}'".format(
+        if exception_str:
+            message = '{0} -> {1}'.format(message, exception_str)
+        event_type = 'task_rescheduled'
+        task.error = exception_str
+    elif state == tasks_api.TASK_FAILED:
+        message = "Task failed '{0}'".format(
+        if exception_str:
+            message = "{0} -> {1}".format(message, exception_str)
+        event_type = 'task_failed'
+        task.error = exception_str
+    else:
+        raise RuntimeError('unhandled event type: {0}'.format(state))
+    if task.current_retries > 0:
+        retry = ' [retry {0}{1}]'.format(
+            task.current_retries,
+            '/{0}'.format(task.total_retries)
+            if task.total_retries >= 0 else '')
+        message = '{0}{1}'.format(message, retry)
+    additional_context = {
+        'task_current_retries': task.current_retries,
+        'task_total_retries': task.total_retries
+    }
+    if state in (tasks_api.TASK_FAILED, tasks_api.TASK_RESCHEDULED):
+        additional_context['task_error_causes'] = event.get('causes')
+    send_event_func(task=task,
+                    event_type=event_type,
+                    message=message,
+                    additional_context=additional_context)
diff --git a/aria/from_cloudify/workflows/ 
new file mode 100644
index 0000000..6293edc
--- /dev/null
+++ b/aria/from_cloudify/workflows/
@@ -0,0 +1,598 @@
+# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    * See the License for the specific language governing permissions and
+#    * limitations under the License.
+import os
+import tempfile
+import copy
+import importlib
+import shutil
+import uuid
+import json
+import threading
+from StringIO import StringIO
+from contextlib import contextmanager
+from cloudify_rest_client.nodes import Node
+from cloudify_rest_client.node_instances import NodeInstance
+from cloudify import dispatch
+from cloudify.workflows.workflow_context import (
+    from dsl_parser.constants import HOST_TYPE
+    from dsl_parser import parser as dsl_parser, tasks as dsl_tasks
+    from dsl_parser import functions as dsl_functions
+    _import_error = None
+except ImportError as e:
+    _import_error = str(e)
+    dsl_parser = None
+    dsl_tasks = None
+    dsl_functions = None
+    HOST_TYPE = None
+class _Environment(object):
+    def __init__(self,
+                 storage,
+                 blueprint_path=None,
+                 name='local',
+                 inputs=None,
+                 load_existing=False,
+                 ignored_modules=None,
+                 provider_context=None,
+                 resolver=None,
+                 validate_version=True):
+ = storage
+ = self
+        if load_existing:
+        else:
+            plan, nodes, node_instances = _parse_plan(blueprint_path,
+                                                      inputs,
+                                                      ignored_modules,
+                                                      resolver,
+                                                      validate_version)
+            storage.init(
+                name=name,
+                plan=plan,
+                nodes=nodes,
+                node_instances=node_instances,
+                blueprint_path=blueprint_path,
+                provider_context=provider_context)
+    @property
+    def plan(self):
+        return
+    @property
+    def name(self):
+        return
+    def outputs(self):
+        return dsl_functions.evaluate_outputs(
+            outputs_def=self.plan['outputs'],
+  ,
+  ,
+    def evaluate_functions(self, payload, context):
+        return dsl_functions.evaluate_functions(
+            payload=payload,
+            context=context,
+  ,
+  ,
+    def execute(self,
+                workflow,
+                parameters=None,
+                allow_custom_parameters=False,
+                task_retries=-1,
+                task_retry_interval=30,
+                subgraph_retries=0,
+                task_thread_pool_size=DEFAULT_LOCAL_TASK_THREAD_POOL_SIZE):
+        workflows = self.plan['workflows']
+        workflow_name = workflow
+        if workflow_name not in workflows:
+            raise ValueError("'{0}' workflow does not exist. "
+                             "existing workflows are: {1}"
+                             .format(workflow_name,
+                                     workflows.keys()))
+        workflow = workflows[workflow_name]
+        execution_id = str(uuid.uuid4())
+        ctx = {
+            'type': 'workflow',
+            'local': True,
+            'deployment_id':,
+            'blueprint_id':,
+            'execution_id': execution_id,
+            'workflow_id': workflow_name,
+            'storage':,
+            'task_retries': task_retries,
+            'task_retry_interval': task_retry_interval,
+            'subgraph_retries': subgraph_retries,
+            'local_task_thread_pool_size': task_thread_pool_size,
+            'task_name': workflow['operation']
+        }
+        merged_parameters = _merge_and_validate_execution_parameters(
+            workflow, workflow_name, parameters, allow_custom_parameters)
+        return dispatch.dispatch(__cloudify_context=ctx, **merged_parameters)
+def init_env(blueprint_path,
+             name='local',
+             inputs=None,
+             storage=None,
+             ignored_modules=None,
+             provider_context=None,
+             resolver=None,
+             validate_version=True):
+    if storage is None:
+        storage = InMemoryStorage()
+    return _Environment(storage=storage,
+                        blueprint_path=blueprint_path,
+                        name=name,
+                        inputs=inputs,
+                        load_existing=False,
+                        ignored_modules=ignored_modules,
+                        provider_context=provider_context,
+                        resolver=resolver,
+                        validate_version=validate_version)
+def load_env(name, storage, resolver=None):
+    return _Environment(storage=storage,
+                        name=name,
+                        load_existing=True,
+                        resolver=resolver)
+def _parse_plan(blueprint_path, inputs, ignored_modules, resolver,
+                validate_version):
+    if dsl_parser is None:
+        raise ImportError('cloudify-dsl-parser must be installed to '
+                          'execute local workflows. '
+                          '(e.g. "pip install cloudify-dsl-parser") [{0}]'
+                          .format(_import_error))
+    plan = dsl_tasks.prepare_deployment_plan(
+        dsl_parser.parse_from_path(
+            dsl_file_path=blueprint_path,
+            resolver=resolver,
+            validate_version=validate_version),
+        inputs=inputs)
+    nodes = [Node(node) for node in plan['nodes']]
+    node_instances = [NodeInstance(instance)
+                      for instance in plan['node_instances']]
+    _prepare_nodes_and_instances(nodes, node_instances, ignored_modules)
+    return plan, nodes, node_instances
+def _validate_node(node):
+    if HOST_TYPE in node['type_hierarchy']:
+        install_agent_prop ='install_agent')
+        if install_agent_prop:
+            raise ValueError("'install_agent': true is not supported "
+                             "(it is True by default) "
+                             "when executing local workflows. "
+                             "The 'install_agent' property "
+                             "must be set to false for each node of type {0}."
+                             .format(HOST_TYPE))
+def _prepare_nodes_and_instances(nodes, node_instances, ignored_modules):
+    def scan(parent, name, node):
+        for operation in parent.get(name, {}).values():
+            if not operation['operation']:
+                continue
+            _get_module_method(operation['operation'],
+                               tpe=name,
+                     ,
+                               ignored_modules=ignored_modules)
+    for node in nodes:
+        scalable = node['capabilities']['scalable']['properties']
+        node.update(dict(
+            number_of_instances=scalable['current_instances'],
+            deploy_number_of_instances=scalable['default_instances'],
+            min_number_of_instances=scalable['min_instances'],
+            max_number_of_instances=scalable['max_instances'],
+        ))
+        if 'relationships' not in node:
+            node['relationships'] = []
+        scan(node, 'operations', node)
+        _validate_node(node)
+        for relationship in node['relationships']:
+            scan(relationship, 'source_operations', node)
+            scan(relationship, 'target_operations', node)
+    for node_instance in node_instances:
+        node_instance['version'] = 0
+        node_instance['runtime_properties'] = {}
+        node_instance['node_id'] = node_instance['name']
+        if 'relationships' not in node_instance:
+            node_instance['relationships'] = []
+def _get_module_method(module_method_path, tpe, node_name,
+                       ignored_modules=None):
+    ignored_modules = ignored_modules or []
+    split = module_method_path.split('.')
+    module_name = '.'.join(split[:-1])
+    if module_name in ignored_modules:
+        return None
+    method_name = split[-1]
+    try:
+        module = importlib.import_module(module_name)
+    except ImportError:
+        raise ImportError('mapping error: No module named {0} '
+                          '[node={1}, type={2}]'
+                          .format(module_name, node_name, tpe))
+    try:
+        return getattr(module, method_name)
+    except AttributeError:
+        raise AttributeError("mapping error: {0} has no attribute '{1}' "
+                             "[node={2}, type={3}]"
+                             .format(module.__name__, method_name,
+                                     node_name, tpe))
+def _try_convert_from_str(string, target_type):
+    if target_type == basestring:
+        return string
+    if target_type == bool:
+        if string.lower() == 'true':
+            return True
+        if string.lower() == 'false':
+            return False
+        return string
+    try:
+        return target_type(string)
+    except ValueError:
+        return string
+def _merge_and_validate_execution_parameters(
+        workflow, workflow_name, execution_parameters=None,
+        allow_custom_parameters=False):
+    merged_parameters = {}
+    workflow_parameters = workflow.get('parameters', {})
+    execution_parameters = execution_parameters or {}
+    missing_mandatory_parameters = set()
+    allowed_types = {
+        'integer': int,
+        'float': float,
+        'string': basestring,
+        'boolean': bool
+    }
+    wrong_types = {}
+    for name, param in workflow_parameters.iteritems():
+        if 'type' in param and name in execution_parameters:
+            # check if need to convert from string
+            if (isinstance(execution_parameters[name], basestring) and
+                    param['type'] in allowed_types):
+                execution_parameters[name] = \
+                    _try_convert_from_str(
+                        execution_parameters[name],
+                        allowed_types[param['type']])
+            # validate type
+            if not isinstance(execution_parameters[name],
+                              allowed_types.get(param['type'], object)):
+                wrong_types[name] = param['type']
+        if 'default' not in param:
+            if name not in execution_parameters:
+                missing_mandatory_parameters.add(name)
+                continue
+            merged_parameters[name] = execution_parameters[name]
+        else:
+            merged_parameters[name] = execution_parameters[name] if \
+                name in execution_parameters else param['default']
+    if missing_mandatory_parameters:
+        raise ValueError(
+            'Workflow "{0}" must be provided with the following '
+            'parameters to execute: {1}'
+            .format(workflow_name, ','.join(missing_mandatory_parameters)))
+    if wrong_types:
+        error_message = StringIO()
+        for param_name, param_type in wrong_types.iteritems():
+            error_message.write('Parameter "{0}" must be of type {1}\n'.
+                                format(param_name, param_type))
+        raise ValueError(error_message.getvalue())
+    custom_parameters = dict(
+        (k, v) for (k, v) in execution_parameters.iteritems()
+        if k not in workflow_parameters)
+    if not allow_custom_parameters and custom_parameters:
+        raise ValueError(
+            'Workflow "{0}" does not have the following parameters '
+            'declared: {1}. Remove these parameters or use '
+            'the flag for allowing custom parameters'
+            .format(workflow_name, ','.join(custom_parameters.keys())))
+    merged_parameters.update(custom_parameters)
+    return merged_parameters
+class _Storage(object):
+    def __init__(self):
+ = None
+        self.resources_root = None
+        self.plan = None
+        self._nodes = None
+        self._locks = None
+        self.env = None
+        self._provider_context = None
+    def init(self, name, plan, nodes, node_instances, blueprint_path,
+             provider_context):
+ = name
+        self.resources_root = os.path.dirname(os.path.abspath(blueprint_path))
+        self.plan = plan
+        self._provider_context = provider_context or {}
+        self._init_locks_and_nodes(nodes)
+    def _init_locks_and_nodes(self, nodes):
+        self._nodes = dict((, node) for node in nodes)
+        self._locks = dict((instance_id, threading.RLock()) for instance_id
+                           in self._instance_ids())
+    def load(self, name):
+        raise NotImplementedError()
+    def get_resource(self, resource_path):
+        with open(os.path.join(self.resources_root, resource_path)) as f:
+            return
+    def download_resource(self, resource_path, target_path=None):
+        if not target_path:
+            suffix = '-{0}'.format(os.path.basename(resource_path))
+            target_path = tempfile.mktemp(suffix=suffix)
+        resource = self.get_resource(resource_path)
+        with open(target_path, 'wb') as f:
+            f.write(resource)
+        return target_path
+    def update_node_instance(self,
+                             node_instance_id,
+                             version,
+                             runtime_properties=None,
+                             state=None):
+        with self._lock(node_instance_id):
+            instance = self._get_node_instance(node_instance_id)
+            if state is None and version != instance['version']:
+                raise StorageConflictError('version {0} does not match '
+                                           'current version of '
+                                           'node instance {1} which is {2}'
+                                           .format(version,
+                                                   node_instance_id,
+                                                   instance['version']))
+            else:
+                instance['version'] += 1
+            if runtime_properties is not None:
+                instance['runtime_properties'] = runtime_properties
+            if state is not None:
+                instance['state'] = state
+            self._store_instance(instance)
+    def _get_node_instance(self, node_instance_id):
+        instance = self._load_instance(node_instance_id)
+        if instance is None:
+            raise RuntimeError('Instance {0} does not exist'
+                               .format(node_instance_id))
+        return instance
+    def get_node(self, node_id):
+        node = self._nodes.get(node_id)
+        if node is None:
+            raise RuntimeError('Node {0} does not exist'
+                               .format(node_id))
+        return copy.deepcopy(node)
+    def get_nodes(self):
+        return copy.deepcopy(self._nodes.values())
+    def get_node_instance(self, node_instance_id):
+        return copy.deepcopy(self._get_node_instance(node_instance_id))
+    def get_provider_context(self):
+        return copy.deepcopy(self._provider_context)
+    def _load_instance(self, node_instance_id):
+        raise NotImplementedError()
+    def _store_instance(self, node_instance):
+        raise NotImplementedError()
+    def get_node_instances(self, node_id=None):
+        raise NotImplementedError()
+    def _instance_ids(self):
+        raise NotImplementedError()
+    def _lock(self, node_instance_id):
+        return self._locks[node_instance_id]
+    def get_workdir(self):
+        raise NotImplementedError()
+class InMemoryStorage(_Storage):
+    def __init__(self):
+        super(InMemoryStorage, self).__init__()
+        self._node_instances = None
+    def init(self, name, plan, nodes, node_instances, blueprint_path,
+             provider_context):
+        self.plan = plan
+        self._node_instances = dict((, instance)
+                                    for instance in node_instances)
+        super(InMemoryStorage, self).init(name, plan, nodes, node_instances,
+                                          blueprint_path, provider_context)
+    def load(self, name):
+        raise NotImplementedError('load is not implemented by memory storage')
+    def _load_instance(self, node_instance_id):
+        return self._node_instances.get(node_instance_id)
+    def _store_instance(self, node_instance):
+        pass
+    def get_node_instances(self, node_id=None):
+        instances = self._node_instances.values()
+        if node_id:
+            instances = [i for i in instances if i.node_id == node_id]
+        return copy.deepcopy(instances)
+    def _instance_ids(self):
+        return self._node_instances.keys()
+    def get_workdir(self):
+        raise NotImplementedError('get_workdir is not implemented by memory '
+                                  'storage')
+class FileStorage(_Storage):
+    def __init__(self, storage_dir='/tmp/cloudify-workflows'):
+        super(FileStorage, self).__init__()
+        self._root_storage_dir = os.path.join(storage_dir)
+        self._storage_dir = None
+        self._workdir = None
+        self._instances_dir = None
+        self._data_path = None
+        self._payload_path = None
+        self._blueprint_path = None
+    def init(self, name, plan, nodes, node_instances, blueprint_path,
+             provider_context):
+        storage_dir = os.path.join(self._root_storage_dir, name)
+        workdir = os.path.join(storage_dir, 'work')
+        instances_dir = os.path.join(storage_dir, 'node-instances')
+        data_path = os.path.join(storage_dir, 'data')
+        payload_path = os.path.join(storage_dir, 'payload')
+        os.makedirs(storage_dir)
+        os.mkdir(instances_dir)
+        os.mkdir(workdir)
+        with open(payload_path, 'w') as f:
+            f.write(json.dumps({}))
+        blueprint_filename = os.path.basename(os.path.abspath(blueprint_path))
+        with open(data_path, 'w') as f:
+            f.write(json.dumps({
+                'plan': plan,
+                'blueprint_filename': blueprint_filename,
+                'nodes': nodes,
+                'provider_context': provider_context or {}
+            }))
+        resources_root = os.path.dirname(os.path.abspath(blueprint_path))
+        self.resources_root = os.path.join(storage_dir, 'resources')
+        def ignore(src, names):
+            return names if os.path.abspath(self.resources_root) == src \
+                else set()
+        shutil.copytree(resources_root, self.resources_root, ignore=ignore)
+        self._instances_dir = instances_dir
+        for instance in node_instances:
+            self._store_instance(instance, lock=False)
+        self.load(name)
+    def load(self, name):
+ = name
+        self._storage_dir = os.path.join(self._root_storage_dir, name)
+        self._workdir = os.path.join(self._storage_dir, 'workdir')
+        self._instances_dir = os.path.join(self._storage_dir, 'node-instances')
+        self._payload_path = os.path.join(self._storage_dir, 'payload')
+        self._data_path = os.path.join(self._storage_dir, 'data')
+        with open(self._data_path) as f:
+            data = json.loads(
+        self.plan = data['plan']
+        self.resources_root = os.path.join(self._storage_dir, 'resources')
+        self._blueprint_path = os.path.join(self.resources_root,
+                                            data['blueprint_filename'])
+        self._provider_context = data.get('provider_context', {})
+        nodes = [Node(node) for node in data['nodes']]
+        self._init_locks_and_nodes(nodes)
+    @contextmanager
+    def payload(self):
+        with open(self._payload_path, 'r') as f:
+            payload = json.load(f)
+            yield payload
+        with open(self._payload_path, 'w') as f:
+            json.dump(payload, f, indent=2)
+            f.write(os.linesep)
+    def get_blueprint_path(self):
+        return self._blueprint_path
+    def get_node_instance(self, node_instance_id):
+        return self._get_node_instance(node_instance_id)
+    def _load_instance(self, node_instance_id):
+        with self._lock(node_instance_id):
+            with open(self._instance_path(node_instance_id)) as f:
+                return NodeInstance(json.loads(
+    def _store_instance(self, node_instance, lock=True):
+        instance_lock = None
+        if lock:
+            instance_lock = self._lock(
+            instance_lock.acquire()
+        try:
+            with open(self._instance_path(, 'w') as f:
+                f.write(json.dumps(node_instance))
+        finally:
+            if lock and instance_lock:
+                instance_lock.release()
+    def _instance_path(self, node_instance_id):
+        return os.path.join(self._instances_dir, node_instance_id)
+    def get_node_instances(self, node_id=None):
+        instances = [self._get_node_instance(instance_id)
+                     for instance_id in self._instance_ids()]
+        if node_id:
+            instances = [i for i in instances if i.node_id == node_id]
+        return instances
+    def _instance_ids(self):
+        return os.listdir(self._instances_dir)
+    def get_workdir(self):
+        return self._workdir
+class StorageConflictError(Exception):
+    pass
diff --git a/aria/from_cloudify/workflows/ 
new file mode 100644
index 0000000..bf676f1
--- /dev/null
+++ b/aria/from_cloudify/workflows/
@@ -0,0 +1,767 @@
+# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    * See the License for the specific language governing permissions and
+#    * limitations under the License.
+import sys
+import time
+import uuid
+import Queue
+from cloudify import utils
+from cloudify import exceptions
+from cloudify.workflows import api
+TASK_PENDING = 'pending'
+TASK_SENDING = 'sending'
+TASK_SENT = 'sent'
+TASK_STARTED = 'started'
+TASK_RESCHEDULED = 'rescheduled'
+TASK_SUCCEEDED = 'succeeded'
+TASK_FAILED = 'failed'
+DISPATCH_TASK = 'cloudify.dispatch.dispatch'
+def retry_failure_handler(task):
+    """Basic on_success/on_failure handler that always returns retry"""
+    return HandlerResult.retry()
+class WorkflowTask(object):
+    """A base class for workflow tasks"""
+    def __init__(self,
+                 workflow_context,
+                 task_id=None,
+                 info=None,
+                 on_success=None,
+                 on_failure=None,
+                 total_retries=DEFAULT_TOTAL_RETRIES,
+                 retry_interval=DEFAULT_RETRY_INTERVAL,
+                 send_task_events=DEFAULT_SEND_TASK_EVENTS):
+        """
+        :param task_id: The id of this task (generated if none is provided)
+        :param info: A short description of this task (for logging)
+        :param on_success: A handler called when the task's execution
+                           terminates successfully.
+                           Expected to return one of
+                           [HandlerResult.retry(), HandlerResult.cont()]
+                           to indicate whether this task should be re-executed.
+        :param on_failure: A handler called when the task's execution
+                           fails.
+                           Expected to return one of
+                           [HandlerResult.retry(), HandlerResult.ignore(),
+                  ]
+                           to indicate whether this task should be re-executed,
+                           cause the engine to terminate workflow execution
+                           immediately or simply ignore this task failure and
+                           move on.
+        :param total_retries: Maximum retry attempt for this task, in case
+                              the handlers return a retry attempt.
+        :param retry_interval: Number of seconds to wait between retries
+        :param workflow_context: the CloudifyWorkflowContext instance
+        """
+ = task_id or str(uuid.uuid4())
+        self._state = TASK_PENDING
+        self.async_result = None
+        self.on_success = on_success
+        self.on_failure = on_failure
+ = info
+        self.error = None
+        self.total_retries = total_retries
+        self.retry_interval = retry_interval
+        self.terminated = Queue.Queue(maxsize=1)
+        self.is_terminated = False
+        self.workflow_context = workflow_context
+        self.send_task_events = send_task_events
+        self.containing_subgraph = None
+        self.current_retries = 0
+        # timestamp for which the task should not be executed
+        # by the task graph before reached, overridden by the task
+        # graph during retries
+        self.execute_after = time.time()
+    def dump(self):
+        return {
+            'id':,
+            'state': self.get_state(),
+            'info':,
+            'error': self.error,
+            'current_retries': self.current_retries,
+            'cloudify_context': self.cloudify_context
+        }
+    def is_remote(self):
+        """
+        :return: Is this a remote task
+        """
+        return not self.is_local()
+    def is_local(self):
+        """
+        :return: Is this a local task
+        """
+        raise NotImplementedError('Implemented by subclasses')
+    def is_nop(self):
+        """
+        :return: Is this a NOP task
+        """
+        return False
+    def get_state(self):
+        """
+        Get the task state
+        :return: The task state [pending, sending, sent, started,
+                                 rescheduled, succeeded, failed]
+        """
+        return self._state
+    def set_state(self, state):
+        """
+        Set the task state
+        :param state: The state to set [pending, sending, sent, started,
+                                        rescheduled, succeeded, failed]
+        """
+            raise RuntimeError('Illegal state set on task: {0} '
+                               '[task={1}]'.format(state, str(self)))
+        self._state = state
+        if state in TERMINATED_STATES:
+            self.is_terminated = True
+            self.terminated.put_nowait(True)
+    def wait_for_terminated(self, timeout=None):
+        if self.is_terminated:
+            return
+        self.terminated.get(timeout=timeout)
+    def handle_task_terminated(self):
+        if self.get_state() in (TASK_FAILED, TASK_RESCHEDULED):
+            handler_result = self._handle_task_not_succeeded()
+        else:
+            handler_result = self._handle_task_succeeded()
+        if handler_result.action == HandlerResult.HANDLER_RETRY:
+            if any([self.total_retries == INFINITE_TOTAL_RETRIES,
+                    self.current_retries < self.total_retries,
+                    handler_result.ignore_total_retries]):
+                if handler_result.retry_after is None:
+                    handler_result.retry_after = self.retry_interval
+                if handler_result.retried_task is None:
+                    new_task = self.duplicate_for_retry(
+                        time.time() + handler_result.retry_after)
+                    handler_result.retried_task = new_task
+            else:
+                handler_result.action = HandlerResult.HANDLER_FAIL
+        if self.containing_subgraph:
+            subgraph = self.containing_subgraph
+            retried_task = None
+            if handler_result.action == HandlerResult.HANDLER_FAIL:
+                handler_result.action = HandlerResult.HANDLER_IGNORE
+                # It is possible that two concurrent tasks failed.
+                # we will only consider the first one handled
+                if not subgraph.failed_task:
+                    subgraph.failed_task = self
+                    subgraph.set_state(TASK_FAILED)
+            elif handler_result.action == HandlerResult.HANDLER_RETRY:
+                retried_task = handler_result.retried_task
+            subgraph.task_terminated(task=self, new_task=retried_task)
+        return handler_result
+    def _handle_task_succeeded(self):
+        """Call handler for task success"""
+        if self.on_success:
+            return self.on_success(self)
+        else:
+            return HandlerResult.cont()
+    def _handle_task_not_succeeded(self):
+        """
+        Call handler for task which hasn't ended in 'succeeded' state
+        (i.e. has either failed or been rescheduled)
+        """
+        try:
+            exception = self.async_result.result
+        except Exception as e:
+            exception = exceptions.NonRecoverableError(
+                'Could not de-serialize '
+                'exception of task {0} --> {1}: {2}'
+                .format(,
+                        type(e).__name__,
+                        str(e)))
+        if isinstance(exception, exceptions.OperationRetry):
+            # operation explicitly requested a retry, so we ignore
+            # the handler set on the task.
+            handler_result = HandlerResult.retry()
+        elif self.on_failure:
+            handler_result = self.on_failure(self)
+        else:
+            handler_result = HandlerResult.retry()
+        if handler_result.action == HandlerResult.HANDLER_RETRY:
+            if isinstance(exception, exceptions.NonRecoverableError):
+                handler_result =
+            elif isinstance(exception, exceptions.RecoverableError):
+                handler_result.retry_after = exception.retry_after
+        if not self.is_subgraph:
+            causes = []
+            if isinstance(exception, (exceptions.RecoverableError,
+                                      exceptions.NonRecoverableError)):
+                causes = exception.causes or []
+            if isinstance(self, LocalWorkflowTask):
+                tb = self.async_result._holder.error[1]
+                causes.append(utils.exception_to_error_cause(exception, tb))
+            self.workflow_context.internal.send_task_event(
+                state=self.get_state(),
+                task=self,
+                event={'exception': exception, 'causes': causes})
+        return handler_result
+    def __str__(self):
+        suffix = if is not None else ''
+        return '{0}({1})'.format(, suffix)
+    def duplicate_for_retry(self, execute_after):
+        """
+        :return: A new instance of this task with a new task id
+        """
+        dup = self._duplicate()
+        dup.execute_after = execute_after
+        dup.current_retries = self.current_retries + 1
+        if dup.cloudify_context and 'operation' in dup.cloudify_context:
+            op_ctx = dup.cloudify_context['operation']
+            op_ctx['retry_number'] = dup.current_retries
+        return dup
+    def _duplicate(self):
+        raise NotImplementedError('Implemented by subclasses')
+    @property
+    def cloudify_context(self):
+        raise NotImplementedError('Implemented by subclasses')
+    @property
+    def name(self):
+        """
+        :return: The task name
+        """
+        raise NotImplementedError('Implemented by subclasses')
+    @property
+    def is_subgraph(self):
+        return False
+class RemoteWorkflowTask(WorkflowTask):
+    """A WorkflowTask wrapping a celery based task"""
+    # cache for registered tasks queries to celery workers
+    cache = {}
+    def __init__(self,
+                 kwargs,
+                 cloudify_context,
+                 workflow_context,
+                 task_queue=None,
+                 task_target=None,
+                 task_id=None,
+                 info=None,
+                 on_success=None,
+                 on_failure=retry_failure_handler,
+                 total_retries=DEFAULT_TOTAL_RETRIES,
+                 retry_interval=DEFAULT_RETRY_INTERVAL,
+                 send_task_events=DEFAULT_SEND_TASK_EVENTS):
+        """
+        :param kwargs: The keyword argument this task will be invoked with
+        :param cloudify_context: the cloudify context dict
+        :param task_queue: the cloudify context dict
+        :param task_target: the cloudify context dict
+        :param task_id: The id of this task (generated if none is provided)
+        :param info: A short description of this task (for logging)
+        :param on_success: A handler called when the task's execution
+                           terminates successfully.
+                           Expected to return one of
+                           [HandlerResult.retry(), HandlerResult.cont()]
+                           to indicate whether this task should be re-executed.
+        :param on_failure: A handler called when the task's execution
+                           fails.
+                           Expected to return one of
+                           [HandlerResult.retry(), HandlerResult.ignore(),
+                  ]
+                           to indicate whether this task should be re-executed,
+                           cause the engine to terminate workflow execution
+                           immediately or simply ignore this task failure and
+                           move on.
+        :param total_retries: Maximum retry attempt for this task, in case
+                              the handlers return a retry attempt.
+        :param retry_interval: Number of seconds to wait between retries
+        :param workflow_context: the CloudifyWorkflowContext instance
+        """
+        super(RemoteWorkflowTask, self).__init__(
+            workflow_context,
+            task_id,
+            info=info,
+            on_success=on_success,
+            on_failure=on_failure,
+            total_retries=total_retries,
+            retry_interval=retry_interval,
+            send_task_events=send_task_events)
+        self._task_target = task_target
+        self._task_queue = task_queue
+        self._kwargs = kwargs
+        self._cloudify_context = cloudify_context
+    def apply_async(self):
+        """
+        Call the underlying celery tasks apply_async. Verify the worker
+        is alive and send an event before doing so.
+        :return: a RemoteWorkflowTaskResult instance wrapping the
+                 celery async result
+        """
+        try:
+            task, self._task_queue, self._task_target = \
+                self.workflow_context.internal.handler.get_task(
+                    self, queue=self._task_queue, target=self._task_target)
+            self._verify_worker_alive()
+            self.workflow_context.internal.send_task_event(TASK_SENDING, self)
+            self.set_state(TASK_SENT)
+            async_result = task.apply_async(
+            self.async_result = RemoteWorkflowTaskResult(self, async_result)
+        except (exceptions.NonRecoverableError,
+                exceptions.RecoverableError) as e:
+            self.set_state(TASK_FAILED)
+            self.async_result = RemoteWorkflowErrorTaskResult(self, e)
+        return self.async_result
+    def is_local(self):
+        return False
+    def _duplicate(self):
+        dup = RemoteWorkflowTask(kwargs=self._kwargs,
+                                 task_queue=self.queue,
+                       ,
+                                 cloudify_context=self.cloudify_context,
+                                 workflow_context=self.workflow_context,
+                                 task_id=None,  # we want a new task id
+                       ,
+                                 on_success=self.on_success,
+                                 on_failure=self.on_failure,
+                                 total_retries=self.total_retries,
+                                 retry_interval=self.retry_interval,
+                                 send_task_events=self.send_task_events)
+        dup.cloudify_context['task_id'] =
+        return dup
+    @property
+    def name(self):
+        """The task name"""
+        return self.cloudify_context['task_name']
+    @property
+    def cloudify_context(self):
+        return self._cloudify_context
+    @property
+    def target(self):
+        """The task target (worker name)"""
+        return self._task_target
+    @property
+    def queue(self):
+        """The task queue"""
+        return self._task_queue
+    @property
+    def kwargs(self):
+        """kwargs to pass when invoking the task"""
+        return self._kwargs
+    def _verify_worker_alive(self):
+        verify_worker_alive(,
+                  ,
+                            self._get_registered)
+    def _get_registered(self):
+        # import here because this only applies in remote execution
+        # environments
+        from import app
+        worker_name = 'celery@{0}'.format(
+        inspect = app.control.inspect(destination=[worker_name],
+                                      timeout=INSPECT_TIMEOUT)
+        registered = inspect.registered()
+        if registered is None or worker_name not in registered:
+            return None
+        return set(registered[worker_name])
+class LocalWorkflowTask(WorkflowTask):
+    """A WorkflowTask wrapping a local callable"""
+    def __init__(self,
+                 local_task,
+                 workflow_context,
+                 node=None,
+                 info=None,
+                 on_success=None,
+                 on_failure=retry_failure_handler,
+                 total_retries=DEFAULT_TOTAL_RETRIES,
+                 retry_interval=DEFAULT_RETRY_INTERVAL,
+                 send_task_events=DEFAULT_SEND_TASK_EVENTS,
+                 kwargs=None,
+                 task_id=None,
+                 name=None):
+        """
+        :param local_task: A callable
+        :param workflow_context: the CloudifyWorkflowContext instance
+        :param node: The CloudifyWorkflowNode instance (if in node context)
+        :param info: A short description of this task (for logging)
+        :param on_success: A handler called when the task's execution
+                           terminates successfully.
+                           Expected to return one of
+                           [HandlerResult.retry(), HandlerResult.cont()]
+                           to indicate whether this task should be re-executed.
+        :param on_failure: A handler called when the task's execution
+                           fails.
+                           Expected to return one of
+                           [HandlerResult.retry(), HandlerResult.ignore(),
+                  ]
+                           to indicate whether this task should be re-executed,
+                           cause the engine to terminate workflow execution
+                           immediately or simply ignore this task failure and
+                           move on.
+        :param total_retries: Maximum retry attempt for this task, in case
+                              the handlers return a retry attempt.
+        :param retry_interval: Number of seconds to wait between retries
+        :param kwargs: Local task keyword arguments
+        :param name: optional parameter (default: local_task.__name__)
+        """
+        super(LocalWorkflowTask, self).__init__(
+            info=info,
+            on_success=on_success,
+            on_failure=on_failure,
+            total_retries=total_retries,
+            retry_interval=retry_interval,
+            task_id=task_id,
+            workflow_context=workflow_context,
+            send_task_events=send_task_events)
+        self.local_task = local_task
+        self.node = node
+        self.kwargs = kwargs or {}
+        self._name = name or local_task.__name__
+    def dump(self):
+        super_dump = super(LocalWorkflowTask, self).dump()
+        super_dump.update({
+            'name': self._name
+        })
+        return super_dump
+    def apply_async(self):
+        """
+        Execute the task in the local task thread pool
+        :return: A wrapper for the task result
+        """
+        def local_task_wrapper():
+            try:
+                self.workflow_context.internal.send_task_event(TASK_STARTED,
+                                                               self)
+                result = self.local_task(**self.kwargs)
+                self.workflow_context.internal.send_task_event(
+                    TASK_SUCCEEDED, self, event={'result': str(result)})
+                self.async_result._holder.result = result
+                self.set_state(TASK_SUCCEEDED)
+            except BaseException as e:
+                new_task_state = TASK_RESCHEDULED if isinstance(
+                    e, exceptions.OperationRetry) else TASK_FAILED
+                exc_type, exception, tb = sys.exc_info()
+                self.async_result._holder.error = (exception, tb)
+                self.set_state(new_task_state)
+        self.async_result = LocalWorkflowTaskResult(self)
+        self.workflow_context.internal.send_task_event(TASK_SENDING, self)
+        self.set_state(TASK_SENT)
+        self.workflow_context.internal.add_local_task(local_task_wrapper)
+        return self.async_result
+    def is_local(self):
+        return True
+    def _duplicate(self):
+        dup = LocalWorkflowTask(local_task=self.local_task,
+                                workflow_context=self.workflow_context,
+                                node=self.node,
+                      ,
+                                on_success=self.on_success,
+                                on_failure=self.on_failure,
+                                total_retries=self.total_retries,
+                                retry_interval=self.retry_interval,
+                                send_task_events=self.send_task_events,
+                                kwargs=self.kwargs,
+        return dup
+    @property
+    def name(self):
+        """The task name"""
+        return self._name
+    @property
+    def cloudify_context(self):
+        return self.kwargs.get('__cloudify_context')
+# NOP tasks class
+class NOPLocalWorkflowTask(LocalWorkflowTask):
+    def __init__(self, workflow_context):
+        super(NOPLocalWorkflowTask, self).__init__(lambda: None,
+                                                   workflow_context)
+    @property
+    def name(self):
+        """The task name"""
+        return 'NOP'
+    def apply_async(self):
+        self.set_state(TASK_SUCCEEDED)
+        return LocalWorkflowTaskResult(self)
+    def is_nop(self):
+        return True
+class WorkflowTaskResult(object):
+    """A base wrapper for workflow task results"""
+    def __init__(self, task):
+        self.task = task
+    def _process(self, retry_on_failure):
+        if self.task.workflow_context.internal.graph_mode:
+            return self._get()
+        task_graph = self.task.workflow_context.internal.task_graph
+        while True:
+            self._wait_for_task_terminated()
+            handler_result = self.task.handle_task_terminated()
+            task_graph.remove_task(self.task)
+            try:
+                result = self._get()
+                if handler_result.action != HandlerResult.HANDLER_RETRY:
+                    return result
+            except:
+                if (not retry_on_failure or
+                        handler_result.action == HandlerResult.HANDLER_FAIL):
+                    raise
+            self._sleep(handler_result.retry_after)
+            self.task = handler_result.retried_task
+            task_graph.add_task(self.task)
+            self._check_execution_cancelled()
+            self.task.apply_async()
+            self._refresh_state()
+    @staticmethod
+    def _check_execution_cancelled():
+        if api.has_cancel_request():
+            raise api.ExecutionCancelled()
+    def _wait_for_task_terminated(self):
+        while True:
+            self._check_execution_cancelled()
+            try:
+                self.task.wait_for_terminated(timeout=1)
+                break
+            except Queue.Empty:
+                continue
+    def _sleep(self, seconds):
+        while seconds > 0:
+            self._check_execution_cancelled()
+            sleep_time = 1 if seconds > 1 else seconds
+            time.sleep(sleep_time)
+            seconds -= sleep_time
+    def get(self, retry_on_failure=True):
+        """
+        Get the task result.
+        Will block until the task execution ends.
+        :return: The task result
+        """
+        return self._process(retry_on_failure)
+    def _get(self):
+        raise NotImplementedError('Implemented by subclasses')
+    def _refresh_state(self):
+        raise NotImplementedError('Implemented by subclasses')
+class RemoteWorkflowErrorTaskResult(WorkflowTaskResult):
+    def __init__(self, task, exception):
+        super(RemoteWorkflowErrorTaskResult, self).__init__(task)
+        self.exception = exception
+    def _get(self):
+        raise self.exception
+    @property
+    def result(self):
+        return self.exception
+class RemoteWorkflowTaskResult(WorkflowTaskResult):
+    """A wrapper for celery's AsyncResult"""
+    def __init__(self, task, async_result):
+        super(RemoteWorkflowTaskResult, self).__init__(task)
+        self.async_result = async_result
+    def _get(self):
+        return self.async_result.get()
+    def _refresh_state(self):
+        self.async_result = self.task.async_result.async_result
+    @property
+    def result(self):
+        return self.async_result.result
+class LocalWorkflowTaskResult(WorkflowTaskResult):
+    """A wrapper for local workflow task results"""
+    class ResultHolder(object):
+        def __init__(self, result=None, error=None):
+            self.result = result
+            self.error = error
+    def __init__(self, task):
+        """
+        :param task: The LocalWorkflowTask instance
+        """
+        super(LocalWorkflowTaskResult, self).__init__(task)
+        self._holder = self.ResultHolder()
+    def _get(self):
+        if self._holder.error is not None:
+            exception, traceback = self._holder.error
+            raise exception, None, traceback
+        return self._holder.result
+    def _refresh_state(self):
+        self._holder = self.task.async_result._holder
+    @property
+    def result(self):
+        if self._holder.error:
+            return self._holder.error[0]
+        else:
+            return self._holder.result
+class StubAsyncResult(object):
+    """Stub async result that always returns None"""
+    result = None
+class HandlerResult(object):
+    HANDLER_RETRY = 'handler_retry'
+    HANDLER_FAIL = 'handler_fail'
+    HANDLER_IGNORE = 'handler_ignore'
+    HANDLER_CONTINUE = 'handler_continue'
+    def __init__(self,
+                 action,
+                 ignore_total_retries=False,
+                 retry_after=None):
+        self.action = action
+        self.ignore_total_retries = ignore_total_retries
+        self.retry_after = retry_after
+        # this field is filled by handle_terminated_task() below after
+        # duplicating the task and updating the relevant task fields
+        # or by a subgraph on_XXX handler
+        self.retried_task = None
+    @classmethod
+    def retry(cls, ignore_total_retries=False, retry_after=None):
+        return HandlerResult(cls.HANDLER_RETRY,
+                             ignore_total_retries=ignore_total_retries,
+                             retry_after=retry_after)
+    @classmethod
+    def fail(cls):
+        return HandlerResult(cls.HANDLER_FAIL)
+    @classmethod
+    def cont(cls):
+        return HandlerResult(cls.HANDLER_CONTINUE)
+    @classmethod
+    def ignore(cls):
+        return HandlerResult(cls.HANDLER_IGNORE)
+def verify_worker_alive(name, target, get_registered):
+    cache = RemoteWorkflowTask.cache
+    registered = cache.get(target)
+    if not registered:
+        registered = get_registered()
+        cache[target] = registered
+    if registered is None:
+        raise exceptions.RecoverableError(
+            'Timed out querying worker celery@{0} for its registered '
+            'tasks. [timeout={1} seconds]'.format(target, INSPECT_TIMEOUT))
+    if DISPATCH_TASK not in registered:
+        raise exceptions.NonRecoverableError(
+            'Missing {0} task in worker {1} \n'
+            'Registered tasks are: {2}. (This probably means the agent '
+            'configuration is invalid) [{3}]'.format(
+                DISPATCH_TASK, target, registered, name))
diff --git a/aria/from_cloudify/workflows/ 
new file mode 100644
index 0000000..31e5635
--- /dev/null
+++ b/aria/from_cloudify/workflows/
@@ -0,0 +1,372 @@
+# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    * See the License for the specific language governing permissions and
+#    * limitations under the License.
+import os
+import json
+import time
+import networkx as nx
+from cloudify.workflows import api
+from cloudify.workflows import tasks
+class TaskDependencyGraph(object):
+    """
+    A task graph builder
+    :param workflow_context: A WorkflowContext instance (used for logging)
+    """
+    def __init__(self, workflow_context,
+                 default_subgraph_task_config=None):
+        self.ctx = workflow_context
+        self.graph = nx.DiGraph()
+        default_subgraph_task_config = default_subgraph_task_config or {}
+        self._default_subgraph_task_config = default_subgraph_task_config
+    def add_task(self, task):
+        """Add a WorkflowTask to this graph
+        :param task: The task
+        """
+        self.graph.add_node(, task=task)
+    def get_task(self, task_id):
+        """Get a task instance that was inserted to this graph by its id
+        :param task_id: the task id
+        :return: a WorkflowTask instance for the requested task if found.
+                 None, otherwise.
+        """
+        data = self.graph.node.get(task_id)
+        return data['task'] if data is not None else None
+    def remove_task(self, task):
+        """Remove the provided task from the graph
+        :param task: The task
+        """
+        if task.is_subgraph:
+            for subgraph_task in task.tasks.values():
+                self.remove_task(subgraph_task)
+        if in self.graph:
+            self.graph.remove_node(
+    # src depends on dst
+    def add_dependency(self, src_task, dst_task):
+        """
+        Add a dependency between tasks.
+        The source task will only be executed after the target task terminates.
+        A task may depend on several tasks, in which case it will only be
+        executed after all its 'destination' tasks terminate
+        :param src_task: The source task
+        :param dst_task: The target task
+        """
+        if not self.graph.has_node(
+            raise RuntimeError('source task {0} is not in graph (task id: '
+                               '{1})'.format(src_task,
+        if not self.graph.has_node(
+            raise RuntimeError('destination task {0} is not in graph (task '
+                               'id: {1})'.format(dst_task,
+        self.graph.add_edge(,
+    def sequence(self):
+        """
+        :return: a new TaskSequence for this graph
+        """
+        return TaskSequence(self)
+    def subgraph(self, name):
+        task = SubgraphTask(name, self, **self._default_subgraph_task_config)
+        self.add_task(task)
+        return task
+    def execute(self):
+        """
+        Start executing the graph based on tasks and dependencies between
+        them.
+        Calling this method will block until one of the following occurs:
+            1. all tasks terminated
+            2. a task failed
+            3. an unhandled exception is raised
+            4. the execution is cancelled
+        Note: This method will raise an api.ExecutionCancelled error if the
+        execution has been cancelled. When catching errors raised from this
+        method, make sure to re-raise the error if it's
+        api.ExecutionsCancelled in order to allow the execution to be set in
+        cancelled mode properly.
+        Also note that for the time being, if such a cancelling event
+        occurs, the method might return even while there's some operations
+        still being executed.
+        """
+        while True:
+            if self._is_execution_cancelled():
+                raise api.ExecutionCancelled()
+            self._check_dump_request()
+            # handle all terminated tasks
+            # it is important this happens before handling
+            # executable tasks so we get to make tasks executable
+            # and then execute them in this iteration (otherwise, it would
+            # be the next one)
+            for task in self._terminated_tasks():
+                self._handle_terminated_task(task)
+            # handle all executable tasks
+            for task in self._executable_tasks():
+                self._handle_executable_task(task)
+            # no more tasks to process, time to move on
+            if len(self.graph.node) == 0:
+                return
+            # sleep some and do it all over again
+            else:
+                time.sleep(0.1)
+    @staticmethod
+    def _is_execution_cancelled():
+        return api.has_cancel_request()
+    def _executable_tasks(self):
+        """
+        A task is executable if it is in pending state
+        , it has no dependencies at the moment (i.e. all of its dependencies
+        already terminated) and its execution timestamp is smaller then the
+        current timestamp
+        :return: An iterator for executable tasks
+        """
+        now = time.time()
+        return (task for task in self.tasks_iter()
+                if task.get_state() == tasks.TASK_PENDING and
+                task.execute_after <= now and
+                not (task.containing_subgraph and
+                     task.containing_subgraph.get_state() ==
+                     tasks.TASK_FAILED) and
+                not self._task_has_dependencies(task))
+    def _terminated_tasks(self):
+        """
+        A task is terminated if it is in 'succeeded' or 'failed' state
+        :return: An iterator for terminated tasks
+        """
+        return (task for task in self.tasks_iter()
+                if task.get_state() in tasks.TERMINATED_STATES)
+    def _task_has_dependencies(self, task):
+        """
+        :param task: The task
+        :return: Does this task have any dependencies
+        """
+        return (len(self.graph.succ.get(, {})) > 0 or
+                (task.containing_subgraph and self._task_has_dependencies(
+                    task.containing_subgraph)))
+    def tasks_iter(self):
+        """
+        An iterator on tasks added to the graph
+        """
+        return (data['task'] for _, data in self.graph.nodes_iter(data=True))
+    def _handle_executable_task(self, task):
+        """Handle executable task"""
+        task.set_state(tasks.TASK_SENDING)
+        task.apply_async()
+    def _handle_terminated_task(self, task):
+        """Handle terminated task"""
+        handler_result = task.handle_task_terminated()
+        if handler_result.action == tasks.HandlerResult.HANDLER_FAIL:
+            if isinstance(task, SubgraphTask) and task.failed_task:
+                task = task.failed_task
+            message = "Workflow failed: Task failed '{0}'".format(
+            if task.error:
+                message = '{0} -> {1}'.format(message, task.error)
+            raise RuntimeError(message)
+        dependents = self.graph.predecessors(
+        removed_edges = [(dependent,
+                         for dependent in dependents]
+        self.graph.remove_edges_from(removed_edges)
+        self.graph.remove_node(
+        if handler_result.action == tasks.HandlerResult.HANDLER_RETRY:
+            new_task = handler_result.retried_task
+            self.add_task(new_task)
+            added_edges = [(dependent,
+                           for dependent in dependents]
+            self.graph.add_edges_from(added_edges)
+    def _check_dump_request(self):
+        task_dump = os.environ.get('WORKFLOW_TASK_DUMP')
+        if not (task_dump and os.path.exists(task_dump)):
+            return
+        os.remove(task_dump)
+        task_dump_path = '{0}.{1}'.format(task_dump, time.time())
+        with open(task_dump_path, 'w') as f:
+            f.write(json.dumps({
+                'tasks': [task.dump() for task in self.tasks_iter()],
+                'edges': [[s, t] for s, t in self.graph.edges_iter()]}))
+class forkjoin(object):
+    """
+    A simple wrapper for tasks. Used in conjunction with TaskSequence.
+    Defined to make the code easier to read (instead of passing a list)
+    see ``TaskSequence.add`` for more details
+    """
+    def __init__(self, *tasks):
+        self.tasks = tasks
+class TaskSequence(object):
+    """
+    Helper class to add tasks in a sequential manner to a task dependency
+    graph
+    :param graph: The TaskDependencyGraph instance
+    """
+    def __init__(self, graph):
+        self.graph = graph
+        self.last_fork_join_tasks = None
+    def add(self, *tasks):
+        """
+        Add tasks to the sequence.
+        :param tasks: Each task might be:
+                      * A WorkflowTask instance, in which case, it will be
+                        added to the graph with a dependency between it and
+                        the task previously inserted into the sequence
+                      * A forkjoin of tasks, in which case it will be treated
+                        as a "fork-join" task in the sequence, i.e. all the
+                        fork-join tasks will depend on the last task in the
+                        sequence (could be fork join) and the next added task
+                        will depend on all tasks in this fork-join task
+        """
+        for fork_join_tasks in tasks:
+            if isinstance(fork_join_tasks, forkjoin):
+                fork_join_tasks = fork_join_tasks.tasks
+            else:
+                fork_join_tasks = [fork_join_tasks]
+            for task in fork_join_tasks:
+                self.graph.add_task(task)
+                if self.last_fork_join_tasks is not None:
+                    for last_fork_join_task in self.last_fork_join_tasks:
+                        self.graph.add_dependency(task, last_fork_join_task)
+            if fork_join_tasks:
+                self.last_fork_join_tasks = fork_join_tasks
+class SubgraphTask(tasks.WorkflowTask):
+    def __init__(self,
+                 name,
+                 graph,
+                 task_id=None,
+                 on_success=None,
+                 on_failure=None,
+                 total_retries=tasks.DEFAULT_SUBGRAPH_TOTAL_RETRIES,
+                 retry_interval=tasks.DEFAULT_RETRY_INTERVAL,
+                 send_task_events=tasks.DEFAULT_SEND_TASK_EVENTS):
+        super(SubgraphTask, self).__init__(
+            graph.ctx,
+            task_id,
+            info=name,
+            on_success=on_success,
+            on_failure=on_failure,
+            total_retries=total_retries,
+            retry_interval=retry_interval,
+            send_task_events=send_task_events)
+        self.graph = graph
+        self._name = name
+        self.tasks = {}
+        self.failed_task = None
+        if not self.on_failure:
+            self.on_failure = lambda tsk:
+        self.async_result = tasks.StubAsyncResult()
+    def _duplicate(self):
+        raise NotImplementedError('self.retried_task should be set explicitly'
+                                  ' in self.on_failure handler')
+    @property
+    def cloudify_context(self):
+        return {}
+    def is_local(self):
+        return True
+    @property
+    def name(self):
+        return self._name
+    @property
+    def is_subgraph(self):
+        return True
+    def sequence(self):
+        return TaskSequence(self)
+    def subgraph(self, name):
+        task = SubgraphTask(name, self.graph,
+                            **self.graph._default_subgraph_task_config)
+        self.add_task(task)
+        return task
+    def add_task(self, task):
+        self.graph.add_task(task)
+        self.tasks[] = task
+        if task.containing_subgraph and task.containing_subgraph is not self:
+            raise RuntimeError('task {0}[{1}] cannot be contained in more '
+                               'than one subgraph. It is currently contained '
+                               'in {2} and it is now being added to {3}'
+                               .format(task,
+                             ,
+                             ,
+        task.containing_subgraph = self
+    def remove_task(self, task):
+        self.graph.remove_task(task)
+    def add_dependency(self, src_task, dst_task):
+        self.graph.add_dependency(src_task, dst_task)
+    def apply_async(self):
+        if not self.tasks:
+            self.set_state(tasks.TASK_SUCCEEDED)
+        else:
+            self.set_state(tasks.TASK_STARTED)
+    def task_terminated(self, task, new_task=None):
+        del self.tasks[]
+        if new_task:
+            self.tasks[] = new_task
+            new_task.containing_subgraph = self
+        if not self.tasks and self.get_state() not in tasks.TERMINATED_STATES:
+            self.set_state(tasks.TASK_SUCCEEDED)
diff --git a/aria/from_cloudify/workflows/ 
new file mode 100644
index 0000000..dc60f18
--- /dev/null
+++ b/aria/from_cloudify/workflows/
@@ -0,0 +1,47 @@
+# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    * See the License for the specific language governing permissions and
+#    * limitations under the License.
+EXECUTION_CANCELLED_RESULT = 'execution_cancelled'
+cancel_request = False
+def has_cancel_request():
+    """
+    Checks for requests to cancel the workflow execution.
+    This should be used to allow graceful termination of workflow executions.
+    If this method is not used and acted upon, a simple 'cancel'
+    request for the execution will have no effect - 'force-cancel' will have
+    to be used to abruptly terminate the execution instead.
+    Note: When this method returns True, the workflow should make the
+    appropriate cleanups and then it must raise an ExecutionCancelled error
+    if the execution indeed gets cancelled (i.e. if it's too late to cancel
+    there is no need to raise this exception and the workflow should end
+    normally).
+    :return: whether there was a request to cancel the workflow execution
+    """
+    return cancel_request
+class ExecutionCancelled(Exception):
+    """
+    This exception should be raised when a workflow has been cancelled,
+    once appropriate cleanups have taken place.
+    """
+    pass

Reply via email to