Repository: incubator-ariatosca
Updated Branches:
  refs/heads/wf-executor be147d967 -> a9120175e


Reorganize workflow engine events: move task/workflow state handling out of the engine and into builtin event handlers


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a9120175
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a9120175
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a9120175

Branch: refs/heads/wf-executor
Commit: a9120175efb9f62301a6bb544b4aacc0d86fb094
Parents: be147d9
Author: Dan Kilman <dankil...@gmail.com>
Authored: Tue Oct 18 15:44:21 2016 +0300
Committer: Dan Kilman <dankil...@gmail.com>
Committed: Tue Oct 18 15:44:21 2016 +0300

----------------------------------------------------------------------
 aria/events/__init__.py                      |  12 +-
 aria/events/builtin_event_handlers.py        |  72 ++++++++++-
 aria/events/workflow_engine_event_handler.py |  70 +++++------
 aria/tools/module.py                         |  29 +++++
 aria/workflows/engine/engine.py              | 145 +++-------------------
 aria/workflows/engine/executor.py            | 134 +++++++++-----------
 6 files changed, 205 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
index 70e7e03..c9d7b20 100644
--- a/aria/events/__init__.py
+++ b/aria/events/__init__.py
@@ -20,25 +20,15 @@ from blinker import signal
 from ..tools.plugin import plugin_installer
 
 
-# workflow engine default signals:
+# workflow engine task signals:
 start_task_signal = signal('start_task_signal')
-end_task_signal = signal('end_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')
 
 # workflow engine workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
-end_workflow_signal = signal('end_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')
-start_sub_workflow_signal = signal('start_sub_workflow_signal')
-end_sub_workflow_signal = signal('end_sub_workflow_signal')
-
-# workflow engine operation signals:
-start_operation_signal = signal('start_operation_signal')
-end_operation_signal = signal('end_operation_signal')
-on_success_operation_signal = signal('on_success_operation_signal')
-on_failure_operation_signal = signal('on_failure_operation_signal')
 
 plugin_installer(
     path=os.path.dirname(os.path.realpath(__file__)),

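For reference, the signals above are plain blinker signals: a handler subscribes with connect and the engine fires them with send, passing the sender as the first positional argument. A minimal sketch of that pattern (the demo signal, handler, and task class below are illustrative only, not part of this commit):

    from blinker import signal

    demo_start_task_signal = signal('demo_start_task_signal')

    class DemoTask(object):
        name = 'install'

    @demo_start_task_signal.connect
    def _on_task_start(task, *args, **kwargs):
        # blinker hands the object passed to send() to each connected receiver
        print('starting task: {0}'.format(task.name))

    demo_start_task_signal.send(DemoTask())  # prints: starting task: install
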
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/events/builtin_event_handlers.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handlers.py b/aria/events/builtin_event_handlers.py
index 59f59c1..6be39c3 100644
--- a/aria/events/builtin_event_handlers.py
+++ b/aria/events/builtin_event_handlers.py
@@ -13,8 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import datetime
+
 from ..storage.models import NodeInstance
-from . import start_operation_signal
+from . import (
+    start_workflow_signal,
+    on_success_workflow_signal,
+    on_failure_workflow_signal,
+    start_task_signal,
+    on_success_task_signal,
+    on_failure_task_signal,
+)
 
 
 class _OperationToNodeInstanceState(dict):
@@ -33,7 +42,7 @@ _operation_to_node_instance_state = _OperationToNodeInstanceState({
 })
 
 
-@start_operation_signal.connect
+@start_task_signal.connect
 def _update_node_instance_state(sender, **kwargs):
     try:
         next_state = _operation_to_node_instance_state[sender.task_name]
@@ -42,3 +51,62 @@ def _update_node_instance_state(sender, **kwargs):
     node_instance = sender.context.node_instance
     node_instance.state = next_state
     sender.context.storage.node_instance.store(node_instance)
+
+
+@start_task_signal.connect
+def _task_started(task, *args, **kwargs):
+    operation_context = task.operation_context
+    operation = operation_context.operation
+    operation.started_at = datetime.utcnow()
+    operation.status = operation.STARTED
+    operation_context.operation = operation
+
+
+@on_failure_task_signal.connect
+def _task_failed(task, *args, **kwargs):
+    operation_context = task.operation_context
+    operation = operation_context.operation
+    operation.ended_at = datetime.utcnow()
+    operation.status = operation.FAILED
+    operation_context.operation = operation
+
+
+@on_success_task_signal.connect
+def _task_succeeded(task, *args, **kwargs):
+    operation_context = task.operation_context
+    operation = operation_context.operation
+    operation.ended_at = datetime.utcnow()
+    operation.status = operation.SUCCESS
+    operation_context.operation = operation
+
+
+@start_workflow_signal.connect
+def _workflow_started(workflow_context, *args, **kwargs):
+    Execution = workflow_context.storage.execution.model_cls
+    execution = Execution(
+        id=workflow_context.execution_id,
+        deployment_id=workflow_context.deployment_id,
+        workflow_id=workflow_context.workflow_id,
+        blueprint_id=workflow_context.blueprint_id,
+        status=Execution.PENDING,
+        created_at=datetime.utcnow(),
+        error='',
+        parameters=workflow_context.parameters,
+        is_system_workflow=False
+    )
+    workflow_context.execution = execution
+
+
+@on_failure_workflow_signal.connect
+def _workflow_failed(workflow_context, exception, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.error = str(exception)
+    execution.status = execution.FAILED
+    workflow_context.execution = execution
+
+
+@on_success_workflow_signal.connect
+def _workflow_succeeded(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.status = execution.TERMINATED
+    workflow_context.execution = execution

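The handlers above stamp status and timestamps directly onto the operation and execution models carried by the context. A self-contained sketch of the same task-level pattern, using stand-in classes instead of the real aria storage models (everything below is illustrative):

    from datetime import datetime
    from blinker import signal

    demo_start_task = signal('demo_start_task')

    class Operation(object):
        STARTED = 'started'
        def __init__(self):
            self.status = 'pending'
            self.started_at = None

    class OperationContext(object):
        def __init__(self):
            self.operation = Operation()

    class Task(object):
        def __init__(self):
            self.operation_context = OperationContext()

    @demo_start_task.connect
    def _task_started(task, *args, **kwargs):
        # mirror of the handler logic: record when the task started and flip its status
        operation = task.operation_context.operation
        operation.started_at = datetime.utcnow()
        operation.status = operation.STARTED

    task = Task()
    demo_start_task.send(task)
    assert task.operation_context.operation.status == 'started'
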
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/events/workflow_engine_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py
index 59bed99..44759c5 100644
--- a/aria/events/workflow_engine_event_handler.py
+++ b/aria/events/workflow_engine_event_handler.py
@@ -14,61 +14,47 @@
 # limitations under the License.
 
 from . import (
-    start_operation_signal,
-    end_operation_signal,
-    on_success_operation_signal,
-    on_failure_operation_signal,
+    start_task_signal,
+    on_success_task_signal,
+    on_failure_task_signal,
     start_workflow_signal,
-    end_workflow_signal,
-    start_sub_workflow_signal,
-    end_sub_workflow_signal,
+    on_success_workflow_signal,
+    on_failure_workflow_signal
 )
 
 
-@start_operation_signal.connect
-def start_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting operation: {sender.task_name}'.format(sender=sender))
+@start_task_signal.connect
+def start_task_handler(task, **kwargs):
+    task.context.logger.debug(
+        'Event: Starting task: {task.context.name}'.format(task=task))
 
 
-@end_operation_signal.connect
-def end_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished operation: {sender.task_name}'.format(sender=sender))
+@on_success_task_signal.connect
+def success_task_handler(task, **kwargs):
+    task.context.logger.debug(
+        'Event: Task success: {task.context.name}'.format(task=task))
 
 
-@on_success_operation_signal.connect
-def success_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - operation success: {sender.task_name}'.format(sender=sender))
-
-
-@on_failure_operation_signal.connect
-def failure_operation_handler(sender, **kwargs):
-    sender.context.logger.error(
-        'Event - operation failure: {sender.task_name}'.format(sender=sender),
+@on_failure_task_signal.connect
+def failure_operation_handler(task, **kwargs):
+    task.context.logger.error(
+        'Event: Task failure: {task.context.id}'.format(task=task),
         exc_info=kwargs.get('exception', True))
 
 
 @start_workflow_signal.connect
-def start_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting workflow: {sender.task_name}'.format(sender=sender))
-
-
-@end_workflow_signal.connect
-def end_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished workflow: {sender.task_name}'.format(sender=sender))
+def start_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Starting workflow: {context.name}'.format(context=context))
 
 
-@start_sub_workflow_signal.connect
-def start_sub_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting sub workflow: {sender.task_name}'.format(sender=sender))
+@on_failure_workflow_signal.connect
+def failure_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Workflow failure: {context.name}'.format(context=context))
 
 
-@end_sub_workflow_signal.connect
-def end_sub_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished sub workflow: {sender.task_name}'.format(sender=sender))
+@on_success_workflow_signal.connect
+def success_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Workflow success: {context.name}'.format(context=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/tools/module.py
----------------------------------------------------------------------
diff --git a/aria/tools/module.py b/aria/tools/module.py
new file mode 100644
index 0000000..535f7aa
--- /dev/null
+++ b/aria/tools/module.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib
+
+
+def load_attribute(attribute_path):
+    module_name, attribute_name = attribute_path.rsplit('.', 1)
+    try:
+        module = importlib.import_module(module_name)
+        return getattr(module, attribute_name)
+    except ImportError:
+        # TODO: handle
+        raise
+    except AttributeError:
+        # TODO: handle
+        raise

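load_attribute splits a dotted path into a module name and an attribute name and resolves both. A quick usage check (os.path.join is just a convenient stdlib target):

    from aria.tools.module import load_attribute

    join = load_attribute('os.path.join')
    assert join('a', 'b') == 'a/b'  # on POSIX path separators
    # load_attribute('no.such.module') would raise ImportError (see the TODOs above)
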
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/workflows/engine/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py
index 7b86fb5..2b0507f 100644
--- a/aria/workflows/engine/engine.py
+++ b/aria/workflows/engine/engine.py
@@ -14,29 +14,20 @@
 # limitations under the License.
 
 import time
-from datetime import datetime
 
-from contextlib import contextmanager
-from networkx import DiGraph
+import networkx
 
-from aria.events import (
-    start_workflow_signal,
-    on_success_workflow_signal,
-    on_failure_workflow_signal,
-    start_task_signal,
-    on_success_task_signal,
-    on_failure_task_signal,
-)
-from aria.logger import LoggerMixin
+from aria import events
+from aria import logger
 
 
-class Engine(LoggerMixin):
+class Engine(logger.LoggerMixin):
 
     def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._workflow_context = workflow_context
         self._tasks_graph = tasks_graph
-        self._execution_graph = DiGraph()
+        self._execution_graph = networkx.DiGraph()
         self._executor = executor
         self._build_execution_graph(self._workflow_context, self._tasks_graph)
 
@@ -44,23 +35,21 @@ class Engine(LoggerMixin):
         pass
 
     def execute(self):
-        execution_id = self._workflow_context.execution_id
-        with self._connect_signals():
-            try:
-                start_workflow_signal.send(self, execution_id=execution_id)
-                while True:
-                    for task in self._ended_tasks():
-                        self._handle_ended_tasks(task)
-                    for task in self._executable_tasks():
-                        self._handle_executable_task(task)
-                    if self._all_tasks_consumed():
-                        break
-                    else:
-                        time.sleep(0.1)
-                on_success_workflow_signal.send(self, execution_id=execution_id)
-            except BaseException as e:
-                on_failure_workflow_signal.send(self, execution_id=execution_id, exception=e)
-                raise
+        try:
+            events.start_workflow_signal.send(self._workflow_context)
+            while True:
+                for task in self._ended_tasks():
+                    self._handle_ended_tasks(task)
+                for task in self._executable_tasks():
+                    self._handle_executable_task(task)
+                if self._all_tasks_consumed():
+                    break
+                else:
+                    time.sleep(0.1)
+            events.on_success_workflow_signal.send(self._workflow_context)
+        except BaseException as e:
+            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
+            raise
 
     def _executable_tasks(self):
         now = time.time()
@@ -81,9 +70,6 @@ class Engine(LoggerMixin):
     def _tasks_iter(self):
         return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
 
-    def _get_task(self, task_id):
-        return self._execution_graph.node[task_id]['task']
-
     def _handle_executable_task(self, task):
         self._executor.execute(task)
 
@@ -92,94 +78,3 @@ class Engine(LoggerMixin):
             raise RuntimeError('Workflow failed')
         else:
             self._execution_graph.remove_node(task.id)
-
-    def _task_started_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.started_at = datetime.utcnow()
-        operation.status = operation.STARTED
-        operation_context.operation = operation
-
-    def _task_failed_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.ended_at = datetime.utcnow()
-        operation.status = operation.FAILED
-        operation_context.operation = operation
-
-    def _task_succeeded_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.ended_at = datetime.utcnow()
-        operation.status = operation.SUCCESS
-        operation_context.operation = operation
-
-    def _start_workflow_receiver(self, *args, **kwargs):
-        Execution = self._workflow_context.storage.execution.model_cls
-        execution = Execution(
-            id=self._workflow_context.execution_id,
-            deployment_id=self._workflow_context.deployment_id,
-            workflow_id=self._workflow_context.workflow_id,
-            blueprint_id=self._workflow_context.blueprint_id,
-            status=Execution.PENDING,
-            created_at=datetime.utcnow(),
-            error='',
-            parameters=self._workflow_context.parameters,
-            is_system_workflow=False
-        )
-        self._workflow_context.execution = execution
-
-    def _workflow_failed_receiver(self, exception, *args, **kwargs):
-        execution = self._workflow_context.execution
-        execution.error = str(exception)
-        execution.status = execution.FAILED
-        self._workflow_context.execution = execution
-
-    def _workflow_succeeded_receiver(self, *args, **kwargs):
-        execution = self._workflow_context.execution
-        execution.status = execution.TERMINATED
-        self._workflow_context.execution = execution
-
-    @contextmanager
-    def _connect_signals(self):
-        start_workflow_signal.connect(self._start_workflow_receiver)
-        on_success_workflow_signal.connect(self._workflow_succeeded_receiver)
-        on_failure_workflow_signal.connect(self._workflow_failed_receiver)
-        start_task_signal.connect(self._task_started_receiver)
-        on_success_task_signal.connect(self._task_succeeded_receiver)
-        on_failure_task_signal.connect(self._task_failed_receiver)
-        try:
-            yield
-        finally:
-            start_workflow_signal.disconnect(self._start_workflow_receiver)
-            on_success_workflow_signal.disconnect(self._workflow_succeeded_receiver)
-            on_failure_workflow_signal.disconnect(self._workflow_failed_receiver)
-            start_task_signal.disconnect(self._task_started_receiver)
-            on_success_task_signal.disconnect(self._task_succeeded_receiver)
-            on_failure_task_signal.disconnect(self._task_failed_receiver)
-
-
-class Task(object):
-
-    def __init__(self, operation_context):
-        self.operation_context = operation_context
-        self._create_operation_in_storage()
-
-    def _create_operation_in_storage(self):
-        Operation = self.operation_context.storage.operation.model_cls
-        operation = Operation(
-            id=self.operation_context.id,
-            execution_id=self.operation_context.execution_id,
-            max_retries=self.operation_context.parameters.get('max_retries', 1),
-            status=Operation.PENDING,
-        )
-        self.operation_context.operation = operation
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.operation_context, attr)
-        except AttributeError:
-            return super(Task, self).__getattribute__(attr)

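With the receivers gone, driving the engine is now just constructing it with an executor and calling execute(); the workflow and task signals are sent on the module-level events and picked up by whatever handlers are connected. A rough usage sketch, assuming a workflow_context and tasks_graph built elsewhere (both are placeholders here, not defined by this commit):

    from aria.workflows.engine.engine import Engine
    from aria.workflows.engine.executor import ThreadExecutor

    executor = ThreadExecutor(pool_size=4)
    engine = Engine(executor=executor,
                    workflow_context=workflow_context,  # assumed to exist
                    tasks_graph=tasks_graph)            # assumed to exist
    try:
        engine.execute()  # sends start/success/failure workflow signals internally
    finally:
        executor.close()
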
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/workflows/engine/executor.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/executor.py b/aria/workflows/engine/executor.py
index a69e87a..13ccd07 100644
--- a/aria/workflows/engine/executor.py
+++ b/aria/workflows/engine/executor.py
@@ -16,16 +16,12 @@
 import threading
 import multiprocessing
 import Queue
-from importlib import import_module
-from collections import namedtuple
+import collections
 
 import jsonpickle
 
-from aria.events import (
-    start_task_signal,
-    on_success_task_signal,
-    on_failure_task_signal,
-)
+from aria import events
+from aria.tools import module
 
 
 class Executor(object):
@@ -33,97 +29,90 @@ class Executor(object):
     def execute(self, task):
         raise NotImplementedError
 
-    def task_started(self, task_id):
-        start_task_signal.send(self, task_id=task_id)
+    def _task_started(self, task):
+        events.start_task_signal.send(task)
 
-    def task_failed(self, task_id, exception):
-        on_failure_task_signal.send(self, task_id=task_id, exception=exception)
+    def _task_failed(self, task, exception):
+        events.on_failure_task_signal.send(task, exception=exception)
 
-    def task_succeeded(self, task_id):
-        on_success_task_signal.send(self, task_id=task_id)
+    def _task_succeeded(self, task):
+        events.on_success_task_signal.send(task)
 
 
 class ThreadExecutor(Executor):
 
     def __init__(self, pool_size=1):
-        self.stopped = False
-        self.queue = Queue.Queue()
-        self.pool = []
+        self._stopped = False
+        self._queue = Queue.Queue()
+        self._pool = []
         for i in range(pool_size):
             name = 'LocalThreadExecutor-{index}'.format(index=i+1)
             thread = threading.Thread(target=self._processor, name=name)
             thread.daemon = True
             thread.start()
-            self.pool.append(thread)
+            self._pool.append(thread)
 
     def execute(self, task):
-        self.queue.put(task)
+        self._queue.put(task)
 
     def close(self):
-        self.stopped = True
+        self._stopped = True
 
     def _processor(self):
-        while not self.stopped:
+        while not self._stopped:
             try:
-                task = self.queue.get(timeout=1)
-                self.task_started(task.id)
+                task = self._queue.get(timeout=1)
+                self._task_started(task)
                 try:
                     operation_context = task.operation_context
-                    task_func = self._load_task(operation_context.operation_details['operation'])
+                    task_func = module.load_attribute(
+                        operation_context.operation_details['operation'])
                     task_func(**operation_context.inputs)
-                    self.task_succeeded(task.id)
+                    self._task_succeeded(task)
                 except BaseException as e:
-                    self.task_failed(task.id, exception=e)
+                    self._task_failed(task, exception=e)
             # Daemon threads
             except:
                 pass
 
-    def _load_task(self, handler_path):
-        module_name, spec_handler_name = handler_path.rsplit('.', 1)
-        try:
-            module = import_module(module_name)
-            return getattr(module, spec_handler_name)
-        except ImportError:
-            # TODO: handle
-            raise
-        except AttributeError:
-            # TODO: handle
-            raise
-
 
 class MultiprocessExecutor(Executor):
 
-    Action = namedtuple('Action', 'name task_id exception')
+    _Message = collections.namedtuple('Message', 'type task_id exception')
 
     def __init__(self, pool_size=1):
-        self.stopped = False
-        self.listener = threading.Thread(target=self._listener)
-        self.listener.daemon = True
-        self.listener.start()
-        self.queue = multiprocessing.Queue()
-        self.pool = multiprocessing.Pool(processes=pool_size)
+        self._stopped = False
+        self._listener = threading.Thread(target=self._listener)
+        self._listener.daemon = True
+        self._listener.start()
+        self._queue = multiprocessing.Queue()
+        self._pool = multiprocessing.Pool(processes=pool_size,
+                                          maxtasksperchild=1)
+        self._tasks = {}
 
     def execute(self, task):
-        self.pool.apply_async(self._process, args=(
-            self.queue,
+        self._tasks[task.id] = task
+        self._pool.apply_async(self._process, args=(
+            self._queue,
             task.id,
             task.operation_context.operation_details,
             task.operation_context.inputs))
 
     def close(self):
-        self.pool.close()
-        self.stopped = True
+        self._pool.close()
+        self._stopped = True
 
     def _listener(self):
-        while not self.stopped:
+        while not self._stopped:
             try:
-                action = self.queue.get(timeout=1)
-                if action.name == 'task_started':
-                    self.task_started(action.task_id)
-                elif action.name == 'task_succeeded':
-                    self.task_succeeded(action.task_id)
-                elif action.name == 'task_failed':
-                    self.task_failed(action.task_id, exception=jsonpickle.loads(action.exception))
+                message = self._queue.get(timeout=1)
+                if message.type == 'task_started':
+                    self._task_started(self._tasks[message.task_id])
+                elif message.type == 'task_succeeded':
+                    self._task_succeeded(self._remove_task(message.task_id))
+                elif message.type == 'task_failed':
+                    self._task_failed(self._remove_task(message.task_id),
+                                      exception=jsonpickle.loads(message.exception))
                 else:
                     # TODO: something
                     raise RuntimeError()
@@ -132,28 +121,19 @@ class MultiprocessExecutor(Executor):
                 pass
 
     def _process(self, queue, task_id, operation_details, operation_inputs):
-        queue.put(self.Action(action='task_started',
-                              task_id=task_id,
-                              exception=None))
+        queue.put(self._Message(type='task_started',
+                                task_id=task_id,
+                                exception=None))
         try:
-            task_func = self._load_task(operation_details['operation'])
+            task_func = module.load_attribute(operation_details['operation'])
             task_func(**operation_inputs)
-            queue.put(self.Action(action='task_succeeded',
-                                  task_id=task_id,
-                                  exception=None))
+            queue.put(self._Message(type='task_succeeded',
+                                    task_id=task_id,
+                                    exception=None))
         except BaseException as e:
-            queue.put(self.Action(action='task_failed',
-                                  task_id=task_id,
-                                  exception=jsonpickle.dumps(e)))
+            queue.put(self._Message(type='task_failed',
+                                    task_id=task_id,
+                                    exception=jsonpickle.dumps(e)))
 
-    def _load_task(self, handler_path):
-        module_name, spec_handler_name = handler_path.rsplit('.', 1)
-        try:
-            module = import_module(module_name)
-            return getattr(module, spec_handler_name)
-        except ImportError:
-            # TODO: handle
-            raise
-        except AttributeError:
-            # TODO: handle
-            raise
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)

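Both executors now share the same contract: execute(task) schedules a task, and the protected _task_started/_task_succeeded/_task_failed hooks report outcomes through the module-level signals. As an illustration of that contract (not part of this commit), a trivial executor that runs tasks inline in the calling thread could look like:

    from aria.tools import module
    from aria.workflows.engine.executor import Executor

    class SynchronousExecutor(Executor):
        # Illustrative only: same lifecycle reporting as the pooled executors,
        # but the task runs synchronously in the caller's thread.
        def execute(self, task):
            self._task_started(task)
            try:
                operation_context = task.operation_context
                task_func = module.load_attribute(
                    operation_context.operation_details['operation'])
                task_func(**operation_context.inputs)
            except BaseException as e:
                self._task_failed(task, exception=e)
            else:
                self._task_succeeded(task)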