Repository: incubator-ariatosca
Updated Branches:
  refs/heads/wf-wip [created] c52f949e9


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/storage/test_model_storage.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_model_storage.py 
b/tests/storage/test_model_storage.py
new file mode 100644
index 0000000..cb5b4d8
--- /dev/null
+++ b/tests/storage/test_model_storage.py
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.storage import (
+    Storage,
+    ModelStorage,
+    models,
+)
+from aria.exceptions import StorageError
+from aria.storage import structures
+from aria.storage.structures import Model, Field, PointerField
+from aria import application_model_storage
+
+from . import InMemoryModelDriver
+
+
+def test_storage_base():
+    driver = InMemoryModelDriver()
+    storage = Storage(driver)
+
+    assert storage.driver == driver
+
+    with pytest.raises(AttributeError):
+        storage.non_existent_attribute()
+
+
+def test_model_storage():
+    storage = ModelStorage(InMemoryModelDriver())
+    storage.register(models.ProviderContext)
+    storage.setup()
+
+    pc = models.ProviderContext(context={}, name='context_name', id='id1')
+    storage.provider_context.store(pc)
+
+    assert storage.provider_context.get('id1') == pc
+
+    assert [pc_from_storage for pc_from_storage in 
storage.provider_context.iter()] == [pc]
+    assert [pc_from_storage for pc_from_storage in storage.provider_context] 
== [pc]
+
+    storage.provider_context.update('id1', context={'update_key': 0})
+    assert storage.provider_context.get('id1').context == {'update_key': 0}
+
+    storage.provider_context.delete('id1')
+    with pytest.raises(StorageError):
+        storage.provider_context.get('id1')
+
+
+def test_storage_driver():
+    storage = ModelStorage(InMemoryModelDriver())
+    storage.register(models.ProviderContext)
+    storage.setup()
+    pc = models.ProviderContext(context={}, name='context_name', id='id2')
+    storage.driver.store(name='provider_context', entry=pc.fields_dict, 
entry_id=pc.id)
+
+    assert storage.driver.get(
+        name='provider_context',
+        entry_id='id2',
+        model_cls=models.ProviderContext) == pc.fields_dict
+
+    assert [i for i in storage.driver.iter(name='provider_context')] == 
[pc.fields_dict]
+    assert [i for i in storage.provider_context] == [pc]
+
+    storage.provider_context.delete('id2')
+
+    with pytest.raises(StorageError):
+        storage.provider_context.get('id2')
+
+
+def test_application_storage_factory():
+    driver = InMemoryModelDriver()
+    storage = application_model_storage(driver)
+    assert storage.node
+    assert storage.node_instance
+    assert storage.plugin
+    assert storage.blueprint
+    assert storage.snapshot
+    assert storage.deployment
+    assert storage.deployment_update
+    assert storage.deployment_update_step
+    assert storage.deployment_modification
+    assert storage.execution
+    assert storage.provider_context
+
+    reused_storage = application_model_storage(driver)
+    assert reused_storage == storage
+
+
+def test_storage_pointers():
+    class PointedModel(Model):
+        id = Field()
+
+    class PointingModel(Model):
+        id = Field()
+        pointing_field = PointerField(type=PointedModel)
+
+    storage = ModelStorage(InMemoryModelDriver(), models=[PointingModel])
+    storage.setup()
+
+    assert storage.pointed_model
+    assert storage.pointing_model
+
+    pointed_model = PointedModel(id='pointed_id')
+
+    pointing_model = PointingModel(id='pointing_id', 
pointing_field=pointed_model)
+    storage.pointing_model.store(pointing_model)
+
+    assert storage.pointed_model.get('pointed_id') == pointed_model
+    assert storage.pointing_model.get('pointing_id') == pointing_model
+
+    storage.pointing_model.delete('pointing_id')
+
+    with pytest.raises(StorageError):
+        assert storage.pointed_model.get('pointed_id')
+        assert storage.pointing_model.get('pointing_id')
+
+
+def test_storage_iter_pointers():
+    class PointedIterModel(models.Model):
+        id = structures.Field()
+
+    class PointingIterModel(models.Model):
+        id = models.Field()
+        pointing_field = structures.IterPointerField(type=PointedIterModel)
+
+    storage = ModelStorage(InMemoryModelDriver(), models=[PointingIterModel])
+    storage.setup()
+
+    assert storage.pointed_iter_model
+    assert storage.pointing_iter_model
+
+    pointed_iter_model1 = PointedIterModel(id='pointed_id1')
+    pointed_iter_model2 = PointedIterModel(id='pointed_id2')
+
+    pointing_iter_model = PointingIterModel(
+        id='pointing_id',
+        pointing_field=[pointed_iter_model1, pointed_iter_model2])
+    storage.pointing_iter_model.store(pointing_iter_model)
+
+    assert storage.pointed_iter_model.get('pointed_id1') == pointed_iter_model1
+    assert storage.pointed_iter_model.get('pointed_id2') == pointed_iter_model2
+    assert storage.pointing_iter_model.get('pointing_id') == 
pointing_iter_model
+
+    storage.pointing_iter_model.delete('pointing_id')
+
+    with pytest.raises(StorageError):
+        assert storage.pointed_iter_model.get('pointed_id1')
+        assert storage.pointed_iter_model.get('pointed_id2')
+        assert storage.pointing_iter_model.get('pointing_id')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/storage/test_models.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py
new file mode 100644
index 0000000..0325d0f
--- /dev/null
+++ b/tests/storage/test_models.py
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from datetime import datetime
+
+import pytest
+
+from aria.storage import Model, Field
+from aria.exceptions import StorageError
+from aria.storage.models import (
+    DeploymentUpdateStep,
+    Relationship,
+    RelationshipInstance,
+    Node,
+    NodeInstance,
+    Blueprint,
+    Operation)
+
+# TODO: add tests per model
+
+
+def test_base_model_without_fields():
+    with pytest.raises(StorageError, message="Id field has to be in model 
fields"):
+        Model()
+
+
+def test_base_model_members():
+    _test_field = Field()
+
+    class TestModel(Model):
+        test_field = _test_field
+        id = Field(default='test_id')
+
+    assert _test_field is TestModel.test_field
+
+    test_model = TestModel(test_field='test_field_value', id='test_id')
+
+    assert repr(test_model) == "TestModel(fields=['id', 'test_field'])"
+    expected = {'test_field': 'test_field_value', 'id': 'test_id'}
+    assert json.loads(test_model.json) == expected
+    assert test_model.fields_dict == expected
+
+    with pytest.raises(StorageError):
+        TestModel()
+
+    with pytest.raises(StorageError):
+        TestModel(test_field='test_field_value', id='test_id', 
unsupported_field='value')
+
+    class TestModel(Model):
+        test_field = Field()
+        id = Field()
+
+    with pytest.raises(StorageError):
+        TestModel()
+
+
+def test_blueprint_model():
+    Blueprint(
+        plan={},
+        id='id',
+        description='description',
+        created_at=datetime.now(),
+        updated_at=datetime.now(),
+        main_file_name='/path',
+    )
+    with pytest.raises(TypeError):
+        Blueprint(
+            plan=None,
+            id='id',
+            description='description',
+            created_at=datetime.now(),
+            updated_at=datetime.now(),
+            main_file_name='/path',
+        )
+    with pytest.raises(TypeError):
+        Blueprint(
+            plan={},
+            id=999,
+            description='description',
+            created_at=datetime.now(),
+            updated_at=datetime.now(),
+            main_file_name='/path',
+        )
+    with pytest.raises(TypeError):
+        Blueprint(
+            plan={},
+            id='id',
+            description=999,
+            created_at=datetime.now(),
+            updated_at=datetime.now(),
+            main_file_name='/path',
+        )
+    with pytest.raises(TypeError):
+        Blueprint(
+            plan={},
+            id='id',
+            description='description',
+            created_at='error',
+            updated_at=datetime.now(),
+            main_file_name='/path',
+        )
+    with pytest.raises(TypeError):
+        Blueprint(
+            plan={},
+            id='id',
+            description='description',
+            created_at=datetime.now(),
+            updated_at=None,
+            main_file_name='/path',
+        )
+    with pytest.raises(TypeError):
+        Blueprint(
+            plan={},
+            id='id',
+            description='description',
+            created_at=datetime.now(),
+            updated_at=None,
+            main_file_name=88,
+        )
+    Blueprint(
+        plan={},
+        description='description',
+        created_at=datetime.now(),
+        updated_at=datetime.now(),
+        main_file_name='/path',
+    )
+
+
+def test_deployment_update_step_model():
+    add_node = DeploymentUpdateStep(
+        id='add_step',
+        action='add',
+        entity_type='node',
+        entity_id='node_id')
+
+    modify_node = DeploymentUpdateStep(
+        id='modify_step',
+        action='modify',
+        entity_type='node',
+        entity_id='node_id')
+
+    remove_node = DeploymentUpdateStep(
+        id='remove_step',
+        action='remove',
+        entity_type='node',
+        entity_id='node_id')
+
+    for step in (add_node, modify_node, remove_node):
+        assert hash((step.id, step.entity_id)) == hash(step)
+
+    assert remove_node < modify_node < add_node
+    assert not (remove_node > modify_node > add_node)
+
+    add_rel = DeploymentUpdateStep(
+        id='add_step',
+        action='add',
+        entity_type='relationship',
+        entity_id='relationship_id')
+
+    modify_rel = DeploymentUpdateStep(
+        id='modify_step',
+        action='modify',
+        entity_type='relationship',
+        entity_id='relationship_id')
+
+    remove_rel = DeploymentUpdateStep(
+        id='remove_step',
+        action='remove',
+        entity_type='relationship',
+        entity_id='relationship_id')
+
+    assert remove_rel < remove_node < add_node < add_rel
+    assert not add_node < None
+    assert not modify_node < modify_rel and not modify_rel < modify_node
+
+
+def _Relationship(id=''):
+    return Relationship(
+        id='rel{0}'.format(id),
+        target_id='target{0}'.format(id),
+        source_interfaces={},
+        source_operations={},
+        target_interfaces={},
+        target_operations={},
+        type='type{0}'.format(id),
+        type_hierarchy=[],
+        properties={})
+
+
+def test_relationships():
+    relationships = [_Relationship(index) for index in xrange(3)]
+
+    node = Node(
+        blueprint_id='blueprint_id',
+        type='type',
+        type_hierarchy=None,
+        number_of_instances=1,
+        planned_number_of_instances=1,
+        deploy_number_of_instances=1,
+        properties={},
+        operations={},
+        relationships=relationships,
+        min_number_of_instances=1,
+        max_number_of_instances=1)
+
+    for index in xrange(3):
+        assert relationships[index] is \
+               next(node.relationships_by_target('target{0}'.format(index)))
+
+    relationship = _Relationship()
+
+    node = Node(
+        blueprint_id='blueprint_id',
+        type='type',
+        type_hierarchy=None,
+        number_of_instances=1,
+        planned_number_of_instances=1,
+        deploy_number_of_instances=1,
+        properties={},
+        operations={},
+        relationships=[relationship, relationship, relationship],
+        min_number_of_instances=1,
+        max_number_of_instances=1)
+
+    for node_relationship in node.relationships_by_target('target'):
+        assert relationship is node_relationship
+
+
+def test_relationship_instance():
+    relationship = _Relationship()
+    relationship_instances = [RelationshipInstance(
+        id='rel{0}'.format(index),
+        target_id='target_{0}'.format(index % 2),
+        target_name='',
+        relationship=relationship,
+        type='type{0}'.format(index)) for index in xrange(3)]
+
+    node_instance = NodeInstance(
+        deployment_id='deployment_id',
+        runtime_properties={},
+        version='1',
+        relationship_instances=relationship_instances,
+        node=Node(
+            blueprint_id='blueprint_id',
+            type='type',
+            type_hierarchy=None,
+            number_of_instances=1,
+            planned_number_of_instances=1,
+            deploy_number_of_instances=1,
+            properties={},
+            operations={},
+            relationships=[],
+            min_number_of_instances=1,
+            max_number_of_instances=1),
+        scaling_groups=()
+    )
+
+    from itertools import chain
+
+    assert set(relationship_instances) == set(chain(
+        node_instance.relationships_by_target('target_0'),
+        node_instance.relationships_by_target('target_1')))
+
+
+def test_Operation():
+    now = datetime.now()
+    operation = Operation(
+        execution_id='execution_id',
+        started_at=now,
+    )
+    assert operation.finished is False
+
+    for end_status in Operation.END_STATES:
+        operation.status = end_status
+        assert operation.finished is True
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/storage/test_models_api.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models_api.py b/tests/storage/test_models_api.py
new file mode 100644
index 0000000..80826c3
--- /dev/null
+++ b/tests/storage/test_models_api.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.storage import _ModelApi, models
+from aria.exceptions import StorageError
+
+from . import InMemoryModelDriver
+
+
+def test_models_api_base():
+    driver = InMemoryModelDriver()
+    driver.create('provider_context')
+    table = _ModelApi('provider_context', driver, models.ProviderContext)
+    assert repr(table) == (
+        '{table.name}(driver={table.driver}, '
+        'model={table.model_cls})'.format(table=table))
+    provider_context = models.ProviderContext(context={}, name='context_name', 
id='id')
+
+    table.store(provider_context)
+    assert table.get('id') == provider_context
+
+    assert [i for i in table.iter()] == [provider_context]
+    assert [i for i in table] == [provider_context]
+
+    table.delete('id')
+
+    with pytest.raises(StorageError):
+        table.get('id')
+
+
+def test_iterable_model_api():
+    driver = InMemoryModelDriver()
+    driver.create('deployment_update')
+    driver.create('deployment_update_step')
+    model_api = _ModelApi('deployment_update', driver, models.DeploymentUpdate)
+    deployment_update = models.DeploymentUpdate(
+        id='id',
+        deployment_id='deployment_id',
+        deployment_plan={},
+        execution_id='execution_id',
+        steps=[models.DeploymentUpdateStep(
+            id='step_id',
+            action='add',
+            entity_type='node',
+            entity_id='node_id'
+        )]
+    )
+
+    model_api.store(deployment_update)
+    assert [i for i in model_api.iter()] == [deployment_update]
+    assert [i for i in model_api] == [deployment_update]
+
+    model_api.delete('id')
+
+    with pytest.raises(StorageError):
+        model_api.get('id')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/storage/test_resource_storage.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_resource_storage.py 
b/tests/storage/test_resource_storage.py
new file mode 100644
index 0000000..c9ecd69
--- /dev/null
+++ b/tests/storage/test_resource_storage.py
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import tempfile
+
+import pytest
+
+from aria.exceptions import StorageError
+from aria.storage import ResourceStorage, FileSystemResourceDriver
+from . import TestFileSystem
+
+
+class TestResourceStorage(TestFileSystem):
+    def _create(self, storage):
+        storage.register('blueprint')
+        storage.setup()
+
+    def _upload(self, storage, tmp_path, id):
+        with open(tmp_path, 'w') as f:
+            f.write('fake context')
+
+        storage.blueprint.upload(entry_id=id, source=tmp_path)
+
+    def _upload_dir(self, storage, tmp_dir, tmp_file_name, id):
+        file_source = os.path.join(tmp_dir, tmp_file_name)
+        with open(file_source, 'w') as f:
+            f.write('fake context')
+
+        storage.blueprint.upload(entry_id=id, source=tmp_dir)
+
+    def test_name(self):
+        driver = FileSystemResourceDriver(directory=self.path)
+        storage = ResourceStorage(driver, resources=['blueprint'])
+        assert repr(storage) == 'ResourceStorage(driver={driver})'.format(
+            driver=driver
+        )
+        assert repr(storage.registered['blueprint']) == (
+            'ResourceApi(driver={driver}, resource={resource_name})'.format(
+                driver=driver,
+                resource_name='blueprint'))
+
+    def test_create(self):
+        storage = 
ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        assert os.path.exists(os.path.join(self.path, 'blueprint'))
+
+    def test_upload_file(self):
+        storage = 
ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, 
dir=self.path)[1]
+        self._upload(storage, tmpfile_path, id='blueprint_id')
+
+        storage_path = os.path.join(
+            self.path,
+            'blueprint',
+            'blueprint_id',
+            os.path.basename(tmpfile_path))
+        assert os.path.exists(storage_path)
+
+        with open(storage_path, 'rb') as f:
+            assert f.read() == 'fake context'
+
+    def test_download_file(self):
+        storage = 
ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, 
dir=self.path)[1]
+        tmpfile_name = os.path.basename(tmpfile_path)
+        self._upload(storage, tmpfile_path, 'blueprint_id')
+
+        temp_dir = tempfile.mkdtemp(dir=self.path)
+        storage.blueprint.download(
+            entry_id='blueprint_id',
+            destination=temp_dir,
+            path=tmpfile_name)
+
+        with open(os.path.join(self.path, os.path.join(temp_dir, 
tmpfile_name))) as f:
+            assert f.read() == 'fake context'
+
+    def test_data_file(self):
+        storage = 
ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, 
dir=self.path)[1]
+        self._upload(storage, tmpfile_path, 'blueprint_id')
+
+        assert storage.blueprint.data(entry_id='blueprint_id') == 'fake 
context'
+
+    def test_upload_dir(self):
+        storage = 
ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, 
dir=self.path)
+        second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
+        tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1]
+        self._upload_dir(storage, tmp_dir, tmp_filename, id='blueprint_id')
+
+        destination = os.path.join(
+            self.path,
+            'blueprint',
+            'blueprint_id',
+            os.path.basename(second_level_tmp_dir),
+            os.path.basename(tmp_filename))
+
+        assert os.path.isfile(destination)
+
+    def test_upload_path_in_dir(self):
+        storage = 
ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, 
dir=self.path)
+        second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
+        tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1]
+        self._upload_dir(storage, tmp_dir, tmp_filename, id='blueprint_id')
+
+        second_update_file = tempfile.mkstemp(dir=self.path)[1]
+        with open(second_update_file, 'w') as f:
+            f.write('fake context2')
+
+        storage.blueprint.upload(
+            entry_id='blueprint_id',
+            source=second_update_file,
+            path=os.path.basename(second_level_tmp_dir))
+
+        assert os.path.isfile(os.path.join(
+            self.path,
+            'blueprint',
+            'blueprint_id',
+            os.path.basename(second_level_tmp_dir),
+            os.path.basename(second_update_file)))
+
+    def test_download_dir(self):
+        storage = 
ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, 
dir=self.path)
+        second_level_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
+        tmp_filename = tempfile.mkstemp(dir=second_level_tmp_dir)[1]
+        self._upload_dir(storage, tmp_dir, tmp_filename, id='blueprint_id')
+
+        temp_destination_dir = tempfile.mkdtemp(dir=self.path)
+        storage.blueprint.download(
+            entry_id='blueprint_id',
+            destination=temp_destination_dir)
+
+        destination_file_path = os.path.join(
+            temp_destination_dir,
+            os.path.basename(second_level_tmp_dir),
+            os.path.basename(tmp_filename))
+
+        assert os.path.isfile(destination_file_path)
+
+        with open(destination_file_path) as f:
+            assert f.read() == 'fake context'
+
+    def test_data_dir(self):
+        storage = 
ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+
+        tmp_dir = tempfile.mkdtemp(suffix=self.__class__.__name__, 
dir=self.path)
+        tempfile.mkstemp(dir=tmp_dir)
+        tempfile.mkstemp(dir=tmp_dir)
+
+        storage.blueprint.upload(entry_id='blueprint_id', source=tmp_dir)
+
+        with pytest.raises(StorageError):
+            storage.blueprint.data(entry_id='blueprint_id')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tests/test_logger.py
----------------------------------------------------------------------
diff --git a/tests/test_logger.py b/tests/test_logger.py
new file mode 100644
index 0000000..7891dd6
--- /dev/null
+++ b/tests/test_logger.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import tempfile
+
+from aria.logger import (create_logger,
+                         create_console_log_handler,
+                         create_file_log_handler,
+                         _default_file_formatter,
+                         LoggerMixin,
+                         _DefaultConsoleFormat)
+
+
+def test_create_logger():
+
+    logger = create_logger()
+    assert logger.name == 'aria'
+    assert len(logger.handlers) == 0
+    assert logger.level == logging.DEBUG
+
+    custom_logger = logging.getLogger('custom_logger')
+    handlers = [logging.FileHandler, logging.StreamHandler]
+    logger = create_logger(logger=custom_logger, handlers=handlers, 
level=logging.INFO)
+    assert custom_logger.name == 'custom_logger'
+    assert logger.handlers == handlers
+    assert logger.level == logging.INFO
+
+
+def test_create_console_log_handler(capsys):
+
+    debug_test_string = 'debug_create_console_test_string'
+    info_test_string = 'info_create_console_test_string'
+
+    # Default handler
+    handler = create_console_log_handler()
+    assert isinstance(handler, logging.StreamHandler)
+    assert isinstance(handler.formatter, _DefaultConsoleFormat)
+    assert handler.level == logging.DEBUG
+
+    logger = create_logger(handlers=[handler])
+
+    logger.info(info_test_string)
+    logger.debug(debug_test_string)
+    _, err = capsys.readouterr()
+    assert err.count('DEBUG: 
{test_string}'.format(test_string=debug_test_string)) == 1
+    assert err.count(info_test_string) == 1
+
+    # Custom handler
+    custom_handler = create_console_log_handler(level=logging.INFO, 
formatter=logging.Formatter())
+    assert isinstance(custom_handler.formatter, logging.Formatter)
+    assert custom_handler.level == logging.INFO
+
+    logger = create_logger(handlers=[custom_handler])
+
+    logger.info(info_test_string)
+    _, err = capsys.readouterr()
+
+    assert err.count(info_test_string) == 1
+
+
+def test_create_file_log_handler():
+
+    test_string = 'create_file_log_test_string'
+
+    with tempfile.NamedTemporaryFile() as temp_file:
+        handler = create_file_log_handler(file_path=temp_file.name)
+        assert handler.baseFilename == temp_file.name
+        assert handler.maxBytes == 5 * 1000 * 1024
+        assert handler.backupCount == 10
+        assert handler.delay is True
+        assert handler.level == logging.DEBUG
+        assert handler.formatter == _default_file_formatter
+
+        logger = create_logger(handlers=[handler])
+        logger.debug(test_string)
+        assert test_string in temp_file.read()
+
+    with tempfile.NamedTemporaryFile() as temp_file:
+        handler = create_file_log_handler(
+            file_path=temp_file.name,
+            level=logging.INFO,
+            max_bytes=1000,
+            backup_count=2,
+            formatter=logging.Formatter()
+        )
+        assert handler.baseFilename == temp_file.name
+        assert handler.level == logging.INFO
+        assert handler.maxBytes == 1000
+        assert handler.backupCount == 2
+        assert isinstance(handler.formatter, logging.Formatter)
+
+        logger = create_logger(handlers=[handler])
+        logger.info(test_string)
+        assert test_string in temp_file.read()
+
+
+def test_loggermixin(capsys):
+
+    test_string = 'loggermixing_test_string'
+
+    create_logger(handlers=[create_console_log_handler()])
+
+    custom_class = type('CustomClass', (LoggerMixin,), {}).with_logger()
+    custom_class.logger.debug(test_string)
+
+    _, err = capsys.readouterr()
+    assert test_string in err
+
+    # TODO: figure out what up with pickle
+    # class_pickled = pickle.dumps(custom_class)
+    # class_unpickled = pickle.loads(class_pickled)
+    #
+    # assert vars(class_unpickled) == vars(custom_class)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c52f949e/tox.ini
----------------------------------------------------------------------
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..bc7fb8f
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,15 @@
+# content of: tox.ini , put in same dir as setup.py
+[tox]
+envlist=pylint_package,pylint_tests,py27
+
+[testenv]
+deps =-r{toxinidir}/tests/requirements.txt
+
+[testenv:py27]
+commands=py.test tests --cov-report term-missing --cov aria
+
+[testenv:pylint_package]
+commands=pylint aria --rcfile=.pylintrc --disable=cyclic-import,fixme
+
+[testenv:pylint_tests]
+commands=pylint tests --rcfile=.pylintrc 
--disable=invalid-name,too-many-statements,too-few-public-methods,too-many-lines,too-many-public-methods,unused-argument,line-too-long,no-member,too-many-locals,duplicate-code,too-many-branches,too-many-arguments,fixme

Reply via email to