http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/aria/from_cloudify/workflows/workflow_context.py ---------------------------------------------------------------------- diff --git a/aria/from_cloudify/workflows/workflow_context.py b/aria/from_cloudify/workflows/workflow_context.py new file mode 100644 index 0000000..354ca08 --- /dev/null +++ b/aria/from_cloudify/workflows/workflow_context.py @@ -0,0 +1,1525 @@ +######## +# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. 
+ + +import functools +import copy +import uuid +import threading +import Queue + +from proxy_tools import proxy + +from cloudify import context +from cloudify.manager import (get_node_instance, + update_node_instance, + update_execution_status, + get_bootstrap_context, + get_rest_client, + download_resource) +from cloudify.workflows.tasks import (RemoteWorkflowTask, + LocalWorkflowTask, + NOPLocalWorkflowTask, + DEFAULT_TOTAL_RETRIES, + DEFAULT_RETRY_INTERVAL, + DEFAULT_SEND_TASK_EVENTS, + DEFAULT_SUBGRAPH_TOTAL_RETRIES) +from cloudify import utils +from cloudify import exceptions +from cloudify.state import current_workflow_ctx +from cloudify.workflows import events +from cloudify.workflows.tasks_graph import TaskDependencyGraph +from cloudify.amqp_client_utils import AMQPWrappedThread +from cloudify import logs +from cloudify.logs import (CloudifyWorkflowLoggingHandler, + CloudifyWorkflowNodeLoggingHandler, + SystemWideWorkflowLoggingHandler, + init_cloudify_logger, + send_workflow_event, + send_sys_wide_wf_event, + send_workflow_node_event) + +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict + + +DEFAULT_LOCAL_TASK_THREAD_POOL_SIZE = 1 + + +class CloudifyWorkflowRelationshipInstance(object): + """ + A node instance relationship instance + + :param ctx: a CloudifyWorkflowContext instance + :param node_instance: a CloudifyWorkflowNodeInstance instance + :param nodes_and_instances: a WorkflowNodesAndInstancesContainer instance + :param relationship_instance: A relationship dict from a NodeInstance + instance (of the rest client model) + """ + + def __init__(self, ctx, node_instance, nodes_and_instances, + relationship_instance): + self.ctx = ctx + self.node_instance = node_instance + self._nodes_and_instances = nodes_and_instances + self._relationship_instance = relationship_instance + self._relationship = node_instance.node.get_relationship( + relationship_instance['target_name']) + + @property + def 
class CloudifyWorkflowRelationship(object):
    """
    A relationship declared on a node, pointing at a target node

    :param ctx: a CloudifyWorkflowContext instance
    :param node: the CloudifyWorkflowNode instance that owns the relationship
    :param nodes_and_instances: a WorkflowNodesAndInstancesContainer
                                instance, used to resolve the target node
    :param relationship: a relationship dict from a Node instance (of the
                         rest client model)
    """

    def __init__(self, ctx, node, nodes_and_instances, relationship):
        self._relationship = relationship
        self._nodes_and_instances = nodes_and_instances
        self.node = node
        self.ctx = ctx

    def is_derived_from(self, other_relationship):
        """
        Check whether a relationship type is part of this relationship's
        type hierarchy

        :param other_relationship: a relationship type name, e.g.
                                   cloudify.relationships.contained_in
        :return: True if the name appears in the ``type_hierarchy`` entry
                 of the relationship dict, False otherwise
        """
        type_hierarchy = self._relationship["type_hierarchy"]
        return other_relationship in type_hierarchy

    @property
    def source_operations(self):
        """The relationship source operations (empty dict if not set)"""
        operations = self._relationship.get('source_operations', {})
        return operations

    @property
    def target_operations(self):
        """The relationship target operations (empty dict if not set)"""
        operations = self._relationship.get('target_operations', {})
        return operations

    @property
    def target_id(self):
        """The id of the node this relationship points at"""
        raw_relationship = self._relationship
        return raw_relationship.get('target_id')

    @property
    def target_node(self):
        """The target node, as resolved by the nodes/instances container"""
        container = self._nodes_and_instances
        return container.get_node(self.target_id)
def set_state(self, state):
    """
    Set the node instance state

    The handler of the workflow context supplies the callable that
    performs the update; it is then submitted as a local task.

    :param state: The node state
    :return: whatever the context's ``local_task`` returns for the
             submitted task
    """
    workflow_ctx = self.ctx
    handler = workflow_ctx.internal.handler
    task_callable = handler.get_set_state_task(self, state)
    return workflow_ctx.local_task(local_task=task_callable,
                                   node=self,
                                   info=state)
class CloudifyWorkflowNode(object):
    """
    A plan node

    :param ctx: a CloudifyWorkflowContext instance
    :param node: a Node instance (rest client response model)
    :param nodes_and_instances: a WorkflowNodesAndInstancesContainer instance
    """

    def __init__(self, ctx, node, nodes_and_instances):
        self.ctx = ctx
        self._node = node

        # relationships are keyed by target node id, in declaration order
        relationships = OrderedDict()
        for raw_relationship in node.relationships:
            target_id = raw_relationship['target_id']
            relationships[target_id] = CloudifyWorkflowRelationship(
                ctx, self, nodes_and_instances, raw_relationship)
        self._relationships = relationships

        # populated by CloudifyWorkflowNodeInstance.__init__()
        self._node_instances = {}

    def get_relationship(self, target_id):
        """Get a node relationship by its target id (None if missing)"""
        return self._relationships.get(target_id)

    @property
    def relationships(self):
        """An iterator over the node relationships"""
        return self._relationships.itervalues()

    @property
    def instances(self):
        """An iterator over the node instances"""
        return self._node_instances.itervalues()

    @property
    def operations(self):
        """The node operations"""
        return self._node.operations

    @property
    def id(self):
        """The node id"""
        return self._node.id

    @property
    def type(self):
        """The node type"""
        return self._node.type

    @property
    def type_hierarchy(self):
        """The node type hierarchy"""
        return self._node.type_hierarchy

    @property
    def properties(self):
        """The node properties"""
        return self._node.properties

    @property
    def number_of_instances(self):
        """The ``number_of_instances`` attribute of the underlying node"""
        return self._node.number_of_instances

    @property
    def host_id(self):
        """The ``host_id`` attribute of the underlying node"""
        return self._node.host_id

    @property
    def host_node(self):
        """The node whose id is ``host_id``, looked up through the context"""
        return self.ctx.get_node(self.host_id)

    @property
    def plugins(self):
        """
        The plugins associated with this node (empty list if not set)
        """
        raw_node = self._node
        return raw_node.get('plugins', [])

    @property
    def plugins_to_install(self):
        """
        The plugins to install in this node. (Only relevant for host nodes)
        """
        raw_node = self._node
        return raw_node.get('plugins_to_install', [])
def graph_mode(self):
    """
    Switch the workflow context into graph mode

    Switching is refused once the internal task graph already holds at
    least one task.

    :return: A task dependency graph instance
    :raises RuntimeError: if the task graph is not empty
    """
    internal = self.internal
    first_task = next(internal.task_graph.tasks_iter(), None)
    if first_task is not None:
        raise RuntimeError('Cannot switch to graph mode when tasks have '
                           'already been executed')

    internal.graph_mode = True
    return internal.task_graph
def _execute_operation(self,
                       operation,
                       node_instance,
                       operations,
                       related_node_instance=None,
                       kwargs=None,
                       allow_kwargs_override=False,
                       send_task_events=DEFAULT_SEND_TASK_EVENTS):
    """
    Build the operation context for a node (or relationship) operation
    and execute it as a task

    :param operation: the operation name, used as a key in `operations`
    :param node_instance: the node instance the operation runs on
    :param operations: a dict mapping operation names to operation structs
    :param related_node_instance: the other side of the relationship when
                                  this is a relationship operation
    :param kwargs: optional kwargs to be passed to the called operation
    :param allow_kwargs_override: allow `kwargs` to override keys that
                                  are already defined in the operation
                                  inputs
    :param send_task_events: forwarded to `execute_task`
    :return: a NOPLocalWorkflowTask when the operation has no mapping,
             otherwise the result of `execute_task`
    :raises RuntimeError: if the operation does not exist, if its plugin
                          is not declared on the node, or if `kwargs`
                          redefines an operation input while overriding
                          is not allowed
    """
    kwargs = kwargs or {}
    op_struct = operations.get(operation)
    if op_struct is None:
        raise RuntimeError('{0} operation of node instance {1} does '
                           'not exist'.format(operation,
                                              node_instance.id))
    if not op_struct['operation']:
        return NOPLocalWorkflowTask(self)
    plugin_name = op_struct['plugin']
    # could match two plugins with different executors, one is enough
    # for our purposes (extract package details)
    plugin = next((p for p in node_instance.node.plugins
                   if p['name'] == plugin_name), None)
    if plugin is None:
        # previously surfaced as a bare IndexError with no context
        raise RuntimeError('Plugin {0} of operation {1} is not declared '
                           'in the plugins of node {2}'.format(
                               plugin_name,
                               operation,
                               node_instance.node_id))
    operation_mapping = op_struct['operation']
    has_intrinsic_functions = op_struct['has_intrinsic_functions']
    operation_properties = op_struct.get('inputs', {})
    operation_executor = op_struct['executor']
    operation_total_retries = op_struct['max_retries']
    operation_retry_interval = op_struct['retry_interval']
    task_name = operation_mapping
    # an operation level max_retries takes precedence over the global
    # task configuration
    if operation_total_retries is None:
        total_retries = self.internal.get_task_configuration()[
            'total_retries']
    else:
        total_retries = operation_total_retries

    node_context = {
        'node_id': node_instance.id,
        'node_name': node_instance.node_id,
        'plugin': {
            'name': plugin_name,
            'package_name': plugin.get('package_name'),
            'package_version': plugin.get('package_version')
        },
        'operation': {
            'name': operation,
            'retry_number': 0,
            'max_retries': total_retries
        },
        'has_intrinsic_functions': has_intrinsic_functions,
        'host_id': node_instance._node_instance.host_id,
        'executor': operation_executor
    }
    # central deployment agents run on the management worker
    # so we pass the env to the dispatcher so it will be on a per
    # operation basis
    if operation_executor == 'central_deployment_agent':
        agent_context = self.bootstrap_context.get('cloudify_agent', {})
        node_context['execution_env'] = agent_context.get('env', {})

    if related_node_instance is not None:
        relationships = [rel.target_id
                         for rel in node_instance.relationships]
        node_context['related'] = {
            'node_id': related_node_instance.id,
            'node_name': related_node_instance.node_id,
            'is_target': related_node_instance.id in relationships
        }

    final_kwargs = self._merge_dicts(merged_from=kwargs,
                                     merged_into=operation_properties,
                                     allow_override=allow_kwargs_override)

    return self.execute_task(task_name,
                             local=self.local,
                             kwargs=final_kwargs,
                             node_context=node_context,
                             send_task_events=send_task_events,
                             total_retries=total_retries,
                             retry_interval=operation_retry_interval)
+ Note that the workflow status gets automatically updated before and + after its run (whether the run succeeded or failed) + """ + update_execution_status_task = \ + self.internal.handler.get_update_execution_status_task(new_status) + + return self.local_task( + local_task=update_execution_status_task, + info=new_status) + + def _build_cloudify_context(self, + task_id, + task_name, + node_context): + node_context = node_context or {} + context = { + '__cloudify_context': '0.3', + 'type': 'operation', + 'task_id': task_id, + 'task_name': task_name, + 'execution_id': self.execution_id, + 'workflow_id': self.workflow_id + } + context.update(node_context) + context.update(self.internal.handler.operation_cloudify_context) + return context + + def execute_task(self, + task_name, + local=True, + task_queue=None, + task_target=None, + kwargs=None, + node_context=None, + send_task_events=DEFAULT_SEND_TASK_EVENTS, + total_retries=None, + retry_interval=None): + """ + Execute a task + + :param task_name: the task named + :param kwargs: optional kwargs to be passed to the task + :param node_context: Used internally by node.execute_operation + """ + # Should deepcopy cause problems here, remove it, but please make + # sure that WORKFLOWS_WORKER_PAYLOAD is not global in manager repo + kwargs = copy.deepcopy(kwargs) or {} + task_id = str(uuid.uuid4()) + cloudify_context = self._build_cloudify_context( + task_id, + task_name, + node_context) + kwargs['__cloudify_context'] = cloudify_context + + if local: + # oh sweet circular dependency + from cloudify import dispatch + return self.local_task(local_task=dispatch.dispatch, + info=task_name, + name=task_name, + kwargs=kwargs, + task_id=task_id, + send_task_events=send_task_events, + total_retries=total_retries, + retry_interval=retry_interval) + else: + return self.remote_task(task_queue=task_queue, + task_target=task_target, + kwargs=kwargs, + cloudify_context=cloudify_context, + task_id=task_id, + 
def local_task(self,
               local_task,
               node=None,
               info=None,
               kwargs=None,
               task_id=None,
               name=None,
               send_task_events=DEFAULT_SEND_TASK_EVENTS,
               override_task_config=False,
               total_retries=None,
               retry_interval=None):
    """
    Create a local workflow task

    The task configuration is assembled from three layers. The global
    task configuration always has the lowest precedence. By default the
    configuration attached to the callable (its `workflow_task_config`
    attribute, if any) wins over the values passed to this call; with
    `override_task_config` set, the values passed to this call win.

    :param local_task: A callable implementation for the task
    :param node: A node if this task is called in a node context
    :param info: Additional info that will be accessed and included
                 in log messages
    :param kwargs: kwargs to pass to the local_task when invoked
    :param task_id: The task id
    :param name: The task name
    :param send_task_events: passed on to the created task
    :param override_task_config: let the invocation values take
                                 precedence over `workflow_task_config`
    :param total_retries: included in the invocation values only when
                          not None
    :param retry_interval: included in the invocation values only when
                           not None
    :return: the result of processing the created LocalWorkflowTask
    """
    task_config = {}
    task_config.update(self.internal.get_task_configuration())

    from_decorator = (local_task.workflow_task_config
                      if hasattr(local_task, 'workflow_task_config')
                      else {})

    from_invocation = {
        'local_task': local_task,
        'node': node,
        'info': info,
        'kwargs': kwargs,
        'send_task_events': send_task_events,
        'task_id': task_id,
        'name': name,
    }
    # None means "not specified", so lower layers stay in effect
    for key, value in (('total_retries', total_retries),
                       ('retry_interval', retry_interval)):
        if value is not None:
            from_invocation[key] = value

    # later layers override earlier ones
    if override_task_config:
        layers = (from_decorator, from_invocation)
    else:
        layers = (from_invocation, from_decorator)
    for layer in layers:
        task_config.update(layer)

    task = LocalWorkflowTask(workflow_context=self, **task_config)
    return self._process_task(task)
class WorkflowNodesAndInstancesContainer(object):
    """
    Holds the workflow nodes and node instances, indexed by id

    :param workflow_context: the workflow context handed to every created
                             node and node instance
    :param raw_nodes: an iterable of raw nodes (each exposing `id`)
    :param raw_node_instances: an iterable of raw node instances (each
                               exposing `id` and `node_id`)
    """

    def __init__(self, workflow_context, raw_nodes, raw_node_instances):
        nodes = {}
        for raw_node in raw_nodes:
            node_id = raw_node.id
            nodes[node_id] = CloudifyWorkflowNode(
                workflow_context, raw_node, self)
        self._nodes = nodes

        node_instances = {}
        for raw_instance in raw_node_instances:
            instance_id = raw_instance.id
            node_instances[instance_id] = CloudifyWorkflowNodeInstance(
                workflow_context,
                nodes[raw_instance.node_id],
                raw_instance,
                self)
        self._node_instances = node_instances

        # register every instance on the instance that contains it
        for instance in self._node_instances.itervalues():
            for rel_instance in instance.relationships:
                contained = rel_instance.relationship.is_derived_from(
                    "cloudify.relationships.contained_in")
                if not contained:
                    continue
                container = rel_instance.target_node_instance
                container._add_contained_node_instance(instance)

    def get_node(self, node_id):
        """
        Get a node by its id

        :param node_id: The node id
        :return: a CloudifyWorkflowNode instance for the node or None if
                 not found
        """
        return self._nodes.get(node_id)

    def get_node_instance(self, node_instance_id):
        """
        Get a node instance by its id

        :param node_instance_id: The node instance id
        :return: a CloudifyWorkflowNodeInstance instance for the node
                 instance or None if not found
        """
        return self._node_instances.get(node_instance_id)

    @property
    def nodes(self):
        """An iterator over the contained nodes"""
        return self._nodes.itervalues()

    @property
    def node_instances(self):
        """An iterator over the contained node instances"""
        return self._node_instances.itervalues()
class CloudifySystemWideWorkflowContext(_WorkflowContextBase):
    """
    A workflow context that exposes one lazily created workflow context
    per deployment (see `deployments_contexts`)

    :param ctx: a cloudify_context workflow dict
    """

    def __init__(self, ctx):
        with current_workflow_ctx.push(self):
            super(CloudifySystemWideWorkflowContext, self).__init__(
                ctx,
                SystemWideWfRemoteContextHandler
            )
        # deployment id -> lazy deployment context, filled on first access
        self._dep_contexts = None

    class _ManagedCloudifyWorkflowContext(CloudifyWorkflowContext):
        """
        A deployment workflow context usable as a context manager: local
        task processing and the event monitor run while inside the block
        """

        def __enter__(self):
            self.internal.start_local_tasks_processing()
            try:
                self.internal.start_event_monitor()
            except Exception:
                # __exit__ is not invoked when __enter__ fails, so stop
                # the task processors that were already started
                self.internal.stop_local_tasks_processing()
                raise
            return self

        def __exit__(self, *args, **kwargs):
            # stop the event monitor even if stopping the local task
            # processors fails
            try:
                self.internal.stop_local_tasks_processing()
            finally:
                self.internal.stop_event_monitor()

    def _lazy_deployment_context(self, dep_ctx):
        """
        Return a proxy that creates the deployment workflow context for
        `dep_ctx` on first use and reuses it afterwards
        """
        def lazy_ctx():
            if not hasattr(lazy_ctx, '_cached_ctx'):
                lazy_ctx._cached_ctx = \
                    self._ManagedCloudifyWorkflowContext(dep_ctx)
            return lazy_ctx._cached_ctx

        return proxy(lazy_ctx)

    @property
    def deployments_contexts(self):
        """
        A dict mapping each deployment id to a lazily created workflow
        context for that deployment

        The deployments are listed through the REST client on first
        access; the result is cached only once the listing has been
        processed completely, so a failed listing is retried on the next
        access instead of leaving a partial mapping behind.
        """
        if self._dep_contexts is None:
            dep_contexts = {}

            rest = get_rest_client()
            for dep in rest.deployments.list():
                dep_ctx = self._context.copy()
                dep_ctx['deployment_id'] = dep.id
                dep_ctx['blueprint_id'] = dep.blueprint_id
                dep_contexts[dep.id] = self._lazy_deployment_context(
                    dep_ctx)

            self._dep_contexts = dep_contexts
        return self._dep_contexts
class LocalTasksProcessing(object):
    """
    A pool of threads that execute queued local tasks

    :param workflow_ctx: the workflow context; it is pushed as the
                         current workflow context in every thread
    :param thread_pool_size: the number of processing threads to create
    """

    def __init__(self, workflow_ctx, thread_pool_size=1):
        self._local_tasks_queue = Queue.Queue()
        self._is_local_context = workflow_ctx.local
        self.stopped = False
        # threads are only created here, start() launches them
        self._local_task_processing_pool = [
            self._new_worker(workflow_ctx, index)
            for index in range(1, thread_pool_size + 1)]

    def _new_worker(self, workflow_ctx, index):
        """Create (without starting) the processing thread number index"""
        worker_kwargs = dict(target=self._process_local_task,
                             name='Task-Processor-{0}'.format(index),
                             args=(workflow_ctx, ))
        if not self._is_local_context:
            # this is a remote workflow, use an AMQPWrappedThread
            return AMQPWrappedThread(**worker_kwargs)
        worker = threading.Thread(**worker_kwargs)
        worker.daemon = True
        return worker

    def start(self):
        """
        Start all processing threads; for remote workflows also wait (up
        to 30 seconds per thread) on each thread's started_amqp_client
        """
        workers = self._local_task_processing_pool
        for worker in workers:
            worker.start()
        if self._is_local_context:
            return
        for worker in workers:
            worker.started_amqp_client.get(timeout=30)

    def stop(self):
        """Signal the processing threads to leave their loop"""
        self.stopped = True

    def add_task(self, task):
        """Queue a callable task for execution by one of the threads"""
        self._local_tasks_queue.put(task)

    def _process_local_task(self, workflow_ctx):
        """
        Thread body: run queued tasks until the stopped flag is set

        The queue is polled with a one second timeout, so the stopped
        flag is re-checked at least once a second while idle.
        """
        pending = self._local_tasks_queue
        # see CFY-1442
        with current_workflow_ctx.push(workflow_ctx):
            while not self.stopped:
                try:
                    pending.get(timeout=1)()
                # may seem too general, but daemon threads are just great.
                # anyway, this is properly unit tested, so we should be good.
                # NOTE(review): this also swallows the queue timeout and
                # any error raised by the task itself - kept as is,
                # presumably deliberate; confirm before narrowing
                except:
                    pass
class RemoteContextHandler(CloudifyWorkflowContextHandler):
    """Context handler base for workflows that do not run locally.

    Implements the parts of ``CloudifyWorkflowContextHandler`` that are
    served by the ``cloudify.manager`` helpers (``get_node_instance``,
    ``update_node_instance``, ``update_execution_status``, ...) and builds
    celery subtasks for operation dispatching.
    """

    @property
    def bootstrap_context(self):
        """The value returned by ``get_bootstrap_context()``."""
        return get_bootstrap_context()

    def get_send_task_event_func(self, task):
        """Return the remote task-event sender.

        :param task: unused, kept for interface compatibility
        :return: ``events.send_task_event_func_remote``
        """
        return events.send_task_event_func_remote

    def get_update_execution_status_task(self, new_status):
        """Return a callable that sets this execution's status.

        :param new_status: the status passed to ``update_execution_status``
        :return: a no-argument callable performing the update when invoked
        """
        def update_execution_status_task():
            update_execution_status(self.workflow_ctx.execution_id, new_status)
        return update_execution_status_task

    def get_send_workflow_event_task(self, event, event_type, args,
                                     additional_context=None):
        """Return a task that emits a workflow event when invoked.

        The task delegates to ``self.send_workflow_event`` so subclasses
        decide how the event is actually sent.

        :param event: the event message
        :param event_type: the event type
        :param args: arguments forwarded with the event
        :param additional_context: optional extra context for the event
        :return: a callable configured with ``send_task_events=False``
        """
        @task_config(send_task_events=False)
        def send_event_task():
            self.send_workflow_event(event_type=event_type,
                                     message=event,
                                     args=args,
                                     additional_context=additional_context)
        return send_event_task

    def get_task(self, workflow_task, queue=None, target=None):
        """Build the celery subtask used to dispatch ``workflow_task``.

        When ``queue`` and/or ``target`` are not supplied they are derived
        from the task's cloudify context: for the ``host_agent`` executor
        they are read from the ``cloudify_agent`` runtime properties of the
        host node instance (keys ``queue`` and ``name``), otherwise
        ``'cloudify.management'`` is used.

        :param workflow_task: the workflow task to dispatch; its
                              ``kwargs['__cloudify_context']`` is updated
                              in place with the resolved queue and target
        :param queue: optional explicit queue name
        :param target: optional explicit target name
        :return: a ``(subtask, queue, target)`` tuple
        :raises NonRecoverableError: if a required ``cloudify_agent``
                                     runtime property is missing
        """
        # holds the host's cloudify_agent dict once fetched, so that at
        # most one get_node_instance call is made per get_task call
        agent_cache = []

        def _derive(property_name):
            executor = workflow_task.cloudify_context['executor']
            host_id = workflow_task.cloudify_context['host_id']
            if executor != 'host_agent':
                return 'cloudify.management'
            if not agent_cache:
                host_node_instance = get_node_instance(host_id)
                agent_cache.append(
                    host_node_instance.runtime_properties.get(
                        'cloudify_agent', {}))
            cloudify_agent = agent_cache[0]
            # validate on every call, not only when the cache is filled:
            # otherwise a missing second property surfaces as a bare
            # KeyError instead of the descriptive error below
            if property_name not in cloudify_agent:
                raise exceptions.NonRecoverableError(
                    'Missing cloudify_agent.{0} runtime information. '
                    'This most likely means that the Compute node was '
                    'never started successfully'.format(property_name))
            return cloudify_agent[property_name]

        if queue is None:
            queue = _derive('queue')

        if target is None:
            target = _derive('name')

        kwargs = workflow_task.kwargs
        # augment cloudify context with target and queue
        kwargs['__cloudify_context']['task_queue'] = queue
        kwargs['__cloudify_context']['task_target'] = target

        # Remote task
        # Import here because this only applies to remote tasks execution
        # environment
        import celery
        from cloudify_agent import app

        return celery.subtask('cloudify.dispatch.dispatch',
                              kwargs=kwargs,
                              queue=queue,
                              app=app.app,
                              immutable=True), queue, target

    @property
    def operation_cloudify_context(self):
        """Context entries added for remotely executed operations.

        :return: a dict flagging the execution as non-local and carrying
                 the maintenance-bypass flag and REST credentials obtained
                 from ``cloudify.utils``
        """
        return {'local': False,
                'bypass_maintenance': utils.get_is_bypass_maintenance(),
                'rest_username': utils.get_rest_username(),
                'rest_password': utils.get_rest_password(),
                'rest_token': utils.get_rest_token()}

    def get_set_state_task(self,
                           workflow_node_instance,
                           state):
        """Return a task that stores ``state`` on the node instance.

        :param workflow_node_instance: the instance whose ``id`` is used
                                       to fetch the stored node instance
        :param state: the new state value
        :return: a callable configured with ``send_task_events=False``
                 which returns the updated node instance object
        """
        @task_config(send_task_events=False)
        def set_state_task():
            node_state = get_node_instance(workflow_node_instance.id)
            node_state.state = state
            update_node_instance(node_state)
            return node_state
        return set_state_task

    def get_get_state_task(self, workflow_node_instance):
        """Return a task that reads the node instance state.

        :param workflow_node_instance: the instance whose ``id`` is used
                                       to fetch the stored node instance
        :return: a callable configured with ``send_task_events=False``
                 which returns the stored ``state``
        """
        @task_config(send_task_events=False)
        def get_state_task():
            return get_node_instance(workflow_node_instance.id).state
        return get_state_task

    def download_deployment_resource(self,
                                     blueprint_id,
                                     deployment_id,
                                     resource_path,
                                     target_path=None):
        """Download a resource through ``download_resource``.

        :param blueprint_id: blueprint id forwarded to ``download_resource``
        :param deployment_id: deployment id forwarded to
                              ``download_resource``
        :param resource_path: path of the resource to download
        :param target_path: optional local destination path
        :return: whatever ``download_resource`` returns
        """
        logger = self.workflow_ctx.logger
        return download_resource(blueprint_id=blueprint_id,
                                 deployment_id=deployment_id,
                                 resource_path=resource_path,
                                 target_path=target_path,
                                 logger=logger)
class SystemWideWfRemoteContextHandler(RemoteContextHandler):
    """Remote handler variant built on the system-wide logging helpers.

    Logging goes through ``SystemWideWorkflowLoggingHandler`` and events
    through ``send_sys_wide_wf_event``; both emit over the AMQP output
    functions of ``cloudify.logs``.
    """

    def get_context_logging_handler(self):
        """Create the logging handler bound to this workflow context.

        :return: a ``SystemWideWorkflowLoggingHandler`` writing via
                 ``logs.amqp_log_out``
        """
        logging_handler = SystemWideWorkflowLoggingHandler(
            self.workflow_ctx,
            out_func=logs.amqp_log_out)
        return logging_handler

    def send_workflow_event(self, event_type, message=None, args=None,
                            additional_context=None):
        """Emit a workflow event via ``send_sys_wide_wf_event``.

        :param event_type: the event type
        :param message: optional event message
        :param args: optional event arguments
        :param additional_context: optional extra context for the event
        """
        event_options = dict(event_type=event_type,
                             message=message,
                             args=args,
                             additional_context=additional_context,
                             out_func=logs.amqp_event_out)
        send_sys_wide_wf_event(self.workflow_ctx, **event_options)
def get_get_state_task(self, workflow_node_instance):
    """Return a task that reads a node instance's state from local storage.

    :param workflow_node_instance: the instance whose ``id`` is looked up
                                   in ``self.storage``
    :return: a callable configured with ``send_task_events=False`` which
             returns the stored instance's ``state``
    """
    def get_state_task():
        stored = self.storage.get_node_instance(workflow_node_instance.id)
        return stored.state

    # apply the task configuration explicitly instead of via @-syntax
    configure = task_config(send_task_events=False)
    return configure(get_state_task)
class Modification(object):
    """Workflow-side wrapper around a raw deployment modification.

    Exposes the added and removed node instances (each together with
    their related instances) as ``ModificationNodes`` containers and
    offers ``finish``/``rollback`` shortcuts that go through the workflow
    context's internal handler.

    :param workflow_ctx: the workflow context the modification belongs to
    :param modification: the raw modification object; its
                         ``node_instances`` and ``id`` attributes are read
    """

    def __init__(self, workflow_ctx, modification):
        self._raw_modification = modification
        self.workflow_ctx = workflow_ctx
        changed_instances = modification.node_instances
        self._added = self._build_container(
            changed_instances.added_and_related)
        self._removed = self._build_container(
            changed_instances.removed_and_related)

    def _build_container(self, raw_node_instances):
        # Collect the raw node of every instance, keyed by node id so that
        # each node appears only once however many instances it has.
        raw_nodes_by_id = dict(
            (instance.node_id,
             self.workflow_ctx.get_node(instance.node_id)._node)
            for instance in raw_node_instances)
        return ModificationNodes(self,
                                 raw_nodes_by_id.values(),
                                 raw_node_instances)

    @property
    def added(self):
        """
        :return: Added and related nodes
        :rtype: ModificationNodes
        """
        return self._added

    @property
    def removed(self):
        """
        :return: Removed and related nodes
        :rtype: ModificationNodes
        """
        return self._removed

    @property
    def id(self):
        """The id of the wrapped raw modification."""
        return self._raw_modification.id

    def finish(self):
        """Finish deployment modification process"""
        handler = self.workflow_ctx.internal.handler
        handler.finish_deployment_modification(self._raw_modification)

    def rollback(self):
        """Rollback deployment modification process"""
        handler = self.workflow_ctx.internal.handler
        handler.rollback_deployment_modification(self._raw_modification)
def task_config(fn=None, **arguments):
    """Attach workflow task configuration to a callable.

    Works both as a direct call (``task_config(fn, key=value)``) and as a
    decorator taking arguments (``@task_config(key=value)``).

    :param fn: the callable to wrap; when omitted a decorator waiting for
               the callable is returned instead
    :param arguments: configuration stored unchanged on the wrapper's
                      ``workflow_task_config`` attribute
    :return: a wrapper around ``fn`` carrying ``workflow_task_config``,
             or a decorator producing such a wrapper
    """
    if fn is None:
        # decorator-with-arguments form: defer until the function arrives
        def partial_wrapper(func):
            return task_config(func, **arguments)
        return partial_wrapper

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    wrapper.workflow_task_config = arguments
    return wrapper
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py new file mode 100644 index 0000000..71d2d32 --- /dev/null +++ b/aria/logger.py @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from logging.handlers import RotatingFileHandler + +# todo: documentation... 
class _DefaultConsoleFormat(logging.Formatter):
    """
    _DefaultConsoleFormat class
    Console logger formatter
     info level logs format: '%(message)s'
     every other log level are formatted: '%(levelname)s: %(message)s'
    """
    # One immutable formatter per layout. Selecting a delegate instead of
    # rewriting ``self._fmt`` per record keeps the formatter free of
    # per-call state (a handler's formatter may be used from several
    # threads) and also works on Python 3, where the format string is
    # read from the formatter's style object rather than from ``_fmt``.
    _info_formatter = logging.Formatter('%(message)s')
    _level_formatter = logging.Formatter('%(levelname)s: %(message)s')

    def format(self, record):
        """Format ``record`` according to its level.

        :param logging.LogRecord record: the record to format
        :return: the bare message for INFO records, otherwise the message
                 prefixed with the level name
        """
        try:
            is_info = record.levelno == logging.INFO
        except AttributeError:
            # record without a level: fall back to its raw message
            return record.message
        delegate = self._info_formatter if is_info else self._level_formatter
        return delegate.format(record)
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Aria's storage Sub-Package +Path: aria.storage + +Storage package is a generic abstraction over different storage types. +We define this abstraction with the following components: + +1. storage: simple api to use +2. driver: implementation of the database client api. +3. model: defines the structure of the table/document. +4. field: defines a field/item in the model. + +API: + * application_storage_factory - function, default Aria storage factory. + * Storage - class, simple storage api. + * models - module, default Aria standard models. + * structures - module, default Aria structures - holds the base model, + and different fields types. + * Model - class, abstract model implementation. + * Field - class, base field implementation. + * IterField - class, base iterable field implementation. + * drivers - module, a pool of Aria standard drivers. + * StorageDriver - class, abstract model implementation. +""" +# todo: rewrite the above package documentation +# (something like explaning the two types of storage - models and resources) + +from collections import namedtuple + +from .structures import Storage, Field, Model, IterField, PointerField +from .drivers import ( + ModelDriver, + ResourceDriver, + FileSystemResourceDriver, + FileSystemModelDriver, +) +from . import models + +__all__ = ( + 'ModelStorage', + 'ResourceStorage', + 'FileSystemModelDriver', + 'models', + 'structures', + 'Field', + 'IterField', + 'PointerField', + 'Model', + 'drivers', + 'ModelDriver', + 'ResourceDriver', + 'FileSystemResourceDriver', +) +# todo: think about package output api's... +# todo: in all drivers name => entry_type +# todo: change in documentation str => basestring + + +class ModelStorage(Storage): + def __init__(self, driver, models=(), **kwargs): + """ + Simple storage client api for Aria applications. + The storage instance defines the tables/documents/code api. 
class _ModelApi(object):
    def __init__(self, name, driver, model_cls):
        """
        Managing the model in the storage, using the driver.

        :param basestring name: the name of the model.
        :param ModelDriver driver: the driver which supports this model in the storage.
        :param Model model_cls: table/document class model.
        """
        assert isinstance(driver, ModelDriver)
        assert issubclass(model_cls, Model)
        self.name = name
        self.driver = driver
        self.model_cls = model_cls
        # maps _Pointer(name, is_iter) -> _ModelApi of the pointed model
        self.pointer_mapping = {}
        self._setup_pointers_mapping()

    def _setup_pointers_mapping(self):
        """
        Builds a nested _ModelApi for every typed PointerField declared
        directly on the model class (only ``vars(model_cls)`` is scanned).
        """
        for field_name, field_cls in vars(self.model_cls).items():
            if not(isinstance(field_cls, PointerField) and field_cls.type):
                continue
            pointer_key = _Pointer(field_name, is_iter=isinstance(field_cls, IterField))
            self.pointer_mapping[pointer_key] = self.__class__(
                name=generate_lower_name(field_cls.type),
                driver=self.driver,
                model_cls=field_cls.type)

    def __iter__(self):
        return self.iter()

    def __repr__(self):
        return '{self.name}(driver={self.driver}, model={self.model_cls})'.format(self=self)

    def create(self):
        """
        Creates the model in the storage.
        """
        with self.driver as connection:
            connection.create(self.name)

    def get(self, entry_id, **kwargs):
        """
        Getter for the model from the storage.
        Pointer fields are resolved into model instances before the
        model is constructed.

        :param basestring entry_id: the id of the table/document.
        :param kwargs: forwarded to the driver (and to nested pointer lookups).
        :return: model instance
        :rtype: Model
        """
        with self.driver as connection:
            data = connection.get(
                name=self.name,
                entry_id=entry_id,
                **kwargs)
            data.update(self._get_pointers(data, **kwargs))
        return self.model_cls(**data)

    def store(self, entry, **kwargs):
        """
        Setter for the model in the storage.
        Pointed entries are stored through their own api and replaced
        by their ids in the stored data.

        :param Model entry: the table/document to store.
        :param kwargs: forwarded to the driver (and to nested pointer stores).
        """
        assert isinstance(entry, self.model_cls)
        with self.driver as connection:
            data = entry.fields_dict
            data.update(self._store_pointers(data, **kwargs))
            connection.store(
                name=self.name,
                entry_id=entry.id,
                entry=data,
                **kwargs)

    def delete(self, entry_id, **kwargs):
        """
        Delete the model from storage.
        The entry is loaded first so that the entries it points to can
        be deleted as well.

        :param basestring entry_id: id of the entity to delete from storage.
        :param kwargs: forwarded to the driver (and to nested pointer deletes).
        """
        entry = self.get(entry_id)
        with self.driver as connection:
            self._delete_pointers(entry, **kwargs)
            connection.delete(
                name=self.name,
                entry_id=entry_id,
                **kwargs)

    def iter(self, **kwargs):
        """
        Generator over the entries of model in storage.
        The driver context stays entered for as long as the generator
        is being consumed.
        """
        with self.driver as connection:
            for data in connection.iter(name=self.name, **kwargs):
                data.update(self._get_pointers(data, **kwargs))
                yield self.model_cls(**data)

    def update(self, entry_id, **kwargs):
        """
        Updates and entry in storage.
        The fields are handed to the driver as given; pointer fields are
        not translated here.

        :param str entry_id: the id of the table/document.
        :param kwargs: the fields to update.
        :return:
        """
        with self.driver as connection:
            connection.update(
                name=self.name,
                entry_id=entry_id,
                **kwargs
            )

    def _get_pointers(self, data, **kwargs):
        """
        Resolves the pointer ids found in ``data`` into model instances.
        Empty ids are skipped.

        :return: dict of field name -> resolved model (or list of models)
        """
        pointers = {}
        for field, schema in self.pointer_mapping.items():
            if field.is_iter:
                pointers[field.name] = [
                    schema.get(entry_id=pointer_id, **kwargs)
                    for pointer_id in data[field.name]
                    if pointer_id]
            elif data[field.name]:
                pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs)
        return pointers

    def _store_pointers(self, data, **kwargs):
        """
        Stores the entries pointed to by ``data`` and collects their ids.
        Unset (None) pointers are skipped, mirroring ``_get_pointers``
        which tolerates empty pointer ids when reading.

        :return: dict of field name -> id (or list of ids)
        """
        pointers = {}
        for field, model_api in self.pointer_mapping.items():
            pointed = data[field.name]
            if field.is_iter:
                pointers[field.name] = []
                for iter_entity in pointed:
                    if iter_entity is None:
                        continue
                    pointers[field.name].append(iter_entity.id)
                    model_api.store(iter_entity, **kwargs)
            elif pointed is not None:
                pointers[field.name] = pointed.id
                model_api.store(pointed, **kwargs)
            # an unset scalar pointer is left untouched in ``data``
        return pointers

    def _delete_pointers(self, entry, **kwargs):
        """
        Deletes the entries ``entry`` points to.
        Unset (None) pointers are skipped.
        """
        for field, schema in self.pointer_mapping.items():
            pointed = getattr(entry, field.name)
            if field.is_iter:
                for iter_entry in pointed:
                    if iter_entry is not None:
                        schema.delete(iter_entry.id, **kwargs)
            elif pointed is not None:
                schema.delete(pointed.id, **kwargs)
+ + :param ResourceDriver driver: the driver which supports this model in the storage. + :param basestring resource_name: the type of the entry this resourceAPI manages. + """ + assert isinstance(driver, ResourceDriver) + self.driver = driver + self.resource_name = resource_name + + def __repr__(self): + return '{name}(driver={self.driver}, resource={self.resource_name})'.format( + name=self.__class__.__name__, self=self) + + def create(self): + """ + Create the resource dir in the storage. + """ + with self.driver as connection: + connection.create(self.resource_name) + + def data(self, entry_id, path=None, **kwargs): + """ + Retrieve the content of a storage resource. + + :param basestring entry_id: the id of the entry. + :param basestring path: path of the resource on the storage. + :param kwargs: resources to be passed to the driver.. + :return the content of a single file: + """ + with self.driver as connection: + return connection.data( + entry_type=self.resource_name, + entry_id=entry_id, + path=path, + **kwargs) + + def download(self, entry_id, destination, path=None, **kwargs): + """ + Download a file/dir from the resource storage. + + :param basestring entry_id: the id of the entry. + :param basestring destination: the destination of the file/dir. + :param basestring path: path of the resource on the storage. + """ + with self.driver as connection: + connection.download( + entry_type=self.resource_name, + entry_id=entry_id, + destination=destination, + path=path, + **kwargs) + + def upload(self, entry_id, source, path=None, **kwargs): + """ + Upload a file/dir from the resource storage. + + :param basestring entry_id: the id of the entry. + :param basestring source: the source path of the file to upload. 
def generate_lower_name(model_cls):
    """
    Generates the name of the class from the class object. e.g. SomeClass -> some_class

    Every character that is not a lowercase letter is lowered and
    prefixed with an underscore.

    :param model_cls: the class to evaluate.
    :return: lower name
    :rtype: basestring
    """
    class_name = model_cls.__name__
    lowered = ''.join(
        character if character.islower() else '_{0}'.format(character.lower())
        for character in class_name)
    # Drop the first character only when it is the separator inserted
    # above for a non-lowercase first character. Slicing unconditionally
    # would eat a real character of names such as ``someClass``.
    if class_name and not class_name[0].islower():
        return lowered[1:]
    return lowered