# Origin: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/aria/from_cloudify/workflows/events.py
########
# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    * See the License for the specific language governing permissions and
#    * limitations under the License.


from cloudify import logs
from cloudify.exceptions import OperationRetry
from cloudify.workflows import tasks as tasks_api


class Monitor(object):
    """Monitor with handlers for different celery events"""

    def __init__(self, tasks_graph):
        """
        :param tasks_graph: The task graph. Used to extract tasks based on the
                            events task id.
        """
        self.tasks_graph = tasks_graph
        # Set by capture(); stays None until capturing has started
        self._receiver = None
        self._should_stop = False

    def task_sent(self, event):
        """Handler for 'task-sent' events (intentionally a no-op)."""
        pass

    def task_received(self, event):
        """Handler for 'task-received' events (intentionally a no-op)."""
        pass

    def task_started(self, event):
        """Mark the matching task as started and send a task event."""
        self._handle(tasks_api.TASK_STARTED, event, send_event=True)

    def task_succeeded(self, event):
        """Mark the matching task as succeeded and send a task event."""
        self._handle(tasks_api.TASK_SUCCEEDED, event, send_event=True)

    def task_failed(self, event):
        """
        Mark the matching task as rescheduled or failed.

        The task is considered rescheduled when the event's 'exception'
        text starts with the OperationRetry class name. No task event is
        sent from here (send_event=False); only the state is updated.
        """
        # A missing *or* None 'exception' field counts as a plain failure
        exception = event.get('exception') or ''
        if exception.startswith(OperationRetry.__name__):
            state = tasks_api.TASK_RESCHEDULED
        else:
            state = tasks_api.TASK_FAILED
        self._handle(state, event, send_event=False)

    def task_revoked(self, event):
        """Handler for 'task-revoked' events (intentionally a no-op)."""
        pass

    def task_retried(self, event):
        """Handler for 'task-retried' events (intentionally a no-op)."""
        pass

    def _handle(self, state, event, send_event):
        """
        Apply `state` to the task referenced by the event's 'uuid'.

        :param state: the task state to set
        :param event: the celery event dict; must contain 'uuid'
        :param send_event: whether to send a task event before the state
                           is set
        """
        task_id = event['uuid']
        task = self.tasks_graph.get_task(task_id)
        # Events for tasks the graph does not know about are ignored
        if task is not None:
            if send_event:
                send_task_event(state, task, send_task_event_func_remote,
                                event)
            task.set_state(state)

    def capture(self):
        """
        Consume celery events, dispatching them to this monitor's handlers,
        until stop() has been requested.
        """
        # Only called when running within an agent, so import here
        from cloudify_agent.app import app
        with app.connection() as connection:
            self._receiver = app.events.Receiver(connection, handlers={
                'task-sent': self.task_sent,
                'task-received': self.task_received,
                'task-started': self.task_started,
                'task-succeeded': self.task_succeeded,
                'task-failed': self.task_failed,
                'task-revoked': self.task_revoked,
                'task-retried': self.task_retried
            })
            for _ in self._receiver.itercapture(limit=None,
                                                timeout=None,
                                                wakeup=True):
                if self._should_stop:
                    return

    def stop(self):
        """
        Request capture() to stop.

        Safe to call before capture() has created the receiver.
        """
        self._should_stop = True
        receiver = self._receiver
        if receiver is not None:
            receiver.should_stop = True


def send_task_event_func_remote(task, event_type, message,
                                additional_context=None):
    """Send a task event using logs.amqp_event_out as the output function."""
    _send_task_event_func(task, event_type, message,
                          out_func=logs.amqp_event_out,
                          additional_context=additional_context)


def send_task_event_func_local(task, event_type, message,
                               additional_context=None):
    """Send a task event using logs.stdout_event_out as the output
    function."""
    _send_task_event_func(task, event_type, message,
                          out_func=logs.stdout_event_out,
                          additional_context=additional_context)


def _send_task_event_func(task, event_type, message, out_func,
                          additional_context):
    """
    Send the event as a workflow event when the task has no cloudify
    context, otherwise as a task event carrying that context.
    """
    if task.cloudify_context is None:
        logs.send_workflow_event(ctx=task.workflow_context,
                                 event_type=event_type,
                                 message=message,
                                 out_func=out_func,
                                 additional_context=additional_context)
    else:
        logs.send_task_event(cloudify_context=task.cloudify_context,
                             event_type=event_type,
                             message=message,
                             out_func=out_func,
                             additional_context=additional_context)


def _filter_task(task, state):
    """
    :return: True when the event should be suppressed. Failure events are
             never suppressed; others are suppressed when the task has
             send_task_events disabled.
    """
    return state != tasks_api.TASK_FAILED and not task.send_task_events


def send_task_event(state, task, send_event_func, event):
    """
    Send a task event delegating to 'send_event_func'
    which will send events to RabbitMQ or use the workflow context logger
    in local context

    :param state: the task state (valid: ['sending', 'started', 'rescheduled',
                  'succeeded', 'failed'])
    :param task: a WorkflowTask instance to send the event for
    :param send_event_func: function for actually sending the event somewhere
    :param event: a dict with either a result field or an exception fields
                  follows celery event structure but used by local tasks as
                  well
    :raises RuntimeError: if `event` is None for a failed, rescheduled or
                          succeeded state, or if `state` is not one of the
                          valid states
    """
    if _filter_task(task, state):
        return

    if state in (tasks_api.TASK_FAILED, tasks_api.TASK_RESCHEDULED,
                 tasks_api.TASK_SUCCEEDED) and event is None:
        raise RuntimeError('Event for task {0} is None'.format(task.name))

    if event and event.get('exception'):
        exception_str = str(event.get('exception'))
    else:
        exception_str = None

    if state == tasks_api.TASK_SENDING:
        message = "Sending task '{0}'".format(task.name)
        event_type = 'sending_task'
    elif state == tasks_api.TASK_STARTED:
        message = "Task started '{0}'".format(task.name)
        event_type = 'task_started'
    elif state == tasks_api.TASK_SUCCEEDED:
        result = str(event.get('result'))
        # A None result (plain or already quoted) adds nothing to the message
        suffix = ' ({0})'.format(result) if result not in ("'None'",
                                                          'None') else ''
        message = "Task succeeded '{0}{1}'".format(task.name, suffix)
        event_type = 'task_succeeded'
    elif state == tasks_api.TASK_RESCHEDULED:
        message = "Task rescheduled '{0}'".format(task.name)
        if exception_str:
            message = '{0} -> {1}'.format(message, exception_str)
        event_type = 'task_rescheduled'
        # Side effect: the error text is recorded on the task itself
        task.error = exception_str
    elif state == tasks_api.TASK_FAILED:
        message = "Task failed '{0}'".format(task.name)
        if exception_str:
            message = "{0} -> {1}".format(message, exception_str)
        event_type = 'task_failed'
        # Side effect: the error text is recorded on the task itself
        task.error = exception_str
    else:
        raise RuntimeError('unhandled event type: {0}'.format(state))

    if task.current_retries > 0:
        # The '/<total>' part is omitted for a negative total_retries
        retry = ' [retry {0}{1}]'.format(
            task.current_retries,
            '/{0}'.format(task.total_retries)
            if task.total_retries >= 0 else '')
        message = '{0}{1}'.format(message, retry)

    additional_context = {
        'task_current_retries': task.current_retries,
        'task_total_retries': task.total_retries
    }

    if state in (tasks_api.TASK_FAILED, tasks_api.TASK_RESCHEDULED):
        additional_context['task_error_causes'] = event.get('causes')

    send_event_func(task=task,
                    event_type=event_type,
                    message=message,
                    additional_context=additional_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/aria/from_cloudify/workflows/local.py ---------------------------------------------------------------------- diff --git a/aria/from_cloudify/workflows/local.py b/aria/from_cloudify/workflows/local.py new file mode 100644 index 0000000..6293edc --- /dev/null +++ b/aria/from_cloudify/workflows/local.py @@ -0,0 +1,598 @@ +######## +# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. 

import os
import tempfile
import copy
import importlib
import shutil
import uuid
import json
import threading
from StringIO import StringIO
from contextlib import contextmanager

from cloudify_rest_client.nodes import Node
from cloudify_rest_client.node_instances import NodeInstance

from cloudify import dispatch
from cloudify.workflows.workflow_context import (
    DEFAULT_LOCAL_TASK_THREAD_POOL_SIZE)

try:
    from dsl_parser.constants import HOST_TYPE
    from dsl_parser import parser as dsl_parser, tasks as dsl_tasks
    from dsl_parser import functions as dsl_functions
    _import_error = None
except ImportError as e:
    # The DSL parser is optional at import time; _parse_plan reports the
    # recorded error when a blueprint actually needs to be parsed.
    _import_error = str(e)
    dsl_parser = None
    dsl_tasks = None
    dsl_functions = None
    HOST_TYPE = None


class _Environment(object):
    """
    A local workflow environment bound to a storage implementation.

    On construction the environment either loads an existing environment
    from the storage (load_existing=True) or parses the blueprint and
    initialises the storage with the resulting plan, nodes and node
    instances.
    """

    def __init__(self,
                 storage,
                 blueprint_path=None,
                 name='local',
                 inputs=None,
                 load_existing=False,
                 ignored_modules=None,
                 provider_context=None,
                 resolver=None,
                 validate_version=True):
        self.storage = storage
        self.storage.env = self

        if load_existing:
            self.storage.load(name)
        else:
            plan, nodes, node_instances = _parse_plan(blueprint_path,
                                                      inputs,
                                                      ignored_modules,
                                                      resolver,
                                                      validate_version)
            storage.init(
                name=name,
                plan=plan,
                nodes=nodes,
                node_instances=node_instances,
                blueprint_path=blueprint_path,
                provider_context=provider_context)

    @property
    def plan(self):
        """The deployment plan held by the storage."""
        return self.storage.plan

    @property
    def name(self):
        """The environment name held by the storage."""
        return self.storage.name

    def outputs(self):
        """Evaluate the plan's 'outputs' section against the storage."""
        return dsl_functions.evaluate_outputs(
            outputs_def=self.plan['outputs'],
            get_node_instances_method=self.storage.get_node_instances,
            get_node_instance_method=self.storage.get_node_instance,
            get_node_method=self.storage.get_node)

    def evaluate_functions(self, payload, context):
        """Evaluate DSL functions found in `payload` against the storage."""
        return dsl_functions.evaluate_functions(
            payload=payload,
            context=context,
            get_node_instances_method=self.storage.get_node_instances,
            get_node_instance_method=self.storage.get_node_instance,
            get_node_method=self.storage.get_node)

    def execute(self,
                workflow,
                parameters=None,
                allow_custom_parameters=False,
                task_retries=-1,
                task_retry_interval=30,
                subgraph_retries=0,
                task_thread_pool_size=DEFAULT_LOCAL_TASK_THREAD_POOL_SIZE):
        """
        Execute a workflow declared in the plan.

        :param workflow: name of the workflow to execute
        :param parameters: execution parameters, merged with and validated
                           against the workflow's declared parameters
        :param allow_custom_parameters: accept parameters the workflow does
                                        not declare
        :return: whatever dispatch.dispatch returns
        :raises ValueError: if the workflow does not exist or the
                            parameters fail validation
        """
        workflows = self.plan['workflows']
        workflow_name = workflow
        if workflow_name not in workflows:
            raise ValueError("'{0}' workflow does not exist. "
                             "existing workflows are: {1}"
                             .format(workflow_name,
                                     workflows.keys()))

        workflow = workflows[workflow_name]
        execution_id = str(uuid.uuid4())
        ctx = {
            'type': 'workflow',
            'local': True,
            'deployment_id': self.name,
            'blueprint_id': self.name,
            'execution_id': execution_id,
            'workflow_id': workflow_name,
            'storage': self.storage,
            'task_retries': task_retries,
            'task_retry_interval': task_retry_interval,
            'subgraph_retries': subgraph_retries,
            'local_task_thread_pool_size': task_thread_pool_size,
            'task_name': workflow['operation']
        }

        merged_parameters = _merge_and_validate_execution_parameters(
            workflow, workflow_name, parameters, allow_custom_parameters)

        return dispatch.dispatch(__cloudify_context=ctx, **merged_parameters)


def init_env(blueprint_path,
             name='local',
             inputs=None,
             storage=None,
             ignored_modules=None,
             provider_context=None,
             resolver=None,
             validate_version=True):
    """
    Create a new environment from a blueprint.

    An InMemoryStorage is used when no storage is given.
    """
    if storage is None:
        storage = InMemoryStorage()
    return _Environment(storage=storage,
                        blueprint_path=blueprint_path,
                        name=name,
                        inputs=inputs,
                        load_existing=False,
                        ignored_modules=ignored_modules,
                        provider_context=provider_context,
                        resolver=resolver,
                        validate_version=validate_version)


def load_env(name, storage, resolver=None):
    """Load a previously initialised environment named `name` from
    `storage`."""
    return _Environment(storage=storage,
                        name=name,
                        load_existing=True,
                        resolver=resolver)


def _parse_plan(blueprint_path, inputs, ignored_modules, resolver,
                validate_version):
    """
    Parse the blueprint and prepare its deployment plan.

    :return: a (plan, nodes, node_instances) tuple
    :raises ImportError: if the DSL parser could not be imported
    """
    if dsl_parser is None:
        raise ImportError('cloudify-dsl-parser must be installed to '
                          'execute local workflows. '
                          '(e.g. "pip install cloudify-dsl-parser") [{0}]'
                          .format(_import_error))
    plan = dsl_tasks.prepare_deployment_plan(
        dsl_parser.parse_from_path(
            dsl_file_path=blueprint_path,
            resolver=resolver,
            validate_version=validate_version),
        inputs=inputs)
    nodes = [Node(node) for node in plan['nodes']]
    node_instances = [NodeInstance(instance)
                      for instance in plan['node_instances']]
    _prepare_nodes_and_instances(nodes, node_instances, ignored_modules)
    return plan, nodes, node_instances


def _validate_node(node):
    """
    Reject host nodes that have a truthy 'install_agent' property.

    :raises ValueError: if HOST_TYPE is in the node's type hierarchy and
                        its 'install_agent' property is truthy
    """
    if HOST_TYPE in node['type_hierarchy']:
        install_agent_prop = node.properties.get('install_agent')
        if install_agent_prop:
            raise ValueError("'install_agent': true is not supported "
                             "(it is True by default) "
                             "when executing local workflows. "
                             "The 'install_agent' property "
                             "must be set to false for each node of type {0}."
                             .format(HOST_TYPE))


def _prepare_nodes_and_instances(nodes, node_instances, ignored_modules):
    """
    Normalise parsed nodes and node instances in place for local execution
    and verify that every mapped operation can be resolved.
    """

    def scan(parent, name, node):
        # Resolve each mapped operation so mapping errors surface now
        for operation in parent.get(name, {}).values():
            if not operation['operation']:
                continue
            _get_module_method(operation['operation'],
                               tpe=name,
                               node_name=node.id,
                               ignored_modules=ignored_modules)

    for node in nodes:
        scalable = node['capabilities']['scalable']['properties']
        node.update(dict(
            number_of_instances=scalable['current_instances'],
            deploy_number_of_instances=scalable['default_instances'],
            min_number_of_instances=scalable['min_instances'],
            max_number_of_instances=scalable['max_instances'],
        ))
        if 'relationships' not in node:
            node['relationships'] = []
        scan(node, 'operations', node)
        _validate_node(node)
        for relationship in node['relationships']:
            scan(relationship, 'source_operations', node)
            scan(relationship, 'target_operations', node)

    for node_instance in node_instances:
        node_instance['version'] = 0
        node_instance['runtime_properties'] = {}
        node_instance['node_id'] = node_instance['name']
        if 'relationships' not in node_instance:
            node_instance['relationships'] = []


def _get_module_method(module_method_path, tpe, node_name,
                       ignored_modules=None):
    """
    Resolve a dotted 'module.path.method' string to the method object.

    :param tpe: operations kind, used only in error messages
    :param node_name: node id, used only in error messages
    :param ignored_modules: module names that are not imported
    :return: the resolved attribute, or None if the module is ignored
    :raises ImportError: if the module cannot be imported
    :raises AttributeError: if the module lacks the method
    """
    ignored_modules = ignored_modules or []
    split = module_method_path.split('.')
    module_name = '.'.join(split[:-1])
    if module_name in ignored_modules:
        return None
    method_name = split[-1]
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        raise ImportError('mapping error: No module named {0} '
                          '[node={1}, type={2}]'
                          .format(module_name, node_name, tpe))
    try:
        return getattr(module, method_name)
    except AttributeError:
        raise AttributeError("mapping error: {0} has no attribute '{1}' "
                             "[node={2}, type={3}]"
                             .format(module.__name__, method_name,
                                     node_name, tpe))


def _try_convert_from_str(string, target_type):
    """
    Best-effort conversion of `string` to `target_type`.

    The original string is returned when it cannot be converted, so the
    caller's type validation can report the mismatch.
    """
    if target_type == basestring:
        return string
    if target_type == bool:
        if string.lower() == 'true':
            return True
        if string.lower() == 'false':
            return False
        return string
    try:
        return target_type(string)
    except ValueError:
        return string


def _merge_and_validate_execution_parameters(
        workflow, workflow_name, execution_parameters=None,
        allow_custom_parameters=False):
    """
    Merge execution parameters with the workflow's declared parameters.

    String values are converted to the declared type where possible,
    declared defaults fill in parameters that were not supplied, and
    undeclared parameters are passed through only when
    `allow_custom_parameters` is set. The caller's `execution_parameters`
    dict is not modified.

    :return: the merged parameters dict
    :raises ValueError: on missing mandatory parameters, wrongly typed
                        parameters, or disallowed custom parameters
    """

    merged_parameters = {}
    workflow_parameters = workflow.get('parameters', {})
    # Work on a copy: type conversion below must not leak to the caller
    execution_parameters = dict(execution_parameters or {})

    missing_mandatory_parameters = set()

    allowed_types = {
        'integer': int,
        'float': float,
        'string': basestring,
        'boolean': bool
    }
    wrong_types = {}

    for name, param in workflow_parameters.iteritems():

        if 'type' in param and name in execution_parameters:

            # check if need to convert from string
            if (isinstance(execution_parameters[name], basestring) and
                    param['type'] in allowed_types):
                execution_parameters[name] = \
                    _try_convert_from_str(
                        execution_parameters[name],
                        allowed_types[param['type']])

            # validate type (unknown declared types accept any value)
            if not isinstance(execution_parameters[name],
                              allowed_types.get(param['type'], object)):
                wrong_types[name] = param['type']

        if 'default' not in param:
            if name not in execution_parameters:
                missing_mandatory_parameters.add(name)
                continue
            merged_parameters[name] = execution_parameters[name]
        else:
            merged_parameters[name] = execution_parameters[name] if \
                name in execution_parameters else param['default']

    if missing_mandatory_parameters:
        # Sorted so the error message is deterministic
        raise ValueError(
            'Workflow "{0}" must be provided with the following '
            'parameters to execute: {1}'
            .format(workflow_name,
                    ','.join(sorted(missing_mandatory_parameters))))

    if wrong_types:
        error_message = StringIO()
        for param_name, param_type in wrong_types.iteritems():
            error_message.write('Parameter "{0}" must be of type {1}\n'.
                                format(param_name, param_type))
        raise ValueError(error_message.getvalue())

    custom_parameters = dict(
        (k, v) for (k, v) in execution_parameters.iteritems()
        if k not in workflow_parameters)

    if not allow_custom_parameters and custom_parameters:
        # Sorted so the error message is deterministic
        raise ValueError(
            'Workflow "{0}" does not have the following parameters '
            'declared: {1}. Remove these parameters or use '
            'the flag for allowing custom parameters'
            .format(workflow_name, ','.join(sorted(custom_parameters))))

    merged_parameters.update(custom_parameters)
    return merged_parameters


class _Storage(object):
    """
    Base class for local workflow storage.

    Holds the plan, the nodes and one re-entrant lock per node instance.
    Subclasses implement how node instances are loaded and stored.
    """

    def __init__(self):
        self.name = None
        self.resources_root = None
        self.plan = None
        self._nodes = None
        self._locks = None
        self.env = None
        self._provider_context = None

    def init(self, name, plan, nodes, node_instances, blueprint_path,
             provider_context):
        """Initialise the storage for a newly parsed blueprint."""
        self.name = name
        self.resources_root = os.path.dirname(os.path.abspath(blueprint_path))
        self.plan = plan
        self._provider_context = provider_context or {}
        self._init_locks_and_nodes(nodes)

    def _init_locks_and_nodes(self, nodes):
        """Index nodes by id and create a lock per known instance id."""
        self._nodes = dict((node.id, node) for node in nodes)
        self._locks = dict((instance_id, threading.RLock()) for instance_id
                           in self._instance_ids())

    def load(self, name):
        raise NotImplementedError()

    def get_resource(self, resource_path):
        """Return the content of a resource relative to resources_root."""
        with open(os.path.join(self.resources_root, resource_path)) as f:
            return f.read()

    def download_resource(self, resource_path, target_path=None):
        """
        Copy a resource to `target_path`, or to a new temporary file when
        no target path is given.

        :return: the path the resource was written to
        """
        # Read first so a missing resource does not leave a stray temp file
        resource = self.get_resource(resource_path)
        if not target_path:
            suffix = '-{0}'.format(os.path.basename(resource_path))
            # mkstemp creates the file itself, avoiding the mktemp race
            fd, target_path = tempfile.mkstemp(suffix=suffix)
            os.close(fd)
        with open(target_path, 'wb') as f:
            f.write(resource)
        return target_path

    def update_node_instance(self,
                             node_instance_id,
                             version,
                             runtime_properties=None,
                             state=None):
        """
        Update a node instance's runtime properties and/or state.

        The version is only checked when no state is supplied.

        :raises StorageConflictError: if `state` is None and `version`
                                      differs from the stored version
        """
        with self._lock(node_instance_id):
            instance = self._get_node_instance(node_instance_id)
            if state is None and version != instance['version']:
                raise StorageConflictError('version {0} does not match '
                                           'current version of '
                                           'node instance {1} which is {2}'
                                           .format(version,
                                                   node_instance_id,
                                                   instance['version']))
            else:
                instance['version'] += 1
            if runtime_properties is not None:
                instance['runtime_properties'] = runtime_properties
            if state is not None:
                instance['state'] = state
            self._store_instance(instance)

    def _get_node_instance(self, node_instance_id):
        """
        :return: the stored instance itself (not a copy)
        :raises RuntimeError: if the instance does not exist
        """
        instance = self._load_instance(node_instance_id)
        if instance is None:
            raise RuntimeError('Instance {0} does not exist'
                               .format(node_instance_id))
        return instance

    def get_node(self, node_id):
        """
        :return: a deep copy of the node
        :raises RuntimeError: if the node does not exist
        """
        node = self._nodes.get(node_id)
        if node is None:
            raise RuntimeError('Node {0} does not exist'
                               .format(node_id))
        return copy.deepcopy(node)

    def get_nodes(self):
        """:return: deep copies of all nodes"""
        return copy.deepcopy(self._nodes.values())

    def get_node_instance(self, node_instance_id):
        """:return: a deep copy of the node instance"""
        return copy.deepcopy(self._get_node_instance(node_instance_id))

    def get_provider_context(self):
        """:return: a deep copy of the provider context"""
        return copy.deepcopy(self._provider_context)

    def _load_instance(self, node_instance_id):
        raise NotImplementedError()

    def _store_instance(self, node_instance):
        raise NotImplementedError()

    def get_node_instances(self, node_id=None):
        raise NotImplementedError()

    def _instance_ids(self):
        raise NotImplementedError()

    def _lock(self, node_instance_id):
        """:return: the lock guarding the given node instance"""
        return self._locks[node_instance_id]

    def get_workdir(self):
        raise NotImplementedError()


class InMemoryStorage(_Storage):
    """Storage that keeps node instances in a dict; nothing is persisted."""

    def __init__(self):
        super(InMemoryStorage, self).__init__()
        self._node_instances = None

    def init(self, name, plan, nodes, node_instances, blueprint_path,
             provider_context):
        self.plan = plan
        # Must be set before the base init, which derives the per-instance
        # locks from _instance_ids()
        self._node_instances = dict((instance.id, instance)
                                    for instance in node_instances)
        super(InMemoryStorage, self).init(name, plan, nodes, node_instances,
                                          blueprint_path, provider_context)

    def load(self, name):
        raise NotImplementedError('load is not implemented by memory storage')

    def _load_instance(self, node_instance_id):
        return self._node_instances.get(node_instance_id)

    def _store_instance(self, node_instance):
        # Instances are mutated in place, so there is nothing to write back
        pass

    def get_node_instances(self, node_id=None):
        """:return: deep copies of all instances, optionally only those of
        `node_id`"""
        instances = self._node_instances.values()
        if node_id:
            instances = [i for i in instances if i.node_id == node_id]
        return copy.deepcopy(instances)

    def _instance_ids(self):
        return self._node_instances.keys()

    def get_workdir(self):
        raise NotImplementedError('get_workdir is not implemented by memory '
                                  'storage')


class FileStorage(_Storage):
    """
    Storage that persists the environment as files under
    <storage_dir>/<name>: a 'data' file, a 'payload' file, a copy of the
    blueprint resources, a work directory and one file per node instance.
    """

    # Name of the work directory below the environment's storage dir.
    # init() previously created 'work' while load()/get_workdir() pointed
    # at 'workdir'; both now use this name so the directory returned by
    # get_workdir() actually exists, and the returned path is unchanged.
    _WORKDIR_NAME = 'workdir'

    def __init__(self, storage_dir='/tmp/cloudify-workflows'):
        super(FileStorage, self).__init__()
        self._root_storage_dir = os.path.join(storage_dir)
        self._storage_dir = None
        self._workdir = None
        self._instances_dir = None
        self._data_path = None
        self._payload_path = None
        self._blueprint_path = None

    def init(self, name, plan, nodes, node_instances, blueprint_path,
             provider_context):
        """
        Create the on-disk layout for a new environment and then load it.
        """
        storage_dir = os.path.join(self._root_storage_dir, name)
        workdir = os.path.join(storage_dir, self._WORKDIR_NAME)
        instances_dir = os.path.join(storage_dir, 'node-instances')
        data_path = os.path.join(storage_dir, 'data')
        payload_path = os.path.join(storage_dir, 'payload')
        os.makedirs(storage_dir)
        os.mkdir(instances_dir)
        os.mkdir(workdir)
        with open(payload_path, 'w') as f:
            f.write(json.dumps({}))

        blueprint_filename = os.path.basename(os.path.abspath(blueprint_path))
        with open(data_path, 'w') as f:
            f.write(json.dumps({
                'plan': plan,
                'blueprint_filename': blueprint_filename,
                'nodes': nodes,
                'provider_context': provider_context or {}
            }))
        resources_root = os.path.dirname(os.path.abspath(blueprint_path))
        self.resources_root = os.path.join(storage_dir, 'resources')

        def ignore(src, names):
            # Skip everything inside the copy destination itself, should
            # copytree encounter it as a source directory
            return names if os.path.abspath(self.resources_root) == src \
                else set()
        shutil.copytree(resources_root, self.resources_root, ignore=ignore)
        self._instances_dir = instances_dir
        for instance in node_instances:
            # Per-instance locks are only created by load() below
            self._store_instance(instance, lock=False)
        self.load(name)

    def load(self, name):
        """Load the environment named `name` from the storage directory."""
        self.name = name
        self._storage_dir = os.path.join(self._root_storage_dir, name)
        self._workdir = os.path.join(self._storage_dir, self._WORKDIR_NAME)
        self._instances_dir = os.path.join(self._storage_dir,
                                           'node-instances')
        self._payload_path = os.path.join(self._storage_dir, 'payload')
        self._data_path = os.path.join(self._storage_dir, 'data')
        with open(self._data_path) as f:
            data = json.loads(f.read())
        self.plan = data['plan']
        self.resources_root = os.path.join(self._storage_dir, 'resources')
        self._blueprint_path = os.path.join(self.resources_root,
                                            data['blueprint_filename'])
        self._provider_context = data.get('provider_context', {})
        nodes = [Node(node) for node in data['nodes']]
        self._init_locks_and_nodes(nodes)

    @contextmanager
    def payload(self):
        """
        Context manager yielding the payload dict read from disk.

        The dict is written back when the block exits normally; if the
        block raises, nothing is written.
        """
        with open(self._payload_path, 'r') as f:
            payload = json.load(f)
            yield payload
        with open(self._payload_path, 'w') as f:
            json.dump(payload, f, indent=2)
            f.write(os.linesep)

    def get_blueprint_path(self):
        return self._blueprint_path

    def get_node_instance(self, node_instance_id):
        # Each load reads a fresh object from disk, so no copy is made here
        return self._get_node_instance(node_instance_id)

    def _load_instance(self, node_instance_id):
        with self._lock(node_instance_id):
            with open(self._instance_path(node_instance_id)) as f:
                return NodeInstance(json.loads(f.read()))

    def _store_instance(self, node_instance, lock=True):
        """
        Write the node instance to its file.

        :param lock: acquire the instance's lock while writing; pass False
                     when the locks have not been created yet
        """
        instance_lock = None
        if lock:
            instance_lock = self._lock(node_instance.id)
            instance_lock.acquire()
        try:
            with open(self._instance_path(node_instance.id), 'w') as f:
                f.write(json.dumps(node_instance))
        finally:
            if lock and instance_lock:
                instance_lock.release()

    def _instance_path(self, node_instance_id):
        return os.path.join(self._instances_dir, node_instance_id)

    def get_node_instances(self, node_id=None):
        """:return: all instances, optionally only those of `node_id`"""
        instances = [self._get_node_instance(instance_id)
                     for instance_id in self._instance_ids()]
        if node_id:
            instances = [i for i in instances if i.node_id == node_id]
        return instances

    def _instance_ids(self):
        return os.listdir(self._instances_dir)

    def get_workdir(self):
        return self._workdir


class StorageConflictError(Exception):
    """Raised when a node instance update carries a stale version."""
    pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/aria/from_cloudify/workflows/tasks.py ---------------------------------------------------------------------- diff --git a/aria/from_cloudify/workflows/tasks.py b/aria/from_cloudify/workflows/tasks.py new file mode 100644 index 0000000..bf676f1 --- /dev/null +++ b/aria/from_cloudify/workflows/tasks.py @@ -0,0 +1,767 @@ +######## +# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. 
+ + +import sys +import time +import uuid +import Queue + +from cloudify import utils +from cloudify import exceptions +from cloudify.workflows import api + +INFINITE_TOTAL_RETRIES = -1 +DEFAULT_TOTAL_RETRIES = INFINITE_TOTAL_RETRIES +DEFAULT_RETRY_INTERVAL = 30 +DEFAULT_SUBGRAPH_TOTAL_RETRIES = 0 + +DEFAULT_SEND_TASK_EVENTS = True + +TASK_PENDING = 'pending' +TASK_SENDING = 'sending' +TASK_SENT = 'sent' +TASK_STARTED = 'started' +TASK_RESCHEDULED = 'rescheduled' +TASK_SUCCEEDED = 'succeeded' +TASK_FAILED = 'failed' + +TERMINATED_STATES = [TASK_RESCHEDULED, TASK_SUCCEEDED, TASK_FAILED] + +DISPATCH_TASK = 'cloudify.dispatch.dispatch' + +INSPECT_TIMEOUT = 30 + + +def retry_failure_handler(task): + """Basic on_success/on_failure handler that always returns retry""" + return HandlerResult.retry() + + +class WorkflowTask(object): + """A base class for workflow tasks""" + + def __init__(self, + workflow_context, + task_id=None, + info=None, + on_success=None, + on_failure=None, + total_retries=DEFAULT_TOTAL_RETRIES, + retry_interval=DEFAULT_RETRY_INTERVAL, + send_task_events=DEFAULT_SEND_TASK_EVENTS): + """ + :param task_id: The id of this task (generated if none is provided) + :param info: A short description of this task (for logging) + :param on_success: A handler called when the task's execution + terminates successfully. + Expected to return one of + [HandlerResult.retry(), HandlerResult.cont()] + to indicate whether this task should be re-executed. + :param on_failure: A handler called when the task's execution + fails. + Expected to return one of + [HandlerResult.retry(), HandlerResult.ignore(), + HandlerResult.fail()] + to indicate whether this task should be re-executed, + cause the engine to terminate workflow execution + immediately or simply ignore this task failure and + move on. + :param total_retries: Maximum retry attempt for this task, in case + the handlers return a retry attempt. 
+ :param retry_interval: Number of seconds to wait between retries + :param workflow_context: the CloudifyWorkflowContext instance + """ + self.id = task_id or str(uuid.uuid4()) + self._state = TASK_PENDING + self.async_result = None + self.on_success = on_success + self.on_failure = on_failure + self.info = info + self.error = None + self.total_retries = total_retries + self.retry_interval = retry_interval + self.terminated = Queue.Queue(maxsize=1) + self.is_terminated = False + self.workflow_context = workflow_context + self.send_task_events = send_task_events + self.containing_subgraph = None + + self.current_retries = 0 + # timestamp for which the task should not be executed + # by the task graph before reached, overridden by the task + # graph during retries + self.execute_after = time.time() + + def dump(self): + return { + 'id': self.id, + 'state': self.get_state(), + 'info': self.info, + 'error': self.error, + 'current_retries': self.current_retries, + 'cloudify_context': self.cloudify_context + } + + def is_remote(self): + """ + :return: Is this a remote task + """ + return not self.is_local() + + def is_local(self): + """ + :return: Is this a local task + """ + raise NotImplementedError('Implemented by subclasses') + + def is_nop(self): + """ + :return: Is this a NOP task + """ + return False + + def get_state(self): + """ + Get the task state + + :return: The task state [pending, sending, sent, started, + rescheduled, succeeded, failed] + """ + return self._state + + def set_state(self, state): + """ + Set the task state + + :param state: The state to set [pending, sending, sent, started, + rescheduled, succeeded, failed] + """ + if state not in [TASK_PENDING, TASK_SENDING, TASK_SENT, TASK_STARTED, + TASK_RESCHEDULED, TASK_SUCCEEDED, TASK_FAILED]: + raise RuntimeError('Illegal state set on task: {0} ' + '[task={1}]'.format(state, str(self))) + self._state = state + if state in TERMINATED_STATES: + self.is_terminated = True + 
self.terminated.put_nowait(True) + + def wait_for_terminated(self, timeout=None): + if self.is_terminated: + return + self.terminated.get(timeout=timeout) + + def handle_task_terminated(self): + if self.get_state() in (TASK_FAILED, TASK_RESCHEDULED): + handler_result = self._handle_task_not_succeeded() + else: + handler_result = self._handle_task_succeeded() + + if handler_result.action == HandlerResult.HANDLER_RETRY: + if any([self.total_retries == INFINITE_TOTAL_RETRIES, + self.current_retries < self.total_retries, + handler_result.ignore_total_retries]): + if handler_result.retry_after is None: + handler_result.retry_after = self.retry_interval + if handler_result.retried_task is None: + new_task = self.duplicate_for_retry( + time.time() + handler_result.retry_after) + handler_result.retried_task = new_task + else: + handler_result.action = HandlerResult.HANDLER_FAIL + + if self.containing_subgraph: + subgraph = self.containing_subgraph + retried_task = None + if handler_result.action == HandlerResult.HANDLER_FAIL: + handler_result.action = HandlerResult.HANDLER_IGNORE + # It is possible that two concurrent tasks failed. + # we will only consider the first one handled + if not subgraph.failed_task: + subgraph.failed_task = self + subgraph.set_state(TASK_FAILED) + elif handler_result.action == HandlerResult.HANDLER_RETRY: + retried_task = handler_result.retried_task + subgraph.task_terminated(task=self, new_task=retried_task) + + return handler_result + + def _handle_task_succeeded(self): + """Call handler for task success""" + if self.on_success: + return self.on_success(self) + else: + return HandlerResult.cont() + + def _handle_task_not_succeeded(self): + + """ + Call handler for task which hasn't ended in 'succeeded' state + (i.e. 
has either failed or been rescheduled) + """ + + try: + exception = self.async_result.result + except Exception as e: + exception = exceptions.NonRecoverableError( + 'Could not de-serialize ' + 'exception of task {0} --> {1}: {2}' + .format(self.name, + type(e).__name__, + str(e))) + + if isinstance(exception, exceptions.OperationRetry): + # operation explicitly requested a retry, so we ignore + # the handler set on the task. + handler_result = HandlerResult.retry() + elif self.on_failure: + handler_result = self.on_failure(self) + else: + handler_result = HandlerResult.retry() + + if handler_result.action == HandlerResult.HANDLER_RETRY: + if isinstance(exception, exceptions.NonRecoverableError): + handler_result = HandlerResult.fail() + elif isinstance(exception, exceptions.RecoverableError): + handler_result.retry_after = exception.retry_after + + if not self.is_subgraph: + causes = [] + if isinstance(exception, (exceptions.RecoverableError, + exceptions.NonRecoverableError)): + causes = exception.causes or [] + if isinstance(self, LocalWorkflowTask): + tb = self.async_result._holder.error[1] + causes.append(utils.exception_to_error_cause(exception, tb)) + self.workflow_context.internal.send_task_event( + state=self.get_state(), + task=self, + event={'exception': exception, 'causes': causes}) + + return handler_result + + def __str__(self): + suffix = self.info if self.info is not None else '' + return '{0}({1})'.format(self.name, suffix) + + def duplicate_for_retry(self, execute_after): + """ + :return: A new instance of this task with a new task id + """ + dup = self._duplicate() + dup.execute_after = execute_after + dup.current_retries = self.current_retries + 1 + if dup.cloudify_context and 'operation' in dup.cloudify_context: + op_ctx = dup.cloudify_context['operation'] + op_ctx['retry_number'] = dup.current_retries + return dup + + def _duplicate(self): + raise NotImplementedError('Implemented by subclasses') + + @property + def cloudify_context(self): + 
raise NotImplementedError('Implemented by subclasses') + + @property + def name(self): + """ + :return: The task name + """ + + raise NotImplementedError('Implemented by subclasses') + + @property + def is_subgraph(self): + return False + + +class RemoteWorkflowTask(WorkflowTask): + """A WorkflowTask wrapping a celery based task""" + + # cache for registered tasks queries to celery workers + cache = {} + + def __init__(self, + kwargs, + cloudify_context, + workflow_context, + task_queue=None, + task_target=None, + task_id=None, + info=None, + on_success=None, + on_failure=retry_failure_handler, + total_retries=DEFAULT_TOTAL_RETRIES, + retry_interval=DEFAULT_RETRY_INTERVAL, + send_task_events=DEFAULT_SEND_TASK_EVENTS): + """ + :param kwargs: The keyword argument this task will be invoked with + :param cloudify_context: the cloudify context dict + :param task_queue: the cloudify context dict + :param task_target: the cloudify context dict + :param task_id: The id of this task (generated if none is provided) + :param info: A short description of this task (for logging) + :param on_success: A handler called when the task's execution + terminates successfully. + Expected to return one of + [HandlerResult.retry(), HandlerResult.cont()] + to indicate whether this task should be re-executed. + :param on_failure: A handler called when the task's execution + fails. + Expected to return one of + [HandlerResult.retry(), HandlerResult.ignore(), + HandlerResult.fail()] + to indicate whether this task should be re-executed, + cause the engine to terminate workflow execution + immediately or simply ignore this task failure and + move on. + :param total_retries: Maximum retry attempt for this task, in case + the handlers return a retry attempt. 
+ :param retry_interval: Number of seconds to wait between retries + :param workflow_context: the CloudifyWorkflowContext instance + """ + super(RemoteWorkflowTask, self).__init__( + workflow_context, + task_id, + info=info, + on_success=on_success, + on_failure=on_failure, + total_retries=total_retries, + retry_interval=retry_interval, + send_task_events=send_task_events) + self._task_target = task_target + self._task_queue = task_queue + self._kwargs = kwargs + self._cloudify_context = cloudify_context + + def apply_async(self): + """ + Call the underlying celery tasks apply_async. Verify the worker + is alive and send an event before doing so. + + :return: a RemoteWorkflowTaskResult instance wrapping the + celery async result + """ + try: + task, self._task_queue, self._task_target = \ + self.workflow_context.internal.handler.get_task( + self, queue=self._task_queue, target=self._task_target) + self._verify_worker_alive() + self.workflow_context.internal.send_task_event(TASK_SENDING, self) + self.set_state(TASK_SENT) + async_result = task.apply_async(task_id=self.id) + self.async_result = RemoteWorkflowTaskResult(self, async_result) + except (exceptions.NonRecoverableError, + exceptions.RecoverableError) as e: + self.set_state(TASK_FAILED) + self.async_result = RemoteWorkflowErrorTaskResult(self, e) + return self.async_result + + def is_local(self): + return False + + def _duplicate(self): + dup = RemoteWorkflowTask(kwargs=self._kwargs, + task_queue=self.queue, + task_target=self.target, + cloudify_context=self.cloudify_context, + workflow_context=self.workflow_context, + task_id=None, # we want a new task id + info=self.info, + on_success=self.on_success, + on_failure=self.on_failure, + total_retries=self.total_retries, + retry_interval=self.retry_interval, + send_task_events=self.send_task_events) + dup.cloudify_context['task_id'] = dup.id + return dup + + @property + def name(self): + """The task name""" + return self.cloudify_context['task_name'] + + 
@property + def cloudify_context(self): + return self._cloudify_context + + @property + def target(self): + """The task target (worker name)""" + return self._task_target + + @property + def queue(self): + """The task queue""" + return self._task_queue + + @property + def kwargs(self): + """kwargs to pass when invoking the task""" + return self._kwargs + + def _verify_worker_alive(self): + verify_worker_alive(self.name, + self.target, + self._get_registered) + + def _get_registered(self): + # import here because this only applies in remote execution + # environments + from cloudify_agent.app import app + + worker_name = 'celery@{0}'.format(self.target) + inspect = app.control.inspect(destination=[worker_name], + timeout=INSPECT_TIMEOUT) + registered = inspect.registered() + if registered is None or worker_name not in registered: + return None + return set(registered[worker_name]) + + +class LocalWorkflowTask(WorkflowTask): + """A WorkflowTask wrapping a local callable""" + + def __init__(self, + local_task, + workflow_context, + node=None, + info=None, + on_success=None, + on_failure=retry_failure_handler, + total_retries=DEFAULT_TOTAL_RETRIES, + retry_interval=DEFAULT_RETRY_INTERVAL, + send_task_events=DEFAULT_SEND_TASK_EVENTS, + kwargs=None, + task_id=None, + name=None): + """ + :param local_task: A callable + :param workflow_context: the CloudifyWorkflowContext instance + :param node: The CloudifyWorkflowNode instance (if in node context) + :param info: A short description of this task (for logging) + :param on_success: A handler called when the task's execution + terminates successfully. + Expected to return one of + [HandlerResult.retry(), HandlerResult.cont()] + to indicate whether this task should be re-executed. + :param on_failure: A handler called when the task's execution + fails. 
+ Expected to return one of + [HandlerResult.retry(), HandlerResult.ignore(), + HandlerResult.fail()] + to indicate whether this task should be re-executed, + cause the engine to terminate workflow execution + immediately or simply ignore this task failure and + move on. + :param total_retries: Maximum retry attempt for this task, in case + the handlers return a retry attempt. + :param retry_interval: Number of seconds to wait between retries + :param kwargs: Local task keyword arguments + :param name: optional parameter (default: local_task.__name__) + """ + super(LocalWorkflowTask, self).__init__( + info=info, + on_success=on_success, + on_failure=on_failure, + total_retries=total_retries, + retry_interval=retry_interval, + task_id=task_id, + workflow_context=workflow_context, + send_task_events=send_task_events) + self.local_task = local_task + self.node = node + self.kwargs = kwargs or {} + self._name = name or local_task.__name__ + + def dump(self): + super_dump = super(LocalWorkflowTask, self).dump() + super_dump.update({ + 'name': self._name + }) + return super_dump + + def apply_async(self): + """ + Execute the task in the local task thread pool + :return: A wrapper for the task result + """ + + def local_task_wrapper(): + try: + self.workflow_context.internal.send_task_event(TASK_STARTED, + self) + result = self.local_task(**self.kwargs) + self.workflow_context.internal.send_task_event( + TASK_SUCCEEDED, self, event={'result': str(result)}) + self.async_result._holder.result = result + self.set_state(TASK_SUCCEEDED) + except BaseException as e: + new_task_state = TASK_RESCHEDULED if isinstance( + e, exceptions.OperationRetry) else TASK_FAILED + exc_type, exception, tb = sys.exc_info() + self.async_result._holder.error = (exception, tb) + self.set_state(new_task_state) + + self.async_result = LocalWorkflowTaskResult(self) + + self.workflow_context.internal.send_task_event(TASK_SENDING, self) + self.set_state(TASK_SENT) + 
self.workflow_context.internal.add_local_task(local_task_wrapper) + + return self.async_result + + def is_local(self): + return True + + def _duplicate(self): + dup = LocalWorkflowTask(local_task=self.local_task, + workflow_context=self.workflow_context, + node=self.node, + info=self.info, + on_success=self.on_success, + on_failure=self.on_failure, + total_retries=self.total_retries, + retry_interval=self.retry_interval, + send_task_events=self.send_task_events, + kwargs=self.kwargs, + name=self.name) + return dup + + @property + def name(self): + """The task name""" + return self._name + + @property + def cloudify_context(self): + return self.kwargs.get('__cloudify_context') + + +# NOP tasks class +class NOPLocalWorkflowTask(LocalWorkflowTask): + + def __init__(self, workflow_context): + super(NOPLocalWorkflowTask, self).__init__(lambda: None, + workflow_context) + + @property + def name(self): + """The task name""" + return 'NOP' + + def apply_async(self): + self.set_state(TASK_SUCCEEDED) + return LocalWorkflowTaskResult(self) + + def is_nop(self): + return True + + +class WorkflowTaskResult(object): + """A base wrapper for workflow task results""" + + def __init__(self, task): + self.task = task + + def _process(self, retry_on_failure): + if self.task.workflow_context.internal.graph_mode: + return self._get() + task_graph = self.task.workflow_context.internal.task_graph + while True: + self._wait_for_task_terminated() + handler_result = self.task.handle_task_terminated() + task_graph.remove_task(self.task) + try: + result = self._get() + if handler_result.action != HandlerResult.HANDLER_RETRY: + return result + except: + if (not retry_on_failure or + handler_result.action == HandlerResult.HANDLER_FAIL): + raise + self._sleep(handler_result.retry_after) + self.task = handler_result.retried_task + task_graph.add_task(self.task) + self._check_execution_cancelled() + self.task.apply_async() + self._refresh_state() + + @staticmethod + def 
_check_execution_cancelled(): + if api.has_cancel_request(): + raise api.ExecutionCancelled() + + def _wait_for_task_terminated(self): + while True: + self._check_execution_cancelled() + try: + self.task.wait_for_terminated(timeout=1) + break + except Queue.Empty: + continue + + def _sleep(self, seconds): + while seconds > 0: + self._check_execution_cancelled() + sleep_time = 1 if seconds > 1 else seconds + time.sleep(sleep_time) + seconds -= sleep_time + + def get(self, retry_on_failure=True): + """ + Get the task result. + Will block until the task execution ends. + + :return: The task result + """ + return self._process(retry_on_failure) + + def _get(self): + raise NotImplementedError('Implemented by subclasses') + + def _refresh_state(self): + raise NotImplementedError('Implemented by subclasses') + + +class RemoteWorkflowErrorTaskResult(WorkflowTaskResult): + + def __init__(self, task, exception): + super(RemoteWorkflowErrorTaskResult, self).__init__(task) + self.exception = exception + + def _get(self): + raise self.exception + + @property + def result(self): + return self.exception + + +class RemoteWorkflowTaskResult(WorkflowTaskResult): + """A wrapper for celery's AsyncResult""" + + def __init__(self, task, async_result): + super(RemoteWorkflowTaskResult, self).__init__(task) + self.async_result = async_result + + def _get(self): + return self.async_result.get() + + def _refresh_state(self): + self.async_result = self.task.async_result.async_result + + @property + def result(self): + return self.async_result.result + + +class LocalWorkflowTaskResult(WorkflowTaskResult): + """A wrapper for local workflow task results""" + + class ResultHolder(object): + + def __init__(self, result=None, error=None): + self.result = result + self.error = error + + def __init__(self, task): + """ + :param task: The LocalWorkflowTask instance + """ + super(LocalWorkflowTaskResult, self).__init__(task) + self._holder = self.ResultHolder() + + def _get(self): + if 
self._holder.error is not None: + exception, traceback = self._holder.error + raise exception, None, traceback + return self._holder.result + + def _refresh_state(self): + self._holder = self.task.async_result._holder + + @property + def result(self): + if self._holder.error: + return self._holder.error[0] + else: + return self._holder.result + + +class StubAsyncResult(object): + """Stub async result that always returns None""" + result = None + + +class HandlerResult(object): + + HANDLER_RETRY = 'handler_retry' + HANDLER_FAIL = 'handler_fail' + HANDLER_IGNORE = 'handler_ignore' + HANDLER_CONTINUE = 'handler_continue' + + def __init__(self, + action, + ignore_total_retries=False, + retry_after=None): + self.action = action + self.ignore_total_retries = ignore_total_retries + self.retry_after = retry_after + + # this field is filled by handle_terminated_task() below after + # duplicating the task and updating the relevant task fields + # or by a subgraph on_XXX handler + self.retried_task = None + + @classmethod + def retry(cls, ignore_total_retries=False, retry_after=None): + return HandlerResult(cls.HANDLER_RETRY, + ignore_total_retries=ignore_total_retries, + retry_after=retry_after) + + @classmethod + def fail(cls): + return HandlerResult(cls.HANDLER_FAIL) + + @classmethod + def cont(cls): + return HandlerResult(cls.HANDLER_CONTINUE) + + @classmethod + def ignore(cls): + return HandlerResult(cls.HANDLER_IGNORE) + + +def verify_worker_alive(name, target, get_registered): + + cache = RemoteWorkflowTask.cache + registered = cache.get(target) + if not registered: + registered = get_registered() + cache[target] = registered + + if registered is None: + raise exceptions.RecoverableError( + 'Timed out querying worker celery@{0} for its registered ' + 'tasks. [timeout={1} seconds]'.format(target, INSPECT_TIMEOUT)) + + if DISPATCH_TASK not in registered: + raise exceptions.NonRecoverableError( + 'Missing {0} task in worker {1} \n' + 'Registered tasks are: {2}. 
(This probably means the agent ' + 'configuration is invalid) [{3}]'.format( + DISPATCH_TASK, target, registered, name)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/aria/from_cloudify/workflows/tasks_graph.py ---------------------------------------------------------------------- diff --git a/aria/from_cloudify/workflows/tasks_graph.py b/aria/from_cloudify/workflows/tasks_graph.py new file mode 100644 index 0000000..31e5635 --- /dev/null +++ b/aria/from_cloudify/workflows/tasks_graph.py @@ -0,0 +1,372 @@ +######## +# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. 
import os
import json
import time

import networkx as nx

from cloudify.workflows import api
from cloudify.workflows import tasks


class TaskDependencyGraph(object):
    """
    A task graph builder

    :param workflow_context: A WorkflowContext instance (used for logging)
    :param default_subgraph_task_config: keyword arguments passed to every
                                         SubgraphTask created by this graph
    """

    def __init__(self, workflow_context,
                 default_subgraph_task_config=None):
        self.ctx = workflow_context
        self.graph = nx.DiGraph()
        default_subgraph_task_config = default_subgraph_task_config or {}
        self._default_subgraph_task_config = default_subgraph_task_config

    def add_task(self, task):
        """Add a WorkflowTask to this graph

        :param task: The task
        """
        self.graph.add_node(task.id, task=task)

    def get_task(self, task_id):
        """Get a task instance that was inserted to this graph by its id

        :param task_id: the task id
        :return: a WorkflowTask instance for the requested task if found.
                 None, otherwise.
        """
        data = self.graph.node.get(task_id)
        return data['task'] if data is not None else None

    def remove_task(self, task):
        """Remove the provided task from the graph

        Tasks contained in a subgraph task are removed recursively.

        :param task: The task
        """
        if task.is_subgraph:
            for subgraph_task in task.tasks.values():
                self.remove_task(subgraph_task)
        if task.id in self.graph:
            self.graph.remove_node(task.id)

    # src depends on dst
    def add_dependency(self, src_task, dst_task):
        """
        Add a dependency between tasks.
        The source task will only be executed after the target task terminates.
        A task may depend on several tasks, in which case it will only be
        executed after all its 'destination' tasks terminate

        :param src_task: The source task
        :param dst_task: The target task
        :raises RuntimeError: when either task is not in the graph
        """
        if not self.graph.has_node(src_task.id):
            raise RuntimeError('source task {0} is not in graph (task id: '
                               '{1})'.format(src_task, src_task.id))
        if not self.graph.has_node(dst_task.id):
            raise RuntimeError('destination task {0} is not in graph (task '
                               'id: {1})'.format(dst_task, dst_task.id))
        self.graph.add_edge(src_task.id, dst_task.id)

    def sequence(self):
        """
        :return: a new TaskSequence for this graph
        """
        return TaskSequence(self)

    def subgraph(self, name):
        """Create a SubgraphTask named ``name`` and add it to this graph

        :return: the new SubgraphTask
        """
        task = SubgraphTask(name, self, **self._default_subgraph_task_config)
        self.add_task(task)
        return task

    def execute(self):
        """
        Start executing the graph based on tasks and dependencies between
        them.
        Calling this method will block until one of the following occurs:
            1. all tasks terminated
            2. a task failed
            3. an unhandled exception is raised
            4. the execution is cancelled

        Note: This method will raise an api.ExecutionCancelled error if the
        execution has been cancelled. When catching errors raised from this
        method, make sure to re-raise the error if it's
        api.ExecutionsCancelled in order to allow the execution to be set in
        cancelled mode properly.

        Also note that for the time being, if such a cancelling event
        occurs, the method might return even while there's some operations
        still being executed.
        """

        while True:

            if self._is_execution_cancelled():
                raise api.ExecutionCancelled()

            self._check_dump_request()

            # handle all terminated tasks
            # it is important this happens before handling
            # executable tasks so we get to make tasks executable
            # and then execute them in this iteration (otherwise, it would
            # be the next one)
            for task in self._terminated_tasks():
                self._handle_terminated_task(task)

            # handle all executable tasks
            for task in self._executable_tasks():
                self._handle_executable_task(task)

            # no more tasks to process, time to move on
            if len(self.graph.node) == 0:
                return
            # sleep some and do it all over again
            else:
                time.sleep(0.1)

    @staticmethod
    def _is_execution_cancelled():
        return api.has_cancel_request()

    def _executable_tasks(self):
        """
        A task is executable if it is in pending state
        , it has no dependencies at the moment (i.e. all of its dependencies
        already terminated) and its execution timestamp is smaller then the
        current timestamp

        Tasks whose containing subgraph is in failed state are excluded.

        :return: An iterator for executable tasks
        """
        now = time.time()
        return (task for task in self.tasks_iter()
                if task.get_state() == tasks.TASK_PENDING and
                task.execute_after <= now and
                not (task.containing_subgraph and
                     task.containing_subgraph.get_state() ==
                     tasks.TASK_FAILED) and
                not self._task_has_dependencies(task))

    def _terminated_tasks(self):
        """
        A task is terminated if it is in 'succeeded' or 'failed' state

        :return: An iterator for terminated tasks
        """
        return (task for task in self.tasks_iter()
                if task.get_state() in tasks.TERMINATED_STATES)

    def _task_has_dependencies(self, task):
        """
        :param task: The task
        :return: Does this task (or the subgraph containing it) have any
                 dependencies
        """
        return (len(self.graph.succ.get(task.id, {})) > 0 or
                (task.containing_subgraph and self._task_has_dependencies(
                    task.containing_subgraph)))

    def tasks_iter(self):
        """
        An iterator on tasks added to the graph
        """
        return (data['task'] for _, data in self.graph.nodes_iter(data=True))

    def _handle_executable_task(self, task):
        """Handle executable task"""
        task.set_state(tasks.TASK_SENDING)
        task.apply_async()

    def _handle_terminated_task(self, task):
        """Handle terminated task

        Raises on a failed task; otherwise removes the task from the graph
        and, on retry, puts the retried task in its place with the same
        dependents.

        :raises RuntimeError: when the handler result is HANDLER_FAIL
        """

        handler_result = task.handle_task_terminated()
        if handler_result.action == tasks.HandlerResult.HANDLER_FAIL:
            if isinstance(task, SubgraphTask) and task.failed_task:
                # report the inner task that actually failed
                task = task.failed_task
            message = "Workflow failed: Task failed '{0}'".format(task.name)
            if task.error:
                message = '{0} -> {1}'.format(message, task.error)
            raise RuntimeError(message)

        dependents = self.graph.predecessors(task.id)
        removed_edges = [(dependent, task.id)
                         for dependent in dependents]
        self.graph.remove_edges_from(removed_edges)
        self.graph.remove_node(task.id)
        if handler_result.action == tasks.HandlerResult.HANDLER_RETRY:
            new_task = handler_result.retried_task
            self.add_task(new_task)
            added_edges = [(dependent, new_task.id)
                           for dependent in dependents]
            self.graph.add_edges_from(added_edges)

    def _check_dump_request(self):
        """Dump tasks and edges as JSON when a dump was requested.

        A dump is requested by creating the file named by the
        WORKFLOW_TASK_DUMP environment variable; that file is removed and
        the dump is written to ``<path>.<timestamp>``.
        """
        task_dump = os.environ.get('WORKFLOW_TASK_DUMP')
        if not (task_dump and os.path.exists(task_dump)):
            return
        os.remove(task_dump)
        task_dump_path = '{0}.{1}'.format(task_dump, time.time())
        with open(task_dump_path, 'w') as f:
            f.write(json.dumps({
                'tasks': [task.dump() for task in self.tasks_iter()],
                'edges': [[s, t] for s, t in self.graph.edges_iter()]}))


class forkjoin(object):
    """
    A simple wrapper for tasks. Used in conjunction with TaskSequence.
    Defined to make the code easier to read (instead of passing a list)
    see ``TaskSequence.add`` for more details
    """

    def __init__(self, *tasks):
        self.tasks = tasks


class TaskSequence(object):
    """
    Helper class to add tasks in a sequential manner to a task dependency
    graph

    :param graph: The TaskDependencyGraph instance
    """

    def __init__(self, graph):
        self.graph = graph
        self.last_fork_join_tasks = None

    def add(self, *tasks):
        """
        Add tasks to the sequence.

        :param tasks: Each task might be:

                      * A WorkflowTask instance, in which case, it will be
                        added to the graph with a dependency between it and
                        the task previously inserted into the sequence
                      * A forkjoin of tasks, in which case it will be treated
                        as a "fork-join" task in the sequence, i.e. all the
                        fork-join tasks will depend on the last task in the
                        sequence (could be fork join) and the next added task
                        will depend on all tasks in this fork-join task
        """
        for fork_join_tasks in tasks:
            if isinstance(fork_join_tasks, forkjoin):
                fork_join_tasks = fork_join_tasks.tasks
            else:
                fork_join_tasks = [fork_join_tasks]
            for task in fork_join_tasks:
                self.graph.add_task(task)
                if self.last_fork_join_tasks is not None:
                    for last_fork_join_task in self.last_fork_join_tasks:
                        self.graph.add_dependency(task, last_fork_join_task)
            # an empty forkjoin does not replace the previous step
            if fork_join_tasks:
                self.last_fork_join_tasks = fork_join_tasks


class SubgraphTask(tasks.WorkflowTask):
    """A task that groups other tasks of the same TaskDependencyGraph"""

    def __init__(self,
                 name,
                 graph,
                 task_id=None,
                 on_success=None,
                 on_failure=None,
                 total_retries=tasks.DEFAULT_SUBGRAPH_TOTAL_RETRIES,
                 retry_interval=tasks.DEFAULT_RETRY_INTERVAL,
                 send_task_events=tasks.DEFAULT_SEND_TASK_EVENTS):
        super(SubgraphTask, self).__init__(
            graph.ctx,
            task_id,
            info=name,
            on_success=on_success,
            on_failure=on_failure,
            total_retries=total_retries,
            retry_interval=retry_interval,
            send_task_events=send_task_events)
        self.graph = graph
        self._name = name
        # contained tasks, keyed by task id
        self.tasks = {}
        self.failed_task = None
        if not self.on_failure:
            # without an explicit handler a subgraph failure is final
            self.on_failure = lambda tsk: tasks.HandlerResult.fail()
        self.async_result = tasks.StubAsyncResult()

    def _duplicate(self):
        raise NotImplementedError('self.retried_task should be set explicitly'
                                  ' in self.on_failure handler')

    @property
    def cloudify_context(self):
        return {}

    def is_local(self):
        return True

    @property
    def name(self):
        return self._name

    @property
    def is_subgraph(self):
        return True

    def sequence(self):
        return TaskSequence(self)

    def subgraph(self, name):
        task = SubgraphTask(name, self.graph,
                            **self.graph._default_subgraph_task_config)
        self.add_task(task)
        return task

    def add_task(self, task):
        """Add a task to the underlying graph as a member of this subgraph

        :param task: The task
        :raises RuntimeError: when the task already belongs to a different
                              subgraph; nothing is registered in that case
        """
        # validate before touching any state, so a rejected task is not
        # left registered in the graph and in self.tasks
        if task.containing_subgraph and task.containing_subgraph is not self:
            raise RuntimeError('task {0}[{1}] cannot be contained in more '
                               'than one subgraph. It is currently contained '
                               'in {2} and it is now being added to {3}'
                               .format(task,
                                       task.id,
                                       task.containing_subgraph.name,
                                       self.name))
        self.graph.add_task(task)
        self.tasks[task.id] = task
        task.containing_subgraph = self

    def remove_task(self, task):
        self.graph.remove_task(task)

    def add_dependency(self, src_task, dst_task):
        self.graph.add_dependency(src_task, dst_task)

    def apply_async(self):
        # an empty subgraph has nothing to wait for
        if not self.tasks:
            self.set_state(tasks.TASK_SUCCEEDED)
        else:
            self.set_state(tasks.TASK_STARTED)

    def task_terminated(self, task, new_task=None):
        """Record that a contained task terminated.

        :param task: the terminated task
        :param new_task: the task retrying it, if any; it takes the
                         terminated task's place in this subgraph
        """
        del self.tasks[task.id]
        if new_task:
            self.tasks[new_task.id] = new_task
            new_task.containing_subgraph = self
        if not self.tasks and self.get_state() not in tasks.TERMINATED_STATES:
            self.set_state(tasks.TASK_SUCCEEDED)


# http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/aria/from_cloudify/workflows/workflow_api.py
# ----------------------------------------------------------------------
# diff --git a/aria/from_cloudify/workflows/workflow_api.py b/aria/from_cloudify/workflows/workflow_api.py
# new file mode
# 100644
# index 0000000..dc60f18
# --- /dev/null
# +++ b/aria/from_cloudify/workflows/workflow_api.py
# @@ -0,0 +1,47 @@
########
# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    * See the License for the specific language governing permissions and
#    * limitations under the License.


EXECUTION_CANCELLED_RESULT = 'execution_cancelled'

# module-level flag read by has_cancel_request()
cancel_request = False


def has_cancel_request():
    """
    Report whether cancelling the workflow execution has been requested.

    Workflows poll this to terminate gracefully. A workflow that never
    checks it cannot be stopped by a plain 'cancel' request; only
    'force-cancel' will abruptly terminate such an execution.

    Note: once this returns True the workflow should clean up and then
    raise ExecutionCancelled if the execution really gets cancelled. If it
    is too late to cancel, the workflow should simply end normally without
    raising.

    :return: whether there was a request to cancel the workflow execution
    """
    return cancel_request


class ExecutionCancelled(Exception):
    """
    Raised by a workflow that has been cancelled, after it has completed
    the appropriate cleanups.
    """