extended error msg

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5af1a8f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5af1a8f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5af1a8f2

Branch: refs/heads/ARIA-79-concurrent-storage-modifications
Commit: 5af1a8f23237768516af4cca018cd418a9abe7b1
Parents: e6d68d7
Author: mxmrlv <mxm...@gmail.com>
Authored: Thu Feb 16 14:49:19 2017 +0200
Committer: mxmrlv <mxm...@gmail.com>
Committed: Thu Feb 16 16:24:00 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |  8 +---
 aria/storage/instrumentation.py                 | 40 +++++++++++------
 aria/storage/modeling/instance_elements.py      |  5 ++-
 aria/storage_initializer.py                     |  1 -
 tests/mock/models.py                            |  2 -
 ...process_executor_concurrent_modifications.py | 45 +++++++++++---------
 tests/storage/test_structures.py                |  1 -
 7 files changed, 58 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 655d75d..84f5f58 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -227,6 +227,8 @@ class ProcessExecutor(base.BaseExecutor):
         try:
             self._apply_tracked_changes(task, request)
         except BaseException as e:
+            e.message = \
+                '{0} Remote task execution failed due: {1}'.format(str(e), request['exception'])
             self._task_failed(task, exception=e)
         else:
             self._task_failed(task, exception=request['exception'])
@@ -364,9 +366,7 @@ def _main():
     # This is required for the instrumentation work properly.
     # See docstring of `remove_mutable_association_listener` for further details
     storage_type.remove_mutable_association_listener()
-
     with instrumentation.track_changes() as instrument:
-        # import pydevd; pydevd.settrace('localhost')
         try:
             ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
             _patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
@@ -377,10 +377,6 @@ def _main():
             task_func(ctx=ctx, **operation_inputs)
             messenger.succeeded(tracked_changes=instrument.tracked_changes)
         except BaseException as e:
-            # import traceback
-            # with open('/home/maxim/Desktop/tmp_log', 'wr+') as f:
-            #     traceback.print_exc(file=f)
-
             messenger.failed(exception=e, tracked_changes=instrument.tracked_changes)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 41818b6..b3ca24a 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import copy
+import json
 
 import sqlalchemy
 import sqlalchemy.event
@@ -22,7 +23,7 @@ from . import exceptions
 
 from .modeling import model as _model
 
-_VERSION_ID_COL = 'version_id'
+_VERSION_ID_COL = 'version'
 _STUB = object()
 _INSTRUMENTED = {
     _model.Node.runtime_properties: dict
@@ -162,18 +163,31 @@ def apply_tracked_changes(tracked_changes, model):
                             returned by calling ``track_changes()``
     :param model: The model storage used to actually apply the changes
     """
-    for mapi_name, tracked_instances in tracked_changes.items():
-        mapi = getattr(model, mapi_name)
-        for instance_id, tracked_attributes in tracked_instances.items():
-            instance = None
-            for attribute_name, value in tracked_attributes.items():
-                if value.initial != value.current:
-                    if not instance:
-                        instance = mapi.get(instance_id)
-                    setattr(instance, attribute_name, value.current)
-            if instance:
-                _validate_version_id(instance, mapi)
-                mapi.update(instance)
+    successfully_updated_instances = dict()
+    try:
+        for mapi_name, tracked_instances in tracked_changes.items():
+            successfully_updated_instances[mapi_name] = list()
+            mapi = getattr(model, mapi_name)
+            for instance_id, tracked_attributes in tracked_instances.items():
+                instance = None
+                for attribute_name, value in tracked_attributes.items():
+                    if value.initial != value.current:
+                        if not instance:
+                            instance = mapi.get(instance_id)
+                        setattr(instance, attribute_name, value.current)
+                if instance:
+                    _validate_version_id(instance, mapi)
+                    mapi.update(instance)
+                    successfully_updated_instances[mapi_name].append(instance_id)
+    except BaseException as e:
+        for key, value in successfully_updated_instances.items():
+            if not value:
+                del successfully_updated_instances[key]
+        e.message = \
+            'Registering all the changes to the storage has failed. ' \
+            'The instances that were successfully updated : {0} .' \
+            'This was caused by {1}.'.format(json.dumps(successfully_updated_instances), str(e))
+        raise e
 
 
 def _validate_version_id(instance, mapi):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/aria/storage/modeling/instance_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/instance_elements.py b/aria/storage/modeling/instance_elements.py
index 0666c8a..2b102f1 100644
--- a/aria/storage/modeling/instance_elements.py
+++ b/aria/storage/modeling/instance_elements.py
@@ -553,7 +553,7 @@ class PolicyBase(structure.ModelMixin):
 
     # region many-to-one relationships
     @declared_attr
-    def service_instnce(cls):
+    def service_instance(cls):
         return cls.many_to_one_relationship('service_instance')
 
     # region many-to-many relationships
@@ -851,6 +851,8 @@ class NodeBase(structure.ModelMixin):
     * :code:`relationships`: List of :class:`Relationship`
     """
     __tablename__ = 'node'
+    version = Column(Integer, nullable=False)
+    __mapper_args__ = {'version_id_col': version}
 
     __private_fields__ = ['service_instance_fk',
                           'host_fk',
@@ -878,7 +880,6 @@ class NodeBase(structure.ModelMixin):
     runtime_properties = Column(aria_types.Dict)
     scaling_groups = Column(aria_types.List)
     state = Column(Text, nullable=False)
-    version = Column(Integer, default=1)
 
     @declared_attr
     def plugins(cls):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/aria/storage_initializer.py
----------------------------------------------------------------------
diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py
index aea5ec8..175ec22 100644
--- a/aria/storage_initializer.py
+++ b/aria/storage_initializer.py
@@ -95,7 +95,6 @@ def _create_node_instance(service_instance, node, node_model):
         service_instance=service_instance,
         name=node_model.id,
         runtime_properties={},
-        version=None,
         node_template=node,
         state='',
         scaling_groups=[]

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 047526a..301fc01 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -50,7 +50,6 @@ def get_dependency_node_instance(dependency_node, deployment):
         name=DEPENDENCY_NODE_INSTANCE_NAME,
         service_instance=deployment,
         runtime_properties={'ip': '1.1.1.1'},
-        version=None,
         node_template=dependency_node,
         state='',
         scaling_groups=[]
@@ -96,7 +95,6 @@ def get_dependent_node_instance(dependent_node, deployment):
         name=DEPENDENT_NODE_INSTANCE_NAME,
         service_instance=deployment,
         runtime_properties={},
-        version=None,
         node_template=dependent_node,
         state='',
         scaling_groups=[],

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index e46921e..40deedd 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -38,7 +38,7 @@ def test_concurrent_modification_on_task_succeeded(context, executor, lock_files
 
 @operation
 def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
-    _concurrent_update(lock_files, ctx.node_instance, key, first_value, second_value)
+    _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
 
 
 def test_concurrent_modification_on_task_failed(context, executor, lock_files):
@@ -47,7 +47,7 @@ def test_concurrent_modification_on_task_failed(context, executor, lock_files):
 
 @operation
 def _test_task_failed(ctx, lock_files, key, first_value, second_value):
-    first = _concurrent_update(lock_files, ctx.node_instance, key, first_value, second_value)
+    first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
     if not first:
         raise RuntimeError('MESSAGE')
 
@@ -58,21 +58,23 @@ def test_concurrent_modification_on_update_and_refresh(context, executor, lock_f
 
 @operation
 def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
-    node_instance = ctx.node_instance
-    first = _concurrent_update(lock_files, node_instance, key, first_value, second_value)
+    node = ctx.node
+    first = _concurrent_update(lock_files, node, key, first_value, second_value)
     if not first:
         try:
-            ctx.model.node_instance.update(node_instance)
+            ctx.model.node.update(node)
         except StorageError as e:
             assert 'Version conflict' in str(e)
-            ctx.model.node_instance.refresh(node_instance)
+            ctx.model.node.refresh(node)
         else:
             raise RuntimeError('Unexpected')
 
 
 def _test(context, executor, lock_files, func, expected_failure):
-    def _node_instance(ctx):
-        return ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+    def _node(ctx):
+        return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+
+    op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
 
     key = 'key'
     first_value = 'value1'
@@ -84,14 +86,19 @@ def _test(context, executor, lock_files, func, expected_failure):
         'second_value': second_value
     }
 
+    node = _node(context)
+    node.interfaces = [mock.models.get_interface(
+        op_name,
+        operation_kwargs=dict(implementation='{0}.{1}'.format(__name__, func.__name__))
+    )]
+    context.model.node.update(node)
+
     @workflow
-    def mock_workflow(ctx, graph):
-        op = 'test.op'
-        node_instance = _node_instance(ctx)
-        node_instance.node.operations[op] = {'operation': '{0}.{1}'.format(__name__, func.__name__)}
+    def mock_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs),
-            api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs))
+            api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs),
+            api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs)
+        )
 
     signal = events.on_failure_task_signal
     with events_collector(signal) as collected:
@@ -100,7 +107,7 @@ def _test(context, executor, lock_files, func, expected_failure):
         except ExecutorException:
             pass
 
-    props = _node_instance(context).runtime_properties
+    props = _node(context).runtime_properties
     assert props[key] == first_value
 
     exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
@@ -122,7 +129,7 @@ def executor():
 
 @pytest.fixture
 def context(tmpdir):
-    result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir)))
+    result = mock.context.simple(str(tmpdir))
     yield result
     storage.release_sqlite_storage(result.model)
 
@@ -132,7 +139,7 @@ def lock_files(tmpdir):
     return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
 
 
-def _concurrent_update(lock_files, node_instance, key, first_value, second_value):
+def _concurrent_update(lock_files, node, key, first_value, second_value):
 
     locker1 = fasteners.InterProcessLock(lock_files[0])
     locker2 = fasteners.InterProcessLock(lock_files[1])
@@ -143,11 +150,11 @@ def _concurrent_update(lock_files, node_instance, key, first_value, second_value
         # Give chance for both processes to acquire locks
         while locker2.acquire(blocking=False):
             locker2.release()
-            time.sleep(0.1)
+            time.sleep(0.01)
     else:
         locker2.acquire()
 
-    node_instance.runtime_properties[key] = first_value if first else second_value
+    node.runtime_properties[key] = first_value if first else second_value
 
     if first:
         locker1.release()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5af1a8f2/tests/storage/test_structures.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py
index 30f0064..666256e 100644
--- a/tests/storage/test_structures.py
+++ b/tests/storage/test_structures.py
@@ -125,7 +125,6 @@ def test_relationship_model_ordering(context):
         name='new_node_instance',
         runtime_properties={},
         service_instance=service_instance,
-        version=None,
         node_template=new_node_template,
         state='',
         scaling_groups=[]

Reply via email to