Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-79-concurrent-storage-modifications 5af1a8f23 -> ab138d4d7 
(forced update)


tests fix and exception msg


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ab138d4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ab138d4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ab138d4d

Branch: refs/heads/ARIA-79-concurrent-storage-modifications
Commit: ab138d4d7d82fec44288f96bc8dca9f2c7248b80
Parents: 4a72e11
Author: mxmrlv <mxm...@gmail.com>
Authored: Thu Feb 16 13:06:35 2017 +0200
Committer: mxmrlv <mxm...@gmail.com>
Committed: Thu Feb 16 16:36:47 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |   3 +-
 aria/storage/instrumentation.py                 |  40 ++++---
 aria/storage/modeling/instance_elements.py      |   5 +-
 aria/storage_initializer.py                     |   1 -
 tests/mock/models.py                            |   2 -
 ...process_executor_concurrent_modifications.py | 109 +++++++++----------
 tests/storage/test_structures.py                |   1 -
 7 files changed, 82 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py 
b/aria/orchestrator/workflows/executor/process.py
index a23e3da..84f5f58 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -227,6 +227,8 @@ class ProcessExecutor(base.BaseExecutor):
         try:
             self._apply_tracked_changes(task, request)
         except BaseException as e:
+            e.message = \
+                '{0} Remote task execution failed due: {1}'.format(str(e), 
request['exception'])
             self._task_failed(task, exception=e)
         else:
             self._task_failed(task, exception=request['exception'])
@@ -364,7 +366,6 @@ def _main():
     # This is required for the instrumentation work properly.
     # See docstring of `remove_mutable_association_listener` for further 
details
     storage_type.remove_mutable_association_listener()
-
     with instrumentation.track_changes() as instrument:
         try:
             ctx = 
context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 41818b6..b3ca24a 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import copy
+import json
 
 import sqlalchemy
 import sqlalchemy.event
@@ -22,7 +23,7 @@ from . import exceptions
 
 from .modeling import model as _model
 
-_VERSION_ID_COL = 'version_id'
+_VERSION_ID_COL = 'version'
 _STUB = object()
 _INSTRUMENTED = {
     _model.Node.runtime_properties: dict
@@ -162,18 +163,31 @@ def apply_tracked_changes(tracked_changes, model):
                             returned by calling ``track_changes()``
     :param model: The model storage used to actually apply the changes
     """
-    for mapi_name, tracked_instances in tracked_changes.items():
-        mapi = getattr(model, mapi_name)
-        for instance_id, tracked_attributes in tracked_instances.items():
-            instance = None
-            for attribute_name, value in tracked_attributes.items():
-                if value.initial != value.current:
-                    if not instance:
-                        instance = mapi.get(instance_id)
-                    setattr(instance, attribute_name, value.current)
-            if instance:
-                _validate_version_id(instance, mapi)
-                mapi.update(instance)
+    successfully_updated_instances = dict()
+    try:
+        for mapi_name, tracked_instances in tracked_changes.items():
+            successfully_updated_instances[mapi_name] = list()
+            mapi = getattr(model, mapi_name)
+            for instance_id, tracked_attributes in tracked_instances.items():
+                instance = None
+                for attribute_name, value in tracked_attributes.items():
+                    if value.initial != value.current:
+                        if not instance:
+                            instance = mapi.get(instance_id)
+                        setattr(instance, attribute_name, value.current)
+                if instance:
+                    _validate_version_id(instance, mapi)
+                    mapi.update(instance)
+                    
successfully_updated_instances[mapi_name].append(instance_id)
+    except BaseException as e:
+        for key, value in successfully_updated_instances.items():
+            if not value:
+                del successfully_updated_instances[key]
+        e.message = \
+            'Registering all the changes to the storage has failed. ' \
+            'The instances that were successfully updated : {0} .' \
+            'This was caused by 
{1}.'.format(json.dumps(successfully_updated_instances), str(e))
+        raise e
 
 
 def _validate_version_id(instance, mapi):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/aria/storage/modeling/instance_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/instance_elements.py 
b/aria/storage/modeling/instance_elements.py
index 0666c8a..2b102f1 100644
--- a/aria/storage/modeling/instance_elements.py
+++ b/aria/storage/modeling/instance_elements.py
@@ -553,7 +553,7 @@ class PolicyBase(structure.ModelMixin):
 
     # region many-to-one relationships
     @declared_attr
-    def service_instnce(cls):
+    def service_instance(cls):
         return cls.many_to_one_relationship('service_instance')
 
     # region many-to-many relationships
@@ -851,6 +851,8 @@ class NodeBase(structure.ModelMixin):
     * :code:`relationships`: List of :class:`Relationship`
     """
     __tablename__ = 'node'
+    version = Column(Integer, nullable=False)
+    __mapper_args__ = {'version_id_col': version}
 
     __private_fields__ = ['service_instance_fk',
                           'host_fk',
@@ -878,7 +880,6 @@ class NodeBase(structure.ModelMixin):
     runtime_properties = Column(aria_types.Dict)
     scaling_groups = Column(aria_types.List)
     state = Column(Text, nullable=False)
-    version = Column(Integer, default=1)
 
     @declared_attr
     def plugins(cls):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/aria/storage_initializer.py
----------------------------------------------------------------------
diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py
index aea5ec8..175ec22 100644
--- a/aria/storage_initializer.py
+++ b/aria/storage_initializer.py
@@ -95,7 +95,6 @@ def _create_node_instance(service_instance, node, node_model):
         service_instance=service_instance,
         name=node_model.id,
         runtime_properties={},
-        version=None,
         node_template=node,
         state='',
         scaling_groups=[]

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 047526a..301fc01 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -50,7 +50,6 @@ def get_dependency_node_instance(dependency_node, deployment):
         name=DEPENDENCY_NODE_INSTANCE_NAME,
         service_instance=deployment,
         runtime_properties={'ip': '1.1.1.1'},
-        version=None,
         node_template=dependency_node,
         state='',
         scaling_groups=[]
@@ -96,7 +95,6 @@ def get_dependent_node_instance(dependent_node, deployment):
         name=DEPENDENT_NODE_INSTANCE_NAME,
         service_instance=deployment,
         runtime_properties={},
-        version=None,
         node_template=dependent_node,
         state='',
         scaling_groups=[],

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git 
a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
 
b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 7c54bc5..40deedd 100644
--- 
a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ 
b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import time
 import json
 
@@ -33,67 +32,73 @@ from tests import mock
 from tests import storage
 
 
-def test_concurrent_modification_on_task_succeeded(context, executor, 
shared_file):
-    _test(context, executor, shared_file, _test_task_succeeded, 
expected_failure=True)
+def test_concurrent_modification_on_task_succeeded(context, executor, 
lock_files):
+    _test(context, executor, lock_files, _test_task_succeeded, 
expected_failure=True)
 
 
 @operation
-def _test_task_succeeded(ctx, shared_file, key, first_value, second_value):
-    _concurrent_update(shared_file, ctx.node_instance, key, first_value, 
second_value)
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
+    _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
 
 
-def test_concurrent_modification_on_task_failed(context, executor, 
shared_file):
-    _test(context, executor, shared_file, _test_task_failed, 
expected_failure=True)
+def test_concurrent_modification_on_task_failed(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_task_failed, 
expected_failure=True)
 
 
 @operation
-def _test_task_failed(ctx, shared_file, key, first_value, second_value):
-    first = _concurrent_update(shared_file, ctx.node_instance, key, 
first_value, second_value)
+def _test_task_failed(ctx, lock_files, key, first_value, second_value):
+    first = _concurrent_update(lock_files, ctx.node, key, first_value, 
second_value)
     if not first:
         raise RuntimeError('MESSAGE')
 
 
-def test_concurrent_modification_on_update_and_refresh(context, executor, 
shared_file):
-    _test(context, executor, shared_file, _test_update_and_refresh, 
expected_failure=False)
+def test_concurrent_modification_on_update_and_refresh(context, executor, 
lock_files):
+    _test(context, executor, lock_files, _test_update_and_refresh, 
expected_failure=False)
 
 
 @operation
-def _test_update_and_refresh(ctx, shared_file, key, first_value, second_value):
-    node_instance = ctx.node_instance
-    first = _concurrent_update(shared_file, node_instance, key, first_value, 
second_value)
+def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
+    node = ctx.node
+    first = _concurrent_update(lock_files, node, key, first_value, 
second_value)
     if not first:
         try:
-            ctx.model.node_instance.update(node_instance)
+            ctx.model.node.update(node)
         except StorageError as e:
             assert 'Version conflict' in str(e)
-            ctx.model.node_instance.refresh(node_instance)
+            ctx.model.node.refresh(node)
         else:
             raise RuntimeError('Unexpected')
 
 
-def _test(context, executor, shared_file, func, expected_failure):
-    def _node_instance(ctx):
-        return 
ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+def _test(context, executor, lock_files, func, expected_failure):
+    def _node(ctx):
+        return 
ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+
+    op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
 
-    shared_file.write(json.dumps({}))
     key = 'key'
     first_value = 'value1'
     second_value = 'value2'
     inputs = {
-        'shared_file': str(shared_file),
+        'lock_files': lock_files,
         'key': key,
         'first_value': first_value,
         'second_value': second_value
     }
 
+    node = _node(context)
+    node.interfaces = [mock.models.get_interface(
+        op_name,
+        operation_kwargs=dict(implementation='{0}.{1}'.format(__name__, 
func.__name__))
+    )]
+    context.model.node.update(node)
+
     @workflow
-    def mock_workflow(ctx, graph):
-        op = 'test.op'
-        node_instance = _node_instance(ctx)
-        node_instance.node.operations[op] = {'operation': 
'{0}.{1}'.format(__name__, func.__name__)}
+    def mock_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.node_instance(instance=node_instance, 
name=op, inputs=inputs),
-            api.task.OperationTask.node_instance(instance=node_instance, 
name=op, inputs=inputs))
+            api.task.OperationTask.node(instance=node, name=op_name, 
inputs=inputs),
+            api.task.OperationTask.node(instance=node, name=op_name, 
inputs=inputs)
+        )
 
     signal = events.on_failure_task_signal
     with events_collector(signal) as collected:
@@ -102,7 +107,7 @@ def _test(context, executor, shared_file, func, 
expected_failure):
         except ExecutorException:
             pass
 
-    props = _node_instance(context).runtime_properties
+    props = _node(context).runtime_properties
     assert props[key] == first_value
 
     exceptions = [event['kwargs']['exception'] for event in 
collected.get(signal, [])]
@@ -124,51 +129,37 @@ def executor():
 
 @pytest.fixture
 def context(tmpdir):
-    result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir)))
+    result = mock.context.simple(str(tmpdir))
     yield result
     storage.release_sqlite_storage(result.model)
 
 
 @pytest.fixture
-def shared_file(tmpdir):
-    return tmpdir.join('shared_file')
-
+def lock_files(tmpdir):
+    return str(tmpdir.join('first_lock_file')), 
str(tmpdir.join('second_lock_file'))
 
-def _concurrent_update(shared_file, node_instance, key, first_value, 
second_value):
-    def lock():
-        return fasteners.InterProcessLock(shared_file)
 
-    def get(key):
-        with open(shared_file) as f:
-            return json.load(f).get(key)
+def _concurrent_update(lock_files, node, key, first_value, second_value):
 
-    def set(key):
-        with open(shared_file) as f:
-            content = json.load(f)
-        content[key] = True
-        with open(shared_file, 'wb') as f:
-            json.dump(content, f)
+    locker1 = fasteners.InterProcessLock(lock_files[0])
+    locker2 = fasteners.InterProcessLock(lock_files[1])
 
-    def wait_for(key):
-        while True:
-            time.sleep(0.01)
-            with lock():
-                if get(key):
-                    break
-
-    with lock():
-        first = not get('first_in')
-        set('first_in' if first else 'second_in')
+    first = locker1.acquire(blocking=False)
 
     if first:
-        wait_for('second_in')
+        # Give chance for both processes to acquire locks
+        while locker2.acquire(blocking=False):
+            locker2.release()
+            time.sleep(0.01)
+    else:
+        locker2.acquire()
 
-    node_instance.runtime_properties[key] = first_value if first else 
second_value
+    node.runtime_properties[key] = first_value if first else second_value
 
     if first:
-        with lock():
-            set('first_out')
+        locker1.release()
     else:
-        wait_for('first_out')
+        with locker1:
+            locker2.release()
 
     return first

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/tests/storage/test_structures.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py
index 30f0064..666256e 100644
--- a/tests/storage/test_structures.py
+++ b/tests/storage/test_structures.py
@@ -125,7 +125,6 @@ def test_relationship_model_ordering(context):
         name='new_node_instance',
         runtime_properties={},
         service_instance=service_instance,
-        version=None,
         node_template=new_node_template,
         state='',
         scaling_groups=[]

Reply via email to