Repository: incubator-ariatosca Updated Branches: refs/heads/wf-wip [created] c52f949e9
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/storage/test_model_storage.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_model_storage.py b/tests/storage/test_model_storage.py new file mode 100644 index 0000000..cb5b4d8 --- /dev/null +++ b/tests/storage/test_model_storage.py @@ -0,0 +1,162 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.storage import ( + Storage, + ModelStorage, + models, +) +from aria.exceptions import StorageError +from aria.storage import structures +from aria.storage.structures import Model, Field, PointerField +from aria import application_model_storage + +from . import InMemoryModelDriver + + +def test_storage_base(): + driver = InMemoryModelDriver() + storage = Storage(driver) + + assert storage.driver == driver + + with pytest.raises(AttributeError): + storage.non_existent_attribute() + + +def test_model_storage(): + storage = ModelStorage(InMemoryModelDriver()) + storage.register(models.ProviderContext) + storage.setup() + + pc = models.ProviderContext(context={}, name='context_name', id='id1') + storage.provider_context.store(pc) + + assert storage.provider_context.get('id1') == pc + + assert [pc_from_storage for pc_from_storage in storage.provider_context.iter()] == [pc] + assert [pc_from_storage for pc_from_storage in storage.provider_context] == [pc] + + storage.provider_context.update('id1', context={'update_key': 0}) + assert storage.provider_context.get('id1').context == {'update_key': 0} + + storage.provider_context.delete('id1') + with pytest.raises(StorageError): + storage.provider_context.get('id1') + + +def test_storage_driver(): + storage = ModelStorage(InMemoryModelDriver()) + storage.register(models.ProviderContext) + storage.setup() + pc = models.ProviderContext(context={}, name='context_name', id='id2') + storage.driver.store(name='provider_context', entry=pc.fields_dict, entry_id=pc.id) + + assert storage.driver.get( + name='provider_context', + entry_id='id2', + model_cls=models.ProviderContext) == pc.fields_dict + + assert [i for i in storage.driver.iter(name='provider_context')] == [pc.fields_dict] + assert [i for i in storage.provider_context] == [pc] + + storage.provider_context.delete('id2') + + with pytest.raises(StorageError): + storage.provider_context.get('id2') + + +def test_application_storage_factory(): + driver = InMemoryModelDriver() + storage = application_model_storage(driver) + assert storage.node + assert storage.node_instance + assert storage.plugin + assert storage.blueprint + assert storage.snapshot + assert storage.deployment + assert storage.deployment_update + assert storage.deployment_update_step + assert storage.deployment_modification + assert storage.execution + assert storage.provider_context + + reused_storage = application_model_storage(driver) + assert reused_storage == storage + + +def test_storage_pointers(): + class PointedModel(Model): + id = Field() + + class PointingModel(Model): + id = Field() + pointing_field = PointerField(type=PointedModel) + + storage = ModelStorage(InMemoryModelDriver(), models=[PointingModel]) + storage.setup() + + assert storage.pointed_model + assert storage.pointing_model + + pointed_model = PointedModel(id='pointed_id') + + pointing_model = PointingModel(id='pointing_id', pointing_field=pointed_model) + storage.pointing_model.store(pointing_model) + + assert storage.pointed_model.get('pointed_id') == pointed_model + assert storage.pointing_model.get('pointing_id') == pointing_model + + storage.pointing_model.delete('pointing_id') + + with pytest.raises(StorageError): + assert storage.pointed_model.get('pointed_id') + assert storage.pointing_model.get('pointing_id') + + +def test_storage_iter_pointers(): + class PointedIterModel(models.Model): + id = structures.Field() + + class PointingIterModel(models.Model): + id = models.Field() + pointing_field = structures.IterPointerField(type=PointedIterModel) + + storage = ModelStorage(InMemoryModelDriver(), models=[PointingIterModel]) + storage.setup() + + assert storage.pointed_iter_model + assert storage.pointing_iter_model + + pointed_iter_model1 = PointedIterModel(id='pointed_id1') + pointed_iter_model2 = PointedIterModel(id='pointed_id2') + + pointing_iter_model = PointingIterModel( + id='pointing_id', + pointing_field=[pointed_iter_model1, pointed_iter_model2]) + storage.pointing_iter_model.store(pointing_iter_model) + + assert storage.pointed_iter_model.get('pointed_id1') == pointed_iter_model1 + assert storage.pointed_iter_model.get('pointed_id2') == pointed_iter_model2 + assert storage.pointing_iter_model.get('pointing_id') == pointing_iter_model + + storage.pointing_iter_model.delete('pointing_id') + + with pytest.raises(StorageError): + assert storage.pointed_iter_model.get('pointed_id1') + assert storage.pointed_iter_model.get('pointed_id2') + assert storage.pointing_iter_model.get('pointing_id') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/storage/test_models.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py new file mode 100644 index 0000000..0325d0f --- /dev/null +++ b/tests/storage/test_models.py @@ -0,0 +1,289 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from datetime import datetime + +import pytest + +from aria.storage import Model, Field +from aria.exceptions import StorageError +from aria.storage.models import ( + DeploymentUpdateStep, + Relationship, + RelationshipInstance, + Node, + NodeInstance, + Blueprint, + Operation) + +# TODO: add tests per model + + +def test_base_model_without_fields(): + with pytest.raises(StorageError, message="Id field has to be in model fields"): + Model() + + +def test_base_model_members(): + _test_field = Field() + + class TestModel(Model): + test_field = _test_field + id = Field(default='test_id') + + assert _test_field is TestModel.test_field + + test_model = TestModel(test_field='test_field_value', id='test_id') + + assert repr(test_model) == "TestModel(fields=['id', 'test_field'])" + expected = {'test_field': 'test_field_value', 'id': 'test_id'} + assert json.loads(test_model.json) == expected + assert test_model.fields_dict == expected + + with pytest.raises(StorageError): + TestModel() + + with pytest.raises(StorageError): + TestModel(test_field='test_field_value', id='test_id', unsupported_field='value') + + class TestModel(Model): + test_field = Field() + id = Field() + + with pytest.raises(StorageError): + TestModel() + + +def test_blueprint_model(): + Blueprint( + plan={}, + id='id', + description='description', + created_at=datetime.now(), + updated_at=datetime.now(), + main_file_name='/path', + ) + with pytest.raises(TypeError): + Blueprint( + plan=None, + id='id', + description='description', + created_at=datetime.now(), + updated_at=datetime.now(), + main_file_name='/path', + ) + with pytest.raises(TypeError): + Blueprint( + plan={}, + id=999, + description='description', + created_at=datetime.now(), + updated_at=datetime.now(), + main_file_name='/path', + ) + with pytest.raises(TypeError): + Blueprint( + plan={}, + id='id', + description=999, + created_at=datetime.now(), + updated_at=datetime.now(), + main_file_name='/path', + ) + with pytest.raises(TypeError): + Blueprint( + plan={}, + id='id', + description='description', + created_at='error', + updated_at=datetime.now(), + main_file_name='/path', + ) + with pytest.raises(TypeError): + Blueprint( + plan={}, + id='id', + description='description', + created_at=datetime.now(), + updated_at=None, + main_file_name='/path', + ) + with pytest.raises(TypeError): + Blueprint( + plan={}, + id='id', + description='description', + created_at=datetime.now(), + updated_at=None, + main_file_name=88, + ) + Blueprint( + plan={}, + description='description', + created_at=datetime.now(), + updated_at=datetime.now(), + main_file_name='/path', + ) + + +def test_deployment_update_step_model(): + add_node = DeploymentUpdateStep( + id='add_step', + action='add', + entity_type='node', + entity_id='node_id') + + modify_node = DeploymentUpdateStep( + id='modify_step', + action='modify', + entity_type='node', + entity_id='node_id') + + remove_node = DeploymentUpdateStep( + id='remove_step', + action='remove', + entity_type='node', + entity_id='node_id') + + for step in (add_node, modify_node, remove_node): + assert hash((step.id, step.entity_id)) == hash(step) + + assert remove_node < modify_node < add_node + assert not (remove_node > modify_node > add_node) + + add_rel = DeploymentUpdateStep( + id='add_step', + action='add', + entity_type='relationship', + entity_id='relationship_id') + + modify_rel = DeploymentUpdateStep( + id='modify_step', + action='modify', + entity_type='relationship', + entity_id='relationship_id') + + remove_rel = DeploymentUpdateStep( + id='remove_step', + action='remove', + entity_type='relationship', + entity_id='relationship_id') + + assert remove_rel < remove_node < add_node < add_rel + assert not add_node < None + assert not modify_node < modify_rel and not modify_rel < modify_node + + +def _Relationship(id=''): + return Relationship( + id='rel{0}'.format(id), + target_id='target{0}'.format(id), + source_interfaces={}, + source_operations={}, + target_interfaces={}, + target_operations={}, + type='type{0}'.format(id), + type_hierarchy=[], + properties={}) + + +def test_relationships(): + relationships = [_Relationship(index) for index in xrange(3)] + + node = Node( + blueprint_id='blueprint_id', + type='type', + type_hierarchy=None, + number_of_instances=1, + planned_number_of_instances=1, + deploy_number_of_instances=1, + properties={}, + operations={}, + relationships=relationships, + min_number_of_instances=1, + max_number_of_instances=1) + + for index in xrange(3): + assert relationships[index] is \ + next(node.relationships_by_target('target{0}'.format(index))) + + relationship = _Relationship() + + node = Node( + blueprint_id='blueprint_id', + type='type', + type_hierarchy=None, + number_of_instances=1, + planned_number_of_instances=1, + deploy_number_of_instances=1, + properties={}, + operations={}, + relationships=[relationship, relationship, relationship], + min_number_of_instances=1, + max_number_of_instances=1) + + for node_relationship in node.relationships_by_target('target'): + assert relationship is node_relationship + + +def test_relationship_instance(): + relationship = _Relationship() + relationship_instances = [RelationshipInstance( + id='rel{0}'.format(index), + target_id='target_{0}'.format(index % 2), + target_name='', + relationship=relationship, + type='type{0}'.format(index)) for index in xrange(3)] + + node_instance = NodeInstance( + deployment_id='deployment_id', + runtime_properties={}, + version='1', + relationship_instances=relationship_instances, + node=Node( + blueprint_id='blueprint_id', + type='type', + type_hierarchy=None, + number_of_instances=1, + planned_number_of_instances=1, + deploy_number_of_instances=1, + properties={}, + operations={}, + relationships=[], + min_number_of_instances=1, + max_number_of_instances=1), + scaling_groups=() + ) + + from itertools import chain + + assert set(relationship_instances) == set(chain( + node_instance.relationships_by_target('target_0'), + node_instance.relationships_by_target('target_1'))) + + +def test_Operation(): + now = datetime.now() + operation = Operation( + execution_id='execution_id', + started_at=now, + ) + assert operation.finished is False + + for end_status in Operation.END_STATES: + operation.status = end_status + assert operation.finished is True + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/storage/test_models_api.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_models_api.py b/tests/storage/test_models_api.py new file mode 100644 index 0000000..80826c3 --- /dev/null +++ b/tests/storage/test_models_api.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.storage import _ModelApi, models +from aria.exceptions import StorageError + +from . import InMemoryModelDriver + + +def test_models_api_base(): + driver = InMemoryModelDriver() + driver.create('provider_context') + table = _ModelApi('provider_context', driver, models.ProviderContext) + assert repr(table) == ( + '{table.name}(driver={table.driver}, ' + 'model={table.model_cls})'.format(table=table)) + provider_context = models.ProviderContext(context={}, name='context_name', id='id') + + table.store(provider_context) + assert table.get('id') == provider_context + + assert [i for i in table.iter()] == [provider_context] + assert [i for i in table] == [provider_context] + + table.delete('id') + + with pytest.raises(StorageError): + table.get('id') + + +def test_iterable_model_api(): + driver = InMemoryModelDriver() + driver.create('deployment_update') + driver.create('deployment_update_step') + model_api = _ModelApi('deployment_update', driver, models.DeploymentUpdate) + deployment_update = models.DeploymentUpdate( + id='id', + deployment_id='deployment_id', + deployment_plan={}, + execution_id='execution_id', + steps=[models.DeploymentUpdateStep( + id='step_id', + action='add', + entity_type='node', + entity_id='node_id' + )] + ) + + model_api.store(deployment_update) + assert [i for i in model_api.iter()] == [deployment_update] + assert [i for i in model_api] == [deployment_update] + + model_api.delete('id') + + with pytest.raises(StorageError): + model_api.get('id') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/storage/test_resource_storage.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_resource_storage.py b/tests/storage/test_resource_storage.py new file mode 100644 index 0000000..c9ecd69 --- /dev/null +++ b/tests/storage/test_resource_storage.py @@ -0,0 +1,175 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import tempfile + +import pytest + +from aria.exceptions import StorageError +from aria.storage import ResourceStorage, FileSystemResourceDriver +from . import TestFileSystem + + +class TestResourceStorage(TestFileSystem): + def _create(self, storage): + storage.register('blueprint') + storage.setup() + + def _upload(self, storage, tmp_path, id): + with open(tmp_path, 'w') as f: + f.write('fake context') + + storage.blueprint.upload(entry_id=id, source=tmp_path) + + def _upload_dir(self, storage, tmp_dir, tmp_file_name, id): + file_source = os.path.join(tmp_dir, tmp_file_name) + with open(file_source, 'w') as f: + f.write('fake context') + + storage.blueprint.upload(entry_id=id, source=tmp_dir) + + def test_name(self): + driver = FileSystemResourceDriver(directory=self.path) + storage = ResourceStorage(driver, resources=['blueprint']) + assert repr(storage) == 'ResourceStorage(driver={driver})'.format( + driver=driver + ) + assert repr(storage.registered['blueprint']) == ( + 'ResourceApi(driver={driver}, resource={resource_name})'.format( + driver=driver, + resource_name='blueprint')) + + def test_create(self): + storage = ResourceStorage(FileSystemResourceDriver(directory=self.path)) + self._create(storage) + assert os.path.exists(os.path.join(self.path, 'blueprint')) + + def test_upload_file(self): + storage = ResourceStorage(FileSystemResourceDriver(directory=self.path)) + self._create(storage) + tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1] + self._upload(storage, tmpfile_path, id='blueprint_id') + + storage_path = os.path.join( + self.path, + 'blueprint', + 'blueprint_id', + os.path.basename(tmpfile_path)) + assert os.path.exists(storage_path) + + with open(storage_path, 'rb') as f: + assert f.read() == 'fake context' + + def test_download_file(self): + storage = ResourceStorage(FileSystemResourceDriver(directory=self.path)) + self._create(storage) + tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1] + tmpfile_name = os.path.basename(tmpfile_path) + self._upload(storage, tmpfile_path, 'blueprint_id') + + temp_dir = tempfile.mkdtemp(dir=self.path) + storage.blueprint.download( + entry_id='blueprint_id', + destination=temp_dir, + path=tmpfile_name) + + with open(os.path.join(self.path, os.path.join(temp_dir, tmpfile_name))) as f: + assert f.read() == 'fake context' + + def test_data_file(self): + storage = ResourceStorage(FileSystemResourceDriver(directory=self.path)) + self._create(storage) + tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1] + self._upload(storage, tmpfile_path, 'blueprint_id') + + assert storage.blueprint.data(entry_id='blueprint_id') == 'fake context' + + def test_upload_dir(self): + storage = ResourceStorage(FileSystemResourceDriver(directory=self.path)) + self._create(storage) + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir) + tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1] + self._upload_dir(storage, tmp_dir, tmp_filename, id='blueprint_id') + + destination = os.path.join( + self.path, + 'blueprint', + 'blueprint_id', + os.path.basename(second_level_tmp_dir), + os.path.basename(tmp_filename)) + + assert os.path.isfile(destination) + + def test_upload_path_in_dir(self): + storage = ResourceStorage(FileSystemResourceDriver(directory=self.path)) + self._create(storage) + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir) + tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1] + self._upload_dir(storage, tmp_dir, tmp_filename, id='blueprint_id') + + second_update_file = tempfile.mkstemp(dir=self.path)[1] + with open(second_update_file, 'w') as f: + f.write('fake context2') + + storage.blueprint.upload( + entry_id='blueprint_id', + source=second_update_file, + path=os.path.basename(second_level_tmp_dir)) + + assert os.path.isfile(os.path.join( + self.path, + 'blueprint', + 'blueprint_id', + os.path.basename(second_level_tmp_dir), + os.path.basename(second_update_file))) + + def test_download_dir(self): + storage = ResourceStorage(FileSystemResourceDriver(directory=self.path)) + self._create(storage) + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir) + tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1] + self._upload_dir(storage, tmp_dir, tmp_filename, id='blueprint_id') + + temp_destination_dir = tempfile.mkdtemp(dir=self.path) + storage.blueprint.download( + entry_id='blueprint_id', + destination=temp_destination_dir) + + destination_file_path = os.path.join( + temp_destination_dir, + os.path.basename(second_level_tmp_dir), + os.path.basename(tmp_filename)) + + assert os.path.isfile(destination_file_path) + + with open(destination_file_path) as f: + assert f.read() == 'fake context' + + def test_data_dir(self): + storage = ResourceStorage(FileSystemResourceDriver(directory=self.path)) + self._create(storage) + + tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, dir=self.path) + tempfile.mkstemp(dir=tmp_dir) + tempfile.mkstemp(dir=tmp_dir) + + storage.blueprint.upload(entry_id='blueprint_id', source=tmp_dir) + + with pytest.raises(StorageError): + storage.blueprint.data(entry_id='blueprint_id') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/test_logger.py ---------------------------------------------------------------------- diff --git a/tests/test_logger.py b/tests/test_logger.py new file mode 100644 index 0000000..7891dd6 --- /dev/null +++ b/tests/test_logger.py @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import tempfile + +from aria.logger import (create_logger, + create_console_log_handler, + create_file_log_handler, + _default_file_formatter, + LoggerMixin, + _DefaultConsoleFormat) + + +def test_create_logger(): + + logger = create_logger() + assert logger.name == 'aria' + assert len(logger.handlers) == 0 + assert logger.level == logging.DEBUG + + custom_logger = logging.getLogger('custom_logger') + handlers = [logging.FileHandler, logging.StreamHandler] + logger = create_logger(logger=custom_logger, handlers=handlers, level=logging.INFO) + assert custom_logger.name == 'custom_logger' + assert logger.handlers == handlers + assert logger.level == logging.INFO + + +def test_create_console_log_handler(capsys): + + debug_test_string = 'debug_create_console_test_string' + info_test_string = 'info_create_console_test_string' + + # Default handler + handler = create_console_log_handler() + assert isinstance(handler, logging.StreamHandler) + assert isinstance(handler.formatter, _DefaultConsoleFormat) + assert handler.level == logging.DEBUG + + logger = create_logger(handlers=[handler]) + + logger.info(info_test_string) + logger.debug(debug_test_string) + _, err = capsys.readouterr() + assert err.count('DEBUG: {test_string}'.format(test_string=debug_test_string)) == 1 + assert err.count(info_test_string) == 1 + + # Custom handler + custom_handler = create_console_log_handler(level=logging.INFO, formatter=logging.Formatter()) + assert isinstance(custom_handler.formatter, logging.Formatter) + assert custom_handler.level == logging.INFO + + logger = create_logger(handlers=[custom_handler]) + + logger.info(info_test_string) + _, err = capsys.readouterr() + + assert err.count(info_test_string) == 1 + + +def test_create_file_log_handler(): + + test_string = 'create_file_log_test_string' + + with tempfile.NamedTemporaryFile() as temp_file: + handler = create_file_log_handler(file_path=temp_file.name) + assert handler.baseFilename == temp_file.name + assert handler.maxBytes == 5 * 1000 * 1024 + assert handler.backupCount == 10 + assert handler.delay is True + assert handler.level == logging.DEBUG + assert handler.formatter == _default_file_formatter + + logger = create_logger(handlers=[handler]) + logger.debug(test_string) + assert test_string in temp_file.read() + + with tempfile.NamedTemporaryFile() as temp_file: + handler = create_file_log_handler( + file_path=temp_file.name, + level=logging.INFO, + max_bytes=1000, + backup_count=2, + formatter=logging.Formatter() + ) + assert handler.baseFilename == temp_file.name + assert handler.level == logging.INFO + assert handler.maxBytes == 1000 + assert handler.backupCount == 2 + assert isinstance(handler.formatter, logging.Formatter) + + logger = create_logger(handlers=[handler]) + logger.info(test_string) + assert test_string in temp_file.read() + + +def test_loggermixin(capsys): + + test_string = 'loggermixing_test_string' + + create_logger(handlers=[create_console_log_handler()]) + + custom_class = type('CustomClass', (LoggerMixin,), {}).with_logger() + custom_class.logger.debug(test_string) + + _, err = capsys.readouterr() + assert test_string in err + + # TODO: figure out what up with pickle + # class_pickled = pickle.dumps(custom_class) + # class_unpickled = pickle.loads(class_pickled) + # + # assert vars(class_unpickled) == vars(custom_class) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tox.ini ---------------------------------------------------------------------- diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..bc7fb8f --- /dev/null +++ b/tox.ini @@ -0,0 +1,15 @@ +# content of: tox.ini , put in same dir as setup.py +[tox] +envlist=pylint_package,pylint_tests,py27 + +[testenv] +deps =-r{toxinidir}/tests/requirements.txt + +[testenv:py27] +commands=py.test tests --cov-report term-missing --cov aria + +[testenv:pylint_package] +commands=pylint aria --rcfile=.pylintrc --disable=cyclic-import,fixme + +[testenv:pylint_tests] +commands=pylint tests --rcfile=.pylintrc --disable=invalid-name,too-many-statements,too-few-public-methods,too-many-lines,too-many-public-methods,unused-argument,line-too-long,no-member,too-many-locals,duplicate-code,too-many-branches,too-many-arguments,fixme