fixed ex_edit_firewall_rule as position is not required unless changing firewall rule placement
Project: http://git-wip-us.apache.org/repos/asf/libcloud/repo Commit: http://git-wip-us.apache.org/repos/asf/libcloud/commit/fd68cc6d Tree: http://git-wip-us.apache.org/repos/asf/libcloud/tree/fd68cc6d Diff: http://git-wip-us.apache.org/repos/asf/libcloud/diff/fd68cc6d Branch: refs/heads/trunk Commit: fd68cc6d91531ca44f353e34d0c6938d12e7ba42 Parents: 273486d Author: mitch <[email protected]> Authored: Thu Sep 6 13:22:53 2018 -0400 Committer: mitch <[email protected]> Committed: Thu Sep 6 13:22:53 2018 -0400 ---------------------------------------------------------------------- libcloud/common/nttcis.py | 45 ++++++++- libcloud/compute/drivers/nttcis.py | 158 ++++++++++++++++++++++++++++---- tests/lib_create_test.py | 98 +++++++++++++++++++- tests/lib_edit_test.py | 24 +++++ tests/lib_list_test.py | 2 + 5 files changed, 302 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/libcloud/common/nttcis.py ---------------------------------------------------------------------- diff --git a/libcloud/common/nttcis.py b/libcloud/common/nttcis.py index e48cbb1..4b5272a 100644 --- a/libcloud/common/nttcis.py +++ b/libcloud/common/nttcis.py @@ -929,11 +929,11 @@ class NttCisFirewallRule(object): self.protocol, self.source, self.destination, self.enabled)) - +""" class NttCisFirewallAddress(object): - """ + The source or destination model in a firewall rule - """ + def __init__(self, any_ip, ip_address, ip_prefix_size, port_begin, port_end, address_list_id, port_list_id): 
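Review bot: The old `NttCisFirewallAddress` class in libcloud/common/nttcis.py is disabled by wrapping it in a module-level triple-quoted string rather than being deleted. The string is opened by the added `"""` in this hunk and closed in the next hunk (at -954), and the class's own docstring quotes are stripped to make that work. This leaves a stale second definition in the module as dead text that readers and grep will still find; version control already keeps the old code, so the lines should simply be removed. The same pattern is applied to the old `ex_create_firewall_rule` in libcloud/compute/drivers/nttcis.py (hunks at -1990, -2015 and -2103).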
@@ -954,6 +954,45 @@ class NttCisFirewallAddress(object): % (self.any_ip, self.ip_address, self.ip_prefix_size, self.port_begin, self.port_end, self.address_list_id, self.port_list_id)) +""" + + +class NttCisFirewallAddress(object): + """ + The source or destination model in a firewall rule + 9/4/18: Editing Class to use with ex_create_firewall_rtule method. + Will haved to circle back and test for any other uses. + """ + + def __init__(self, any_ip=None, ip_address=None, ip_prefix_size=None, + port_begin=None, port_end=None, address_list_id=None, + port_list_id=None): + """ + param any_ip: used to set ip address to "ANY" + :param ip_address: An ip address of either IPv4 decimal notation or an IPv6 address + :param ip_prefix_size: An integer denoting prefix size. + :param port_begin: integer for an individual port or start of a list of ports if not using a port list + :param port_end: integer required if using a list of ports (NOT a port list but a list starting with port begin) + :param address_list_id: An id identifying an address list + :param port_list_id: An id identifying a port list + """ + self.any_ip = any_ip + self.ip_address = ip_address + self.ip_prefix_size = ip_prefix_size + self.port_list_id = port_list_id + self.port_begin = port_begin + self.port_end = port_end + self.address_list_id = address_list_id + self.port_list_id = port_list_id + + def __repr__(self): + return ( + '<NttCisFirewallAddress: any_ip=%s, ip_address=%s, ' + 'ip_prefix_size=%s, port_begin=%s, port_end=%s, ' + 'address_list_id=%s, port_list_id=%s>' + % (self.any_ip, self.ip_address, self.ip_prefix_size, + self.port_begin, self.port_end, self.address_list_id, + self.port_list_id)) class NttCisNatRule(object): http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/libcloud/compute/drivers/nttcis.py ---------------------------------------------------------------------- 
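Review bot: The new `__init__` defaults every field to `None` and validates nothing, so an address with none of `any_ip`, `ip_address` or `address_list_id` can be constructed. It would presumably fail only later, when the request XML is built from it. Unless another caller relies on an empty address (the docstring itself says other uses are untested), I would reject it in the constructor: `if not any_ip and ip_address is None and address_list_id is None: raise ValueError('one of any_ip, ip_address or address_list_id is required')`. Smaller points: `self.port_list_id = port_list_id` is assigned twice, and the first docstring entry lacks its leading colon (`param any_ip:`). The dated "9/4/18 ... circle back" remark belongs in the commit message rather than the class docstring.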
diff --git a/libcloud/compute/drivers/nttcis.py b/libcloud/compute/drivers/nttcis.py index 1048eaa..0f79bca 100644 --- a/libcloud/compute/drivers/nttcis.py +++ b/libcloud/compute/drivers/nttcis.py @@ -1990,9 +1990,10 @@ class NttCisNodeDriver(NodeDriver): params=params).object return self._to_firewall_rules(response, network_domain) + """ def ex_create_firewall_rule(self, network_domain, rule, position, position_relative_to_rule=None): - """ + Creates a firewall rule :param network_domain: The network domain in which to create @@ -2015,7 +2016,7 @@ class NttCisNodeDriver(NodeDriver): :class:`NttCisFirewallRule` or ``str`` :rtype: ``bool`` - """ + positions_without_rule = ('FIRST', 'LAST') positions_with_rule = ('BEFORE', 'AFTER') 
@@ -2103,8 +2104,123 @@ class NttCisNodeDriver(NodeDriver): rule_id = info.get('value') rule.id = rule_id return rule + """ + + def ex_create_firewall_rule(self, network_domain, name, action, ip_version, protocol, + source_addr, destination, position, enabled=1, position_relative_to_rule=None): + """ + Creates a firewall rule + + :param network_domain: The network domain in which to create + the firewall rule + :type network_domain: :class:`NttCisNetworkDomain` or ``str`` + + :param rule: The rule in which to create + :type rule: :class:`NttCisFirewallRule` + + :param position: The position in which to create the rule + There are two types of positions + with position_relative_to_rule arg and without it + With: 'BEFORE' or 'AFTER' + Without: 'FIRST' or 'LAST' + :type position: ``str`` + + :param position_relative_to_rule: The rule or rule name in + which to decide positioning by + :type position_relative_to_rule: + :class:`NttCisFirewallRule` or ``str`` + + :rtype: ``bool`` + """ + positions_without_rule = ('FIRST', 'LAST') + positions_with_rule = ('BEFORE', 'AFTER') + + create_node = ET.Element('createFirewallRule', {'xmlns': TYPES_URN}) + ET.SubElement(create_node, "networkDomainId").text = \ + self._network_domain_to_network_domain_id(network_domain) + ET.SubElement(create_node, "name").text = name + ET.SubElement(create_node, "action").text = action + ET.SubElement(create_node, "ipVersion").text = ip_version + ET.SubElement(create_node, "protocol").text = protocol + # Setup source port rule + source = ET.SubElement(create_node, "source") + if source_addr.address_list_id is not None: + source_ip = ET.SubElement(source, 'ipAddressListId') + source_ip.text = source_addr.address_list_id + else: + source_ip = ET.SubElement(source, 'ip') + if source_addr.any_ip: + source_ip.set('address', 'ANY') + else: + source_ip.set('address', source.ip_address) + if source.ip_prefix_size is not None: + source_ip.set('prefixSize', + str(source.ip_prefix_size)) + if 
source_addr.port_list_id is not None: + source_port = ET.SubElement(source, 'portListId') + source_port.text = source.port_list_id + else: + if source_addr.port_begin is not None: + source_port = ET.SubElement(source, 'port') + source_port.set('begin', source_addr.port_begin) + if source_addr.port_end is not None: + source_port.set('end', source_addr.port_end) + # Setup destination port rule + dest = ET.SubElement(create_node, "destination") + if destination.address_list_id is not None: + dest_ip = ET.SubElement(dest, 'ipAddressListId') + dest_ip.text = destination.address_list_id + else: + dest_ip = ET.SubElement(dest, 'ip') + if destination.any_ip: + dest_ip.set('address', 'ANY') + else: + dest_ip.set('address', destination.ip_address) + if destination.ip_prefix_size is not None: + dest_ip.set('prefixSize', destination.ip_prefix_size) + if destination.port_list_id is not None: + dest_port = ET.SubElement(dest, 'portListId') + dest_port.text = destination.port_list_id + else: + if destination.port_begin is not None: + dest_port = ET.SubElement(dest, 'port') + dest_port.set('begin', destination.port_begin) + if destination.port_end is not None: + dest_port.set('end', destination.port_end) + # Set up positioning of rule + ET.SubElement(create_node, "enabled").text = str(enabled) + placement = ET.SubElement(create_node, "placement") + if position_relative_to_rule is not None: + if position not in positions_with_rule: + raise ValueError("When position_relative_to_rule is specified" + " position must be %s" + % ', '.join(positions_with_rule)) + if isinstance(position_relative_to_rule, + NttCisFirewallRule): + rule_name = position_relative_to_rule.name + else: + rule_name = position_relative_to_rule + placement.set('relativeToRule', rule_name) + else: + if position not in positions_without_rule: + raise ValueError("When position_relative_to_rule is not" + " specified position must be %s" + % ', '.join(positions_without_rule)) + placement.set('position', position) + + 
response = self.connection.request_with_orgId_api_2( + 'network/createFirewallRule', + method='POST', + data=ET.tostring(create_node)).object + + rule_id = None + for info in findall(response, 'info', TYPES_URN): + if info.get('name') == 'firewallRuleId': + rule_id = info.get('value') + rule = self.ex_get_firewall_rule(network_domain, rule_id) + return rule - def ex_edit_firewall_rule(self, rule, position, + def ex_edit_firewall_rule(self, rule, position=None, relative_rule_for_position=None): """ Edit a firewall rule 
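Review bot: In the source branch of the new `ex_create_firewall_rule`, the reads `source.ip_address`, `source.ip_prefix_size` and `source.port_list_id` go to `source`, the `ET.SubElement` created just above, instead of the `source_addr` argument. An Element has none of those attributes, so a rule whose source is a specific address/prefix or a port list raises `AttributeError`, and only `any_ip` and address-list sources get through. The tests added in this commit only use `any_ip='ANY'` sources, which is presumably why it went unnoticed. The lines should read `source_ip.set('address', source_addr.ip_address)`, `if source_addr.ip_prefix_size is not None:` (with `str(source_addr.ip_prefix_size)` below it) and `source_port.text = source_addr.port_list_id`. Separately, the destination `prefixSize` and the port `begin`/`end` values are passed to `set()` without `str()`, and the docstring still documents a `rule` parameter that the new signature no longer has.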
@@ -2222,24 +2338,26 @@ class NttCisNodeDriver(NodeDriver): dest_port.set('end', rule.destination.port_end) # Set up positioning of rule ET.SubElement(edit_node, "enabled").text = str(rule.enabled).lower() - placement = ET.SubElement(edit_node, "placement") - if relative_rule_for_position is not None: - if position not in positions_with_rule: - raise ValueError("When position_relative_to_rule is specified" - " position must be %s" - % ', '.join(positions_with_rule)) - if isinstance(relative_rule_for_position, - NttCisFirewallRule): - rule_name = relative_rule_for_position.name + # changing placement to an option + if position is not None: + placement = ET.SubElement(edit_node, "placement") + if relative_rule_for_position is not None: + if position not in positions_with_rule: + raise ValueError("When position_relative_to_rule is specified" + " position must be %s" + % ', '.join(positions_with_rule)) + if isinstance(relative_rule_for_position, + NttCisFirewallRule): + rule_name = relative_rule_for_position.name + else: + rule_name = relative_rule_for_position + placement.set('relativeToRule', rule_name) else: - rule_name = relative_rule_for_position - placement.set('relativeToRule', rule_name) - else: - if position not in positions_without_rule: - raise ValueError("When position_relative_to_rule is not" - " specified position must be %s" - % ', '.join(positions_without_rule)) - placement.set('position', position) + if position not in positions_without_rule: + raise ValueError("When position_relative_to_rule is not" + " specified position must be %s" + % ', '.join(positions_without_rule)) + placement.set('position', position) response = self.connection.request_with_orgId_api_2( 'network/editFirewallRule', http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/tests/lib_create_test.py ---------------------------------------------------------------------- diff --git a/tests/lib_create_test.py b/tests/lib_create_test.py index a2e85ac..e7d5288 100644 
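Review bot: With placement now optional, a call that passes `relative_rule_for_position` but leaves `position` at its new default of `None` skips the whole block, and the relative rule is silently ignored. I would reject that combination before the `if position is not None:` test, e.g. `if position is None and relative_rule_for_position is not None: raise ValueError("position is required when relative_rule_for_position is given")`. The existing messages also name `position_relative_to_rule`, which is not a parameter of this method. `ex_edit_firewall_rule` starts above this hunk and continues below it; its docstring, which is outside the hunk, may also need to say that `position` is now optional.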
--- a/tests/lib_create_test.py +++ b/tests/lib_create_test.py @@ -1,7 +1,9 @@ +from pprint import pprint import pytest import libcloud from libcloud import loadbalancer -from libcloud.common.nttcis import NttCisAPIException, NttCisVlan +from libcloud.compute.drivers.nttcis import NttCisPort, NttCisIpAddress +from libcloud.common.nttcis import NttCisFirewallRule, NttCisVlan, NttCisFirewallAddress def test_deploy_vlan(compute_driver, vlan_name, network_domain_id, base_ipv4_addr): 
@@ -39,4 +41,96 @@ def test_delete_server(compute_driver): compute_driver.ex_wait_for_state('terminated', compute_driver.ex_get_node_by_id, 2, 240, server.id) -def test_deploy_firewall_rule \ No newline at end of file +def test_deploy_firewall_rule_1(compute_driver): + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name] + address_list_name = 'sdk_test_address_list' + address_lists = compute_driver.ex_list_ip_address_list('6aafcf08-cb0b-432c-9c64-7371265db086') + # using lambda with filter + + # address_list = list(filter(lambda x: address_list_name, address_lists)) + # address_list_id = address_list[0].id + + # using list comprehension to filter + + address_list = [a for a in address_lists if a.name == address_list_name] + address_list_id = address_list[0].id + + port_list_name = 'sdk_test_port_list' + port_lists = compute_driver.ex_list_portlist('6aafcf08-cb0b-432c-9c64-7371265db086') + port_list = [p for p in port_lists if p.name == port_list_name] + port_list_id = port_list[0].id + dest_firewall_address = NttCisFirewallAddress(address_list_id=address_list_id, port_list_id=port_list_id) + source_firewall_address = NttCisFirewallAddress(any_ip='ANY') + rule = compute_driver.ex_create_firewall_rule(net_domain[0], 'sdk_test_firewall_rule_1', 'ACCEPT_DECISIVELY', + 'IPV4', 'TCP', source_firewall_address, dest_firewall_address, 'LAST') + print(rule) + assert isinstance(rule, NttCisFirewallRule) + + +def test_deploy_firewall_rule_2(compute_driver): + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name] + source_firewall_address = NttCisFirewallAddress(any_ip='ANY') + dest_firewall_address = NttCisFirewallAddress(ip_address='10.2.0.0', ip_prefix_size='16', + port_begin='8000', port_end='8080') + + rule = compute_driver.ex_create_firewall_rule(net_domain[0], 
'sdk_test_firewall_rule_2', 'ACCEPT_DECISIVELY', + 'IPV4', 'TCP', source_firewall_address, dest_firewall_address, 'LAST') + print(rule) + assert isinstance(rule, NttCisFirewallRule) + + +def test_deploy_firewall_rule_3(compute_driver): + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name] + source_firewall_address = NttCisFirewallAddress(any_ip='ANY') + dest_firewall_address = NttCisFirewallAddress(ip_address='10.2.0.0', ip_prefix_size='16', + port_begin='25') + rule_name = 'sdk_test_firewall_rule_2' + rules = compute_driver.ex_list_firewall_rules(net_domain[0]) + rule = [rule for rule in rules if rule.name == rule_name] + relative_to = compute_driver.ex_get_firewall_rule(net_domain[0], rule[0].id) + rule = compute_driver.ex_create_firewall_rule(net_domain[0], 'sdk_test_firewall_rule_3', 'ACCEPT_DECISIVELY', + 'IPV4', 'TCP', source_firewall_address, dest_firewall_address, + 'BEFORE', position_relative_to_rule=relative_to) + print(rule) + assert isinstance(rule, NttCisFirewallRule) + + +def test_create_port_list(compute_driver): + """ + An optional named argument, child_portlist_list, which takes the id of an existing + port list to include in this port list. + """ + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name] + port_list_name = 'sdk_test_port_list' + description = 'A test port list' + port_list = [NttCisPort(begin='8000', end='8080')] + result = compute_driver.ex_create_portlist(net_domain[0], port_list_name, description, port_list) + assert result is True + + +def test_create_address_list(compute_driver): + """ + An optional named argument, child_ip_address_list, which takes the id of an existing + port list to include in this port list. 
+ """ + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name] + address_list_name = 'sdk_test_address_list' + description = 'A test address list' + ip_version = 'IPV4' + # An optional prefix list can be specified as a named argument, prefix_size= + address_list = [NttCisIpAddress('10.2.0.1', end='10.2.0.11')] + + result = compute_driver.ex_create_ip_address_list(net_domain[0], address_list_name, + description, + ip_version, address_list) + assert result is True \ No newline at end of file http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/tests/lib_edit_test.py ---------------------------------------------------------------------- diff --git a/tests/lib_edit_test.py b/tests/lib_edit_test.py index 9c82c6d..49a3f80 100644 --- a/tests/lib_edit_test.py +++ b/tests/lib_edit_test.py @@ -227,6 +227,19 @@ def test_change_nic_type(compute_driver): assert result is True +def test_edit_firewall_rule(compute_driver): + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name] + rule_name = 'sdk_test_firewall_rule_2' + rules = compute_driver.ex_list_firewall_rules(net_domain[0]) + rule = [rule for rule in rules if rule.name == rule_name] + rule[0].destination.port_end = None + result = compute_driver.ex_edit_firewall_rule(rule[0]) + print(compute_driver.ex_get_firewall_rule(net_domain[0].id, rule[0].id)) + assert result is True + + def test_create_anti_affinity_rule(compute_driver): server1 = compute_driver.ex_get_node_by_id("d0425097-202f-4bba-b268-c7a73b8da129") server2 = compute_driver.ex_get_node_by_id("803e5e00-b22a-450a-8827-066ff15ec977") 
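Review bot: In tests/lib_create_test.py the new tests index `net_domain[0]`, `address_list[0]`, `port_list[0]` and `rule[0]` straight after a filter, so a missing fixture surfaces as `IndexError` rather than a clear failure. An `assert net_domain, 'network domain sdk_test_1 not found'` before each lookup would fix that. The tests also run against a live account with a hard-coded id (`6aafcf08-cb0b-432c-9c64-7371265db086`) and create `ACCEPT_DECISIVELY` rules from source `ANY` with no teardown in the lines shown, and `test_deploy_firewall_rule_3` depends on the rule from `_2` still existing. A fixture that creates and removes the rules would make that order explicit and keep the test domain from accumulating permissive rules. The docstring of `test_create_address_list` is copied from the port-list test and still says "port list", and `pprint` is imported but not used in the lines shown.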
@@ -241,8 +254,19 @@ def test_delete_anti_affinity_rule(compute_driver): assert result is True +def test_delete_port_list(compute_driver): + portlists = compute_driver.ex_list_portlist('6aafcf08-cb0b-432c-9c64-7371265db086') + port_list_to_delete = [plist for plist in portlists if plist.name == 'sdk_test_port_list'] + result = compute_driver.ex_delete_portlist(port_list_to_delete[0]) + assert result is True +def test_delete_address_list(compute_driver): + domain_name = 'sdk_test_1' + domains = compute_driver.ex_list_network_domains(location='EU6') + net_domain = [d for d in domains if d.name == domain_name] + addresslist_to_delete = compute_driver.ex_get_ip_address_list(net_domain[0], 'sdk_test_address_list') + print(addresslist_to_delete) def test_list_locations(compute_driver): locations = compute_driver.list_locations() http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/tests/lib_list_test.py ---------------------------------------------------------------------- diff --git a/tests/lib_list_test.py b/tests/lib_list_test.py index d86b96b..9497473 100644 --- a/tests/lib_list_test.py +++ b/tests/lib_list_test.py @@ -298,6 +298,8 @@ def test_list_no_anti_affinity_rules(compute_driver): anti_affinity_rules = compute_driver.ex_list_anti_affinity_rules(node=node) assert len(anti_affinity_rules) == 0 + + """ def test_list_sizes(compute_driver): properties = compute_driver.list_locations()
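Review bot: `test_delete_address_list` in tests/lib_edit_test.py never deletes anything and asserts nothing. It fetches the list with `ex_get_ip_address_list` and prints it, so it passes whatever the driver does and leaves `sdk_test_address_list` behind. It should finish the way `test_delete_port_list` does, with the driver's delete call for address lists followed by `assert result is True`; that call is not visible in this diff, so its name needs checking. The hunk also drops the blank lines between top-level functions, and `port_list_to_delete[0]` is indexed without first asserting that the list was found.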
