added tests for all aspects of Vlans and NAT Rules
Project: http://git-wip-us.apache.org/repos/asf/libcloud/repo Commit: http://git-wip-us.apache.org/repos/asf/libcloud/commit/b19a1653 Tree: http://git-wip-us.apache.org/repos/asf/libcloud/tree/b19a1653 Diff: http://git-wip-us.apache.org/repos/asf/libcloud/diff/b19a1653 Branch: refs/heads/trunk Commit: b19a16539c34d22c3d7582c2a32f6a613b78bb16 Parents: 1bffad0 Author: mitch <[email protected]> Authored: Mon Sep 17 09:45:13 2018 -0400 Committer: mitch <[email protected]> Committed: Mon Sep 17 09:45:13 2018 -0400 ---------------------------------------------------------------------- libcloud/common/nttcis.py | 129 ++++++++++++++++++++++++- libcloud/compute/drivers/nttcis.py | 135 +++++++++++++++++++++++---- libcloud/loadbalancer/drivers/nttcis.py | 32 ++++--- libcloud/utils/xml.py | 14 +++ tests/conftest.py | 7 +- tests/lib_create_test.py | 83 +++++++++++++++- tests/lib_edit_test.py | 69 ++++++++++++-- tests/lib_list_test.py | 49 +++++++++- 8 files changed, 471 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/libcloud/blob/b19a1653/libcloud/common/nttcis.py ---------------------------------------------------------------------- diff --git a/libcloud/common/nttcis.py b/libcloud/common/nttcis.py index 4b5272a..75bd875 100644 --- a/libcloud/common/nttcis.py +++ b/libcloud/common/nttcis.py @@ -26,6 +26,8 @@ from libcloud.compute.base import Node from libcloud.utils.py3 import basestring from libcloud.utils.xml import findtext from libcloud.compute.types import LibcloudError, InvalidCredsError +from collections import abc +from keyword import iskeyword # Roadmap / TODO: # @@ -489,6 +491,7 @@ class NttCisConnection(ConnectionUserAndKey): params=params, data=data, method=method, headers=headers) + def paginated_request_with_orgId_api_2(self, action, params=None, data='', headers=None, method='GET', page_size=250): @@ -738,7 +741,7 @@ class NttCisPublicIpBlock(object): self.status = status def __repr__(self): - return (('<DimensionDataNetworkDomain: id=%s, base_ip=%s, ' + return (('<NttCisNetworkDomain: id=%s, base_ip=%s, ' 'size=%s, location=%s, status=%s>') % (self.id, self.base_ip, self.size, self.location, self.status)) @@ -899,6 +902,24 @@ class NttCisSnapshot(object): % (self.id, self.start_time, self.end_time, self.type, self.expiry_time, self.state)) +class NttCisReservedIpAddress(object): + """ + NTTCIS Rerverse IPv4 address + """ + + def __init__(self, datacenter_id, exclusive, vlan_id, ip, description=None): + self.datacenter_id = datacenter_id + self.exclusive = exclusive + self.vlan_id = vlan_id + self.ip = ip + self.description = description + + def __repr__(self): + return (('<NttCisReservedIpAddress ' + 'datacenterId=%s, exclusiven=%s, vlanId=%s, ipAddress=%s, description=-%s') % + (self.datacenter_id, self.exclusive, self.vlan_id, self.ip, self.description)) + + class NttCisFirewallRule(object): """ NTTCIS Firewall Rule for a network domain @@ -1908,3 +1929,109 @@ class NttCisNic(object): return ('<NttCisNic: private_ip_v4=%s, vlan=%s,' 'network_adapter_name=%s>' % (self.private_ip_v4, self.vlan, self.network_adapter_name)) + + +##### Testing new concept below this line + +class XmlListConfig(list): + + def __init__(self, elem_list): + for element in elem_list: + if element is not None: + # treat like dict + #print(element.attrib, len(element)) + if len(element) >= 0 or element[0].tag != element[1].tag: + self.append(XmlDictConfig(element)) + # treat like list + elif element[0].tag == element[1].tag: + if 'property' in element.tag: + self.append({element.attrib.get('name'): element.attrib.get('value')}) + else: + self.append(element.attrib) + elif element.text: + text = element.text.strip() + if text: + self.append(text) + + +class XmlDictConfig(dict): + + def __init__(self, parent_element): + if parent_element.items(): + if 'property' in parent_element.tag: + self.update({parent_element.attrib.get('name'): parent_element.attrib.get('value')}) + else: + self.update(dict(parent_element.items())) + + for element in parent_element: + if len(element) > 0: + # treat like dict - we assume that if the first two tags + # in a series are different, then they are all different. + if len(element) == 1 or element[0].tag != element[1].tag: + elem_dict = XmlDictConfig(element) + + # treat like list - we assume that if the first two tags + # in a series are the same, then the rest are the same. + else: + # here, we put the list in dictionary; the key is the + # tag name the list elements all share in common, and + # the value is the list itself + elem_dict = {element[0].tag.split('}')[1]: XmlListConfig(element)} + + # if the tag has attributes, add those to the dict + if element.items(): + elem_dict.update(dict(element.items())) + self.update({element.tag.split('}')[1]: elem_dict}) + # this assumes that if you've got an attribute in a tag, + # you won't be having any text. This may or may not be a + # good idea -- time will tell. It works for the way we are + # currently doing XML configuration files... + elif element.items(): + # It is possible to have duplicate element tags. If so, convert to a dict of lists + if element.tag in self: + tmp_list = list() + tmp_dict = dict() + if isinstance(self[element.tag], list): + tmp_list.append(element.tag) + else: + for k, v in self[element.tag].items(): + if isinstance(k, XmlListConfig): + tmp_list.append(k) + else: + tmp_dict.update({k: v}) + tmp_list.append(tmp_dict) + tmp_list.append(dict(element.items())) + self[element.tag] = tmp_list + else: + self.update({element.tag.split('}')[1]: dict(element.items())}) + # finally, if there are no child tags and no attributes, extract + # the text + else: + self.update({element.tag.split('}')[1]: element.text}) + + +class Generic: + def __new__(cls, arg): + if isinstance(arg, abc.Mapping): + return super().__new__(cls) + elif isinstance(arg, abc.MutableSequence): + return [cls(item) for item in arg] + else: + return arg + + def __init__(self, mapping): + self.__data = {} + for key, value in mapping.items(): + if iskeyword(key): + key += '_' + self.__data[key] = value + + def __getattr__(self, name): + if hasattr(self.__data, name): + return getattr(self.__data, name) + else: + return Generic(self.__data[name]) + + def __repr__(self): + values = ','.join("{}={!r}".format(k, v) for k,v in self.__data.items()) + return values \ No newline at end of file http://git-wip-us.apache.org/repos/asf/libcloud/blob/b19a1653/libcloud/compute/drivers/nttcis.py ---------------------------------------------------------------------- diff --git a/libcloud/compute/drivers/nttcis.py b/libcloud/compute/drivers/nttcis.py index 2e88b2c..2f20805 100644 --- a/libcloud/compute/drivers/nttcis.py +++ b/libcloud/compute/drivers/nttcis.py @@ -40,6 +40,7 @@ from libcloud.common.nttcis import NttCisAntiAffinityRule from libcloud.common.nttcis import NttCisIpAddressList from libcloud.common.nttcis import NttCisChildIpAddressList from libcloud.common.nttcis import NttCisIpAddress +from libcloud.common.nttcis import NttCisReservedIpAddress from libcloud.common.nttcis import NttCisPortList from libcloud.common.nttcis import NttCisPort from libcloud.common.nttcis import NttCisChildPortList @@ -47,16 +48,16 @@ from libcloud.common.nttcis import NttCisNic from libcloud.common.nttcis import NetworkDomainServicePlan from libcloud.common.nttcis import NttCisTagKey from libcloud.common.nttcis import NttCisTag -from libcloud.common.nttcis import NttCisSnapshot +from libcloud.common.nttcis import XmlDictConfig, XmlListConfig, Generic from libcloud.common.nttcis import API_ENDPOINTS, DEFAULT_REGION from libcloud.common.nttcis import TYPES_URN from libcloud.common.nttcis import SERVER_NS, NETWORK_NS, GENERAL_NS from libcloud.utils.py3 import urlencode, ensure_string -from libcloud.utils.xml import fixxpath, findtext, findall +from libcloud.utils.xml import fixxpath, findtext, findall, return_all from libcloud.utils.py3 import basestring from libcloud.compute.types import NodeState, Provider import sys - +import re # Node state map is a dictionary with the keys as tuples # These tuples represent: # (<state_of_node_from_didata>, <is node started?>, <action happening>) @@ -1974,6 +1975,82 @@ class NttCisNodeDriver(NodeDriver): response_code = findtext(result, 'responseCode', TYPES_URN) return response_code in ['IN_PROGRESS', 'OK'] + + # 09/10/18 Adding private IPv4 and IPv6 addressing capability + def ex_reserve_ip(self, vlan: NttCisVlan, ip: str, description: None) -> bool: + vlan_id = self._vlan_to_vlan_id(vlan) + if re.match(r'(\d+\.){3}', ip): + private_ip = ET.Element('reservePrivateIpv4Address', {'xmlns': TYPES_URN}) + resource = 'network/reservePrivateIpv4Address' + elif re.search(r':', ip): + private_ip = ET.Element('reserveIpv6Address', {'xmlns': TYPES_URN}) + resource = 'network/reserveIpv6Address' + ET.SubElement(private_ip, 'vlanId').text = vlan_id + ET.SubElement(private_ip, 'ipAddress').text = ip + if description is not None: + ET.SubElement(private_ip, 'description').text = description + + result = self.connection.request_with_orgId_api_2( + resource, + method='POST', + data=ET.tostring(private_ip)).object + response_code = findtext(result, 'responseCode', TYPES_URN) + return response_code in ['IN_PROGRESS', 'OK'] + + def ex_unreserve_ip_addresses(self, vlan: NttCisVlan, ip: str) -> bool: + vlan_id = self._vlan_to_vlan_id(vlan) + if re.match(r'(\d+\.){3}', ip): + private_ip = ET.Element('unreservePrivateIpv4Address', {'xmlns': TYPES_URN}) + resource = 'network/reservePrivateIpv4Address' + elif re.search(r':', ip): + private_ip = ET.Element('unreserveIpv6Address', {'xmlns': TYPES_URN}) + resource = 'network/unreserveIpv6Address' + ET.SubElement(private_ip, 'vlanId').text = vlan_id + ET.SubElement(private_ip, 'ipAddress').text = ip + result = self.connection.\ + request_with_orgId_api_2( + resource, + method='POST', + data=ET.tostring(private_ip)).object + response_code = findtext(result, 'responseCode', TYPES_URN) + return response_code in ['IN_PROGRESS', 'OK'] + + def ex_list_reserved_ipv4(self, vlan: NttCisVlan=None, datacenter_id: str=None): + if vlan is not None: + vlan_id = self._vlan_to_vlan_id(vlan) + params = {"vlanId": vlan_id} + response = self.connection \ + .request_with_orgId_api_2('network/reservedPrivateIpv4Address', params=params).object + + elif datacenter_id is not None: + params = {'datacenterId': datacenter_id} + response = self.connection \ + .request_with_orgId_api_2('network/reservedPrivateIpv4Address', params=params).object + else: + response = self.connection \ + .request_with_orgId_api_2('network/reservedPrivateIpv4Address').object + + addresses = self._to_ipv4_addresses(response) + return addresses + + def ex_list_reserved_ipv6(self, vlan: NttCisVlan=None, datacenter_id: str=None): + if vlan is not None: + vlan_id = self._vlan_to_vlan_id(vlan) + params = {"vlanId": vlan_id} + response = self.connection \ + .request_with_orgId_api_2('network/reservedIpv6Address', params=params).object + + elif datacenter_id is not None: + params = {'datacenterId': datacenter_id} + response = self.connection \ + .request_with_orgId_api_2('network/reservedIpv6Address', params=params).object + else: + response = self.connection \ + .request_with_orgId_api_2('network/reservedIpv6Address').object + + addresses = self._to_ipv6_addresses(response) + return addresses + def ex_get_node_by_id(self, id): node = self.connection.request_with_orgId_api_2( 'server/server/%s' % id).object @@ -3816,7 +3893,6 @@ class NttCisNodeDriver(NodeDriver): :return: NttCisPortList object :rtype: :class:`NttCisPort` """ - url_path = ('network/portList/%s' % ex_portlist_id) response = self.connection.request_with_orgId_api_2( url_path).object @@ -3925,8 +4001,8 @@ class NttCisNodeDriver(NodeDriver): response_code = findtext(response, 'responseCode', TYPES_URN) return response_code in ['IN_PROGRESS', 'OK'] - def ex_edit_portlist(self, ex_portlist, description, - port_collection, child_portlist_list=None): + def ex_edit_portlist(self, ex_portlist, description=None, + port_collection=None, child_portlist_list=None): """ Edit Port List. @@ -3985,21 +4061,25 @@ class NttCisNodeDriver(NodeDriver): 'xmlns': TYPES_URN, 'xmlns:xsi': "http://www.w3.org/2001/XMLSchema-instance" }) + if description is not None: + if description != 'nil': + ET.SubElement( + existing_port_address_list, + 'description' + ).text = description + else: + ET.SubElement(existing_port_address_list, "description", {"xsi:nil": "true"}) - ET.SubElement( - existing_port_address_list, - 'description' - ).text = description - - for port in port_collection: - p = ET.SubElement( - existing_port_address_list, - 'port' - ) - p.set('begin', port.begin) + if port_collection is not None: + for port in port_collection: + p = ET.SubElement( + existing_port_address_list, + 'port' + ) + p.set('begin', port.begin) - if port.end: - p.set('end', port.end) + if port.end: + p.set('end', port.end) if child_portlist_list is not None: for child in child_portlist_list: @@ -4807,6 +4887,23 @@ class NttCisNodeDriver(NodeDriver): 'state': findtext(element, 'state', TYPES_URN)} #'server_config': self.to_snapshot_conf_elems(findtext(element, 'serverConfig', TYPES_URN)} + def _to_ipv4_addresses(self, object): + ipv4_address_elements = object.findall(fixxpath('ipv4', TYPES_URN)) + return [self._to_ipv4_6_address(el) for el in ipv4_address_elements] + + def _to_ipv6_addresses(self, object): + ipv6_address_elements = object.findall(fixxpath('reservedIpv6Address', TYPES_URN)) + return [self._to_ipv4_6_address(el) for el in ipv6_address_elements] + + def _to_ipv4_6_address(self, element): + return NttCisReservedIpAddress( + element.get('datacenterId'), + element.get('exclusive'), + findtext(element, 'vlanId', TYPES_URN), + findtext(element, 'ipAddress', TYPES_URN), + description=findtext(element, 'description', TYPES_URN), + ) + def _to_windows(self, object): http://git-wip-us.apache.org/repos/asf/libcloud/blob/b19a1653/libcloud/loadbalancer/drivers/nttcis.py ---------------------------------------------------------------------- diff --git a/libcloud/loadbalancer/drivers/nttcis.py b/libcloud/loadbalancer/drivers/nttcis.py index 5584b52..85a1f51 100644 --- a/libcloud/loadbalancer/drivers/nttcis.py +++ b/libcloud/loadbalancer/drivers/nttcis.py @@ -23,6 +23,7 @@ from libcloud.common.nttcis import NttCisDefaultHealthMonitor from libcloud.common.nttcis import NttCisPersistenceProfile from libcloud.common.nttcis import \ NttCisVirtualListenerCompatibility +from libcloud.common.nttcis import Node from libcloud.common.nttcis import NttCisDefaultiRule from libcloud.common.nttcis import API_ENDPOINTS from libcloud.common.nttcis import DEFAULT_REGION @@ -47,7 +48,7 @@ class NttCisLBDriver(Driver): type = Provider.NTTCIS api_version = 1.0 - network_domain_id = None + #network_domain_id = None _VALUE_TO_ALGORITHM_MAP = { 'ROUND_ROBIN': Algorithm.ROUND_ROBIN, @@ -68,9 +69,11 @@ class NttCisLBDriver(Driver): 'REQUIRES_SUPPORT': State.ERROR } - def __init__(self, key, secret=None, secure=True, host=None, port=None, + def __init__(self, key, network_domain_id, secret=None, secure=True, host=None, port=None, api_version=None, region=DEFAULT_REGION, **kwargs): + self.network_domain_id = network_domain_id + if region not in API_ENDPOINTS and host is None: raise ValueError( 'Invalid region: %s, no host specified' % (region)) @@ -78,11 +81,11 @@ class NttCisLBDriver(Driver): self.selected_region = API_ENDPOINTS[region] super(NttCisLBDriver, self).__init__(key=key, secret=secret, - secure=secure, host=host, - port=port, - api_version=api_version, - region=region, - **kwargs) + secure=secure, host=host, + port=port, + api_version=api_version, + region=region, + **kwargs) def _ex_connection_class_kwargs(self): """ @@ -138,14 +141,15 @@ class NttCisLBDriver(Driver): # Attach the members to the pool as nodes if members is not None: for member in members: - node = self.ex_create_node( - network_domain_id=network_domain_id, - name=member.ip, - ip=member.ip, - ex_description=None) + #if not isinstance(member, Node): + # node = self.ex_create_node( + # network_domain_id=network_domain_id, + # name=member.name, + # ip=member.private_ips[0], + # ex_description=None) self.ex_create_pool_member( pool=pool, - node=node, + node=member, port=port) # Create the virtual listener (balancer) @@ -354,7 +358,7 @@ class NttCisLBDriver(Driver): if port is not None: ET.SubElement(create_pool_m, "port").text = str(port) ET.SubElement(create_pool_m, "status").text = 'ENABLED' - + test = ET.tostring(create_pool_m) response = self.connection.request_with_orgId_api_2( 'networkDomainVip/addPoolMember', method='POST', http://git-wip-us.apache.org/repos/asf/libcloud/blob/b19a1653/libcloud/utils/xml.py ---------------------------------------------------------------------- diff --git a/libcloud/utils/xml.py b/libcloud/utils/xml.py index 6a5d984..7b695d1 100644 --- a/libcloud/utils/xml.py +++ b/libcloud/utils/xml.py @@ -48,3 +48,17 @@ def findattr(element, xpath, namespace=None): def findall(element, xpath, namespace=None): return element.findall(fixxpath(xpath=xpath, namespace=namespace)) + + +def return_all(parent_element): + elem_dict = {} + if parent_element.items(): + elem_dict.update(dict(parent_element.items())) + for element in parent_element: + if element.items(): + elem_dict.update(dict(element.items())) + elif element.text: + elem_dict.update({element.tag.split('}')[1]: element.text}) + else: + elem_dict.update(element.attrib) + return elem_dict http://git-wip-us.apache.org/repos/asf/libcloud/blob/b19a1653/tests/conftest.py ---------------------------------------------------------------------- diff --git a/tests/conftest.py b/tests/conftest.py index f5a48f8..4bb21bc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,6 +11,11 @@ def compute_driver(): @pytest.fixture(scope="module") def lbdriver(): + cd = libcloud.get_driver(libcloud.DriverType.COMPUTE, libcloud.DriverType.COMPUTE.NTTCIS) + compute_driver = cd('mitchgeo-test', 'Snmpv2c!', region='dd-eu') + net_domain_name = 'sdk_test_1' + net_domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain_id = [d for d in net_domains if d.name == net_domain_name][0].id cls = libcloud.get_driver(libcloud.DriverType.LOADBALANCER, libcloud.DriverType.LOADBALANCER.NTTCIS) - lbdriver = cls('mitchgeo-test', 'Snmpv2c!', region='dd-eu') + lbdriver = cls('mitchgeo-test', net_domain_id, 'Snmpv2c!', region='dd-eu') return lbdriver \ No newline at end of file http://git-wip-us.apache.org/repos/asf/libcloud/blob/b19a1653/tests/lib_create_test.py ---------------------------------------------------------------------- diff --git a/tests/lib_create_test.py b/tests/lib_create_test.py index e7d5288..2494f31 100644 --- a/tests/lib_create_test.py +++ b/tests/lib_create_test.py @@ -2,18 +2,44 @@ from pprint import pprint import pytest import libcloud from libcloud import loadbalancer -from libcloud.compute.drivers.nttcis import NttCisPort, NttCisIpAddress +from libcloud.compute.drivers.nttcis import NttCisPort, NttCisIpAddress, NttCisPublicIpBlock, NttCisNatRule from libcloud.common.nttcis import NttCisFirewallRule, NttCisVlan, NttCisFirewallAddress -def test_deploy_vlan(compute_driver, vlan_name, network_domain_id, base_ipv4_addr): - network_domain = compute_driver.ex_get_network_domain(network_domain_id) +def test_deploy_vlan(compute_driver, vlan_name='sdk_test2', network_domain_name='sdk_test_1', base_ipv4_addr='10.1.2.0'): + # Default network size is 24 bits. Interval and polling times default to 2 and 60. + interval = 3 + timeout = 60 + network_domains = compute_driver.ex_list_network_domains(location='EU6') + network_domain = [nd for nd in network_domains if nd.name == network_domain_name][0] result = compute_driver.ex_create_vlan(network_domain, vlan_name, base_ipv4_addr) assert isinstance(result, NttCisVlan) - compute_driver.ex_wait_for_state('normal', compute_driver.ex_get_vlan, 2, 60, result.id) + compute_driver.ex_wait_for_state('normal', compute_driver.ex_get_vlan, interval, timeout, result.id) return result +def test_deploy_vlan_2(compute_driver, vlan_name='sdk_test_3', network_domain_name='sdk_test_1', + base_ipv4_addr='10.2.0.0', private_ipv4_prefix_size=24): + # Default network size is 24 bits. Interval and polling times default to 2 and 60. + interval = 3 + timeout = 60 + network_domains = compute_driver.ex_list_network_domains(location='EU6') + network_domain = [nd for nd in network_domains if nd.name == network_domain_name][0] + result = compute_driver.ex_create_vlan(network_domain, vlan_name, base_ipv4_addr, + private_ipv4_prefix_size=private_ipv4_prefix_size) + assert isinstance(result, NttCisVlan) + compute_driver.ex_wait_for_state('normal', compute_driver.ex_get_vlan, interval, timeout, result.id) + return result + + +def test_create_nat_rule(compute_driver): + network_domain_name = "sdk_test_1" + network_domains = compute_driver.ex_list_network_domains(location='EU6') + network_domain = [nd for nd in network_domains if nd.name == network_domain_name][0] + result = compute_driver.ex_create_nat_rule(network_domain, '10.1.1.7', '168.128.13.126') + assert isinstance(result, NttCisNatRule) + + def test_deploy_server(compute_driver): image_id = "81a36aa0-555c-4735-b965-4b64fcf0ac8f" images = compute_driver.list_images(location='EU6') @@ -133,4 +159,51 @@ def test_create_address_list(compute_driver): result = compute_driver.ex_create_ip_address_list(net_domain[0], address_list_name, description, ip_version, address_list) - assert result is True \ No newline at end of file + assert result is True + + +def test_create_public_ip_block(compute_driver): + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name][0] + ip_block = compute_driver.ex_add_public_ip_block_to_network_domain(net_domain) + assert isinstance(ip_block, NttCisPublicIpBlock) + print(ip_block) + + +def test_create_private_ipv4_address(compute_driver): + vlan_name = 'sdk_vlan1' + vlan = compute_driver.ex_list_vlans(name=vlan_name)[0] + ip = '10.1.1.20' + description = 'A test reserved ipv4 address' + result = compute_driver.ex_reserve_ip(vlan, ip, description) + assert result is True + + +def test_create_ipv6_addresss(compute_driver): + vlan_name = 'sdk_vlan1' + vlan = compute_driver.ex_list_vlans(name=vlan_name)[0] + ipv6 = '2a00:47c0:111:1331:7df0:9beb:43c9:5c' + result = compute_driver.ex_reserve_ip(vlan, ipv6) + assert result is True + + +def test_create_load_balancer(lbdriver, compute_driver): + + member1 = compute_driver.list_nodes(ex_name='web1')[0] + member2 = compute_driver.list_nodes(ex_name='web2')[0] + members = [member1, member2] + name = 'sdk_test_balancer' + port = '8000' + protocol = 'http' + algorithm = 1 + members = [m for m in members] + ex_listener_ip_address = "168.128.13.127" + lb = lbdriver.create_balancer(name, port=port, protocol=protocol, algorithm=algorithm, members=members, + ex_listener_ip_address=ex_listener_ip_address) + assert lb.name == name + + +def test_create_pool(lbdriver): + pass + http://git-wip-us.apache.org/repos/asf/libcloud/blob/b19a1653/tests/lib_edit_test.py ---------------------------------------------------------------------- diff --git a/tests/lib_edit_test.py b/tests/lib_edit_test.py index e4c3337..ffccd6c 100644 --- a/tests/lib_edit_test.py +++ b/tests/lib_edit_test.py @@ -1,6 +1,7 @@ import pytest import libcloud from libcloud import loadbalancer +from libcloud.compute.drivers.nttcis import NttCisPort from libcloud.common.nttcis import NttCisIpAddress, NttCisVlan from tests.lib_create_test import test_deploy_vlan @@ -91,6 +92,27 @@ def test_reconfigure_node(compute_driver): assert result is True +def test_edit_vlan(compute_driver): + vlan = compute_driver.ex_list_vlans(name='sdk_test2')[0] + vlan.name = 'sdk_test_2' + vlan.description = "Second test Vlan" + result = compute_driver.ex_update_vlan(vlan) + assert isinstance(result, NttCisVlan) + + +def test_expand_vlan(compute_driver): + vlan = compute_driver.ex_list_vlans(name='sdk_test_3')[0] + vlan.private_ipv4_range_size = '23' + result = compute_driver.ex_expand_vlan(vlan) + assert isinstance(result, NttCisVlan) + + +def test_delete_vlan(compute_driver): + vlan = compute_driver.ex_list_vlans(name='sdk_test_3')[0] + result = compute_driver.ex_delete_vlan(vlan) + assert result is True + + def test_add_disk_by_node(compute_driver): """ Speeds can be specified based on DataCenter @@ -275,8 +297,8 @@ def test_delete_port_list(compute_driver): def test_edit_address_list(compute_driver): domain_name = 'sdk_test_1' domains = compute_driver.ex_list_network_domains(location='EU6') - net_domain = [d for d in domains if d.name == domain_name] - addr_list = compute_driver.ex_get_ip_address_list(net_domain[0], 'sdk_test_address_list') + net_domain = [d for d in domains if d.name == domain_name][0] + addr_list = compute_driver.ex_get_ip_address_list(net_domain, 'sdk_test_address_list') assert addr_list[0].ip_version == 'IPV4' ip_address_1 = NttCisIpAddress(begin='190.2.2.100') ip_address_2 = NttCisIpAddress(begin='190.2.2.106', end='190.2.2.108') @@ -289,12 +311,18 @@ def test_edit_address_list(compute_driver): assert result is True +def test_delete_public_ip_block(compute_driver): + block = compute_driver.ex_get_public_ip_block("813b87a8-18e1-11e5-8d4f-180373fb68df") + result = compute_driver.ex_delete_public_ip_block(block) + assert result is True + + def test_edit_address_list_2(compute_driver): domain_name = 'sdk_test_1' domains = compute_driver.ex_list_network_domains(location='EU6') - net_domain = [d for d in domains if d.name == domain_name] + net_domain = [d for d in domains if d.name == domain_name][0] # An ip address list object can be used as an argument or the id of the address list - addr_list = compute_driver.ex_get_ip_address_list(net_domain[0], 'sdk_test_address_list') + addr_list = compute_driver.ex_get_ip_address_list(net_domain, 'sdk_test_address_list') result = compute_driver.ex_edit_ip_address_list("d32aa8d4-831b-4fd6-95da-c639768834f0", description='nil') @@ -304,12 +332,41 @@ def test_edit_address_list_2(compute_driver): def test_delete_address_list(compute_driver): domain_name = 'sdk_test_1' domains = compute_driver.ex_list_network_domains(location='EU6') - net_domain = [d for d in domains if d.name == domain_name] - addresslist_to_delete = compute_driver.ex_get_ip_address_list(net_domain[0], 'sdk_test_address_list') + net_domain = [d for d in domains if d.name == domain_name][0] + addresslist_to_delete = compute_driver.ex_get_ip_address_list(net_domain, 'sdk_test_address_list') print(addresslist_to_delete) + +def test_edit_port_list_1(compute_driver): + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name] + port_list_name = 'sdk_test_port_list' + port_lists = compute_driver.ex_list_portlist(net_domain[0]) + port_list = [port for port in port_lists if port.name == port_list_name][0] + port_collection = [NttCisPort(begin='8000', end='8080'), NttCisPort(begin='9000')] + result = compute_driver.ex_edit_portlist(port_list.id, port_collection=port_collection) + assert result is True + + +def test_unreserve_ip_address(compute_driver): + vlan_name = 'sdk_vlan1' + vlan = compute_driver.ex_list_vlans(name=vlan_name)[0] + ip = '2a00:47c0:111:1331:7df0:9beb:43c9:5c' + result = compute_driver.ex_unreserve_ip_addresses(vlan, ip) + assert result is True + + def test_list_locations(compute_driver): locations = compute_driver.list_locations() for location in locations: print(location) + +def test_delete_nat_rule(compute_driver): + network_domain_name = "sdk_test_1" + network_domains = compute_driver.ex_list_network_domains(location='EU6') + network_domain = [nd for nd in network_domains if nd.name == network_domain_name][0] + rule = compute_driver.ex_get_nat_rule(network_domain, '74f0897f-5536-4c17-84b0-d52b1fb3aea6') + result = compute_driver.ex_delete_nat_rule(rule) + assert result is True http://git-wip-us.apache.org/repos/asf/libcloud/blob/b19a1653/tests/lib_list_test.py ---------------------------------------------------------------------- diff --git a/tests/lib_list_test.py b/tests/lib_list_test.py index 9497473..ce01277 100644 --- a/tests/lib_list_test.py +++ b/tests/lib_list_test.py @@ -78,6 +78,7 @@ def test_list_node_by_image(compute_driver): """ requires retrieving vlan Id first """ + def test_list_node_vlan(compute_driver): nodes = compute_driver.list_nodes(ex_vlan='eb05a24e-85a6-46e3-a7c9-f1765737476d') print() @@ -160,7 +161,7 @@ def test_list_vlan(compute_driver): def test_list_datacenter_object_creation(compute_driver): datacenter = compute_driver.ex_get_datacenter('EU6') - + def test_list_firewall_rules(compute_driver): rules = compute_driver.ex_list_firewall_rules('6aafcf08-cb0b-432c-9c64-7371265db086') @@ -307,9 +308,55 @@ def test_list_sizes(compute_driver): print(property) """ + def test_images(compute_driver): images = compute_driver.list_images() print() print(images) assert isinstance(images, list) and len(images) > 0 + +def test_list_public_ip_blocks(compute_driver): + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name][0] + blocks = compute_driver.ex_list_public_ip_blocks(net_domain) + print(blocks) + + +def test_list_private_ipv4_addresses_vlan(compute_driver): + vlan_name = 'sdk_vlan1' + vlan = compute_driver.ex_list_vlans(name=vlan_name)[0] + ip_addresses = compute_driver.ex_list_reserved_ipv4(vlan=vlan) + for ip_address in ip_addresses: + print(ip_address) + + +def test_list_private_ipv4_addresses_datacenter(compute_driver): + datacenter_id = 'EU8' + ip_addresses = compute_driver.ex_list_reserved_ipv4(datacenter_id=datacenter_id) + for ip_address in ip_addresses: + print(ip_address) + + +def test_list_private_ipv4_addresses_all(compute_driver): + ip_addresses = compute_driver.ex_list_reserved_ipv4() + for ip_address in ip_addresses: + print(ip_address) + + +def test_list_reserved_ipv6_address_vlan(compute_driver): + vlan_name = 'sdk_vlan1' + vlan = compute_driver.ex_list_vlans(name=vlan_name)[0] + ip_addresses = compute_driver.ex_list_reserved_ipv6(vlan=vlan) + for ip_address in ip_addresses: + print(ip_address) + + +def test_list_nat_rules(compute_driver): + network_domain_name = "sdk_test_1" + network_domains = compute_driver.ex_list_network_domains(location='EU6') + network_domain = [nd for nd in network_domains if nd.name == network_domain_name][0] + rules = compute_driver.ex_list_nat_rules(network_domain) + for rule in rules: + print(rule) \ No newline at end of file
