Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-79-concurrent-storage-modifications 5af1a8f23 -> ab138d4d7 (forced update)
tests fix and exception msg Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ab138d4d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ab138d4d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ab138d4d Branch: refs/heads/ARIA-79-concurrent-storage-modifications Commit: ab138d4d7d82fec44288f96bc8dca9f2c7248b80 Parents: 4a72e11 Author: mxmrlv <mxm...@gmail.com> Authored: Thu Feb 16 13:06:35 2017 +0200 Committer: mxmrlv <mxm...@gmail.com> Committed: Thu Feb 16 16:36:47 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 3 +- aria/storage/instrumentation.py | 40 ++++--- aria/storage/modeling/instance_elements.py | 5 +- aria/storage_initializer.py | 1 - tests/mock/models.py | 2 - ...process_executor_concurrent_modifications.py | 109 +++++++++---------- tests/storage/test_structures.py | 1 - 7 files changed, 82 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index a23e3da..84f5f58 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -227,6 +227,8 @@ class ProcessExecutor(base.BaseExecutor): try: self._apply_tracked_changes(task, request) except BaseException as e: + e.message = \ + '{0} Remote task execution failed due: {1}'.format(str(e), request['exception']) self._task_failed(task, exception=e) else: self._task_failed(task, exception=request['exception']) @@ -364,7 +366,6 @@ def _main(): # This is required for the instrumentation work 
properly. # See docstring of `remove_mutable_association_listener` for further details storage_type.remove_mutable_association_listener() - with instrumentation.track_changes() as instrument: try: ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 41818b6..b3ca24a 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -14,6 +14,7 @@ # limitations under the License. import copy +import json import sqlalchemy import sqlalchemy.event @@ -22,7 +23,7 @@ from . import exceptions from .modeling import model as _model -_VERSION_ID_COL = 'version_id' +_VERSION_ID_COL = 'version' _STUB = object() _INSTRUMENTED = { _model.Node.runtime_properties: dict @@ -162,18 +163,31 @@ def apply_tracked_changes(tracked_changes, model): returned by calling ``track_changes()`` :param model: The model storage used to actually apply the changes """ - for mapi_name, tracked_instances in tracked_changes.items(): - mapi = getattr(model, mapi_name) - for instance_id, tracked_attributes in tracked_instances.items(): - instance = None - for attribute_name, value in tracked_attributes.items(): - if value.initial != value.current: - if not instance: - instance = mapi.get(instance_id) - setattr(instance, attribute_name, value.current) - if instance: - _validate_version_id(instance, mapi) - mapi.update(instance) + successfully_updated_instances = dict() + try: + for mapi_name, tracked_instances in tracked_changes.items(): + successfully_updated_instances[mapi_name] = list() + mapi = getattr(model, mapi_name) + for instance_id, tracked_attributes in tracked_instances.items(): + instance = None + for attribute_name, value in tracked_attributes.items(): + if value.initial != 
value.current: + if not instance: + instance = mapi.get(instance_id) + setattr(instance, attribute_name, value.current) + if instance: + _validate_version_id(instance, mapi) + mapi.update(instance) + successfully_updated_instances[mapi_name].append(instance_id) + except BaseException as e: + for key, value in successfully_updated_instances.items(): + if not value: + del successfully_updated_instances[key] + e.message = \ + 'Registering all the changes to the storage has failed. ' \ + 'The instances that were successfully updated : {0} .' \ + 'This was caused by {1}.'.format(json.dumps(successfully_updated_instances), str(e)) + raise e def _validate_version_id(instance, mapi): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/aria/storage/modeling/instance_elements.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/instance_elements.py b/aria/storage/modeling/instance_elements.py index 0666c8a..2b102f1 100644 --- a/aria/storage/modeling/instance_elements.py +++ b/aria/storage/modeling/instance_elements.py @@ -553,7 +553,7 @@ class PolicyBase(structure.ModelMixin): # region many-to-one relationships @declared_attr - def service_instnce(cls): + def service_instance(cls): return cls.many_to_one_relationship('service_instance') # region many-to-many relationships @@ -851,6 +851,8 @@ class NodeBase(structure.ModelMixin): * :code:`relationships`: List of :class:`Relationship` """ __tablename__ = 'node' + version = Column(Integer, nullable=False) + __mapper_args__ = {'version_id_col': version} __private_fields__ = ['service_instance_fk', 'host_fk', @@ -878,7 +880,6 @@ class NodeBase(structure.ModelMixin): runtime_properties = Column(aria_types.Dict) scaling_groups = Column(aria_types.List) state = Column(Text, nullable=False) - version = Column(Integer, default=1) @declared_attr def plugins(cls): 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/aria/storage_initializer.py ---------------------------------------------------------------------- diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py index aea5ec8..175ec22 100644 --- a/aria/storage_initializer.py +++ b/aria/storage_initializer.py @@ -95,7 +95,6 @@ def _create_node_instance(service_instance, node, node_model): service_instance=service_instance, name=node_model.id, runtime_properties={}, - version=None, node_template=node, state='', scaling_groups=[] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index 047526a..301fc01 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -50,7 +50,6 @@ def get_dependency_node_instance(dependency_node, deployment): name=DEPENDENCY_NODE_INSTANCE_NAME, service_instance=deployment, runtime_properties={'ip': '1.1.1.1'}, - version=None, node_template=dependency_node, state='', scaling_groups=[] @@ -96,7 +95,6 @@ def get_dependent_node_instance(dependent_node, deployment): name=DEPENDENT_NODE_INSTANCE_NAME, service_instance=deployment, runtime_properties={}, - version=None, node_template=dependent_node, state='', scaling_groups=[], http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py index 7c54bc5..40deedd 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py +++ 
b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import time import json @@ -33,67 +32,73 @@ from tests import mock from tests import storage -def test_concurrent_modification_on_task_succeeded(context, executor, shared_file): - _test(context, executor, shared_file, _test_task_succeeded, expected_failure=True) +def test_concurrent_modification_on_task_succeeded(context, executor, lock_files): + _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True) @operation -def _test_task_succeeded(ctx, shared_file, key, first_value, second_value): - _concurrent_update(shared_file, ctx.node_instance, key, first_value, second_value) +def _test_task_succeeded(ctx, lock_files, key, first_value, second_value): + _concurrent_update(lock_files, ctx.node, key, first_value, second_value) -def test_concurrent_modification_on_task_failed(context, executor, shared_file): - _test(context, executor, shared_file, _test_task_failed, expected_failure=True) +def test_concurrent_modification_on_task_failed(context, executor, lock_files): + _test(context, executor, lock_files, _test_task_failed, expected_failure=True) @operation -def _test_task_failed(ctx, shared_file, key, first_value, second_value): - first = _concurrent_update(shared_file, ctx.node_instance, key, first_value, second_value) +def _test_task_failed(ctx, lock_files, key, first_value, second_value): + first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value) if not first: raise RuntimeError('MESSAGE') -def test_concurrent_modification_on_update_and_refresh(context, executor, shared_file): - _test(context, executor, shared_file, _test_update_and_refresh, expected_failure=False) +def test_concurrent_modification_on_update_and_refresh(context, 
executor, lock_files): + _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False) @operation -def _test_update_and_refresh(ctx, shared_file, key, first_value, second_value): - node_instance = ctx.node_instance - first = _concurrent_update(shared_file, node_instance, key, first_value, second_value) +def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value): + node = ctx.node + first = _concurrent_update(lock_files, node, key, first_value, second_value) if not first: try: - ctx.model.node_instance.update(node_instance) + ctx.model.node.update(node) except StorageError as e: assert 'Version conflict' in str(e) - ctx.model.node_instance.refresh(node_instance) + ctx.model.node.refresh(node) else: raise RuntimeError('Unexpected') -def _test(context, executor, shared_file, func, expected_failure): - def _node_instance(ctx): - return ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) +def _test(context, executor, lock_files, func, expected_failure): + def _node(ctx): + return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + + op_name = mock.operations.NODE_OPERATIONS_INSTALL[0] - shared_file.write(json.dumps({})) key = 'key' first_value = 'value1' second_value = 'value2' inputs = { - 'shared_file': str(shared_file), + 'lock_files': lock_files, 'key': key, 'first_value': first_value, 'second_value': second_value } + node = _node(context) + node.interfaces = [mock.models.get_interface( + op_name, + operation_kwargs=dict(implementation='{0}.{1}'.format(__name__, func.__name__)) + )] + context.model.node.update(node) + @workflow - def mock_workflow(ctx, graph): - op = 'test.op' - node_instance = _node_instance(ctx) - node_instance.node.operations[op] = {'operation': '{0}.{1}'.format(__name__, func.__name__)} + def mock_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs), - 
api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs)) + api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs), + api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs) + ) signal = events.on_failure_task_signal with events_collector(signal) as collected: @@ -102,7 +107,7 @@ def _test(context, executor, shared_file, func, expected_failure): except ExecutorException: pass - props = _node_instance(context).runtime_properties + props = _node(context).runtime_properties assert props[key] == first_value exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])] @@ -124,51 +129,37 @@ def executor(): @pytest.fixture def context(tmpdir): - result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir))) + result = mock.context.simple(str(tmpdir)) yield result storage.release_sqlite_storage(result.model) @pytest.fixture -def shared_file(tmpdir): - return tmpdir.join('shared_file') - +def lock_files(tmpdir): + return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file')) -def _concurrent_update(shared_file, node_instance, key, first_value, second_value): - def lock(): - return fasteners.InterProcessLock(shared_file) - def get(key): - with open(shared_file) as f: - return json.load(f).get(key) +def _concurrent_update(lock_files, node, key, first_value, second_value): - def set(key): - with open(shared_file) as f: - content = json.load(f) - content[key] = True - with open(shared_file, 'wb') as f: - json.dump(content, f) + locker1 = fasteners.InterProcessLock(lock_files[0]) + locker2 = fasteners.InterProcessLock(lock_files[1]) - def wait_for(key): - while True: - time.sleep(0.01) - with lock(): - if get(key): - break - - with lock(): - first = not get('first_in') - set('first_in' if first else 'second_in') + first = locker1.acquire(blocking=False) if first: - wait_for('second_in') + # Give chance for both processes to acquire locks + while 
locker2.acquire(blocking=False): + locker2.release() + time.sleep(0.01) + else: + locker2.acquire() - node_instance.runtime_properties[key] = first_value if first else second_value + node.runtime_properties[key] = first_value if first else second_value if first: - with lock(): - set('first_out') + locker1.release() else: - wait_for('first_out') + with locker1: + locker2.release() return first http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ab138d4d/tests/storage/test_structures.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py index 30f0064..666256e 100644 --- a/tests/storage/test_structures.py +++ b/tests/storage/test_structures.py @@ -125,7 +125,6 @@ def test_relationship_model_ordering(context): name='new_node_instance', runtime_properties={}, service_instance=service_instance, - version=None, node_template=new_node_template, state='', scaling_groups=[]