Oops, the previous one includes unrelated code. This is corrected one. >From e74f2be89e9813d623ee16130398af4caddc58c7 Mon Sep 17 00:00:00 2001 Message-Id: <e74f2be89e9813d623ee16130398af4caddc58c7.1333536317.git.yamah...@valinux.co.jp> In-Reply-To: <[email protected]> References: <[email protected]> From: Isaku Yamahata <[email protected]> Date: Fri, 23 Mar 2012 19:28:30 +0900 Subject: [PATCH 31/32] app/gre_tunnel: implement GRETunnel app
- race masking layer - debug app: PortSetDebug - implement GRETunnel app: app/gre_tunnel Signed-off-by: Isaku Yamahata <[email protected]> --- ryu/app/gre_tunnel.py | 841 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 841 insertions(+), 0 deletions(-) create mode 100644 ryu/app/gre_tunnel.py diff --git a/ryu/app/gre_tunnel.py b/ryu/app/gre_tunnel.py new file mode 100644 index 0000000..26cb138 --- /dev/null +++ b/ryu/app/gre_tunnel.py @@ -0,0 +1,841 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# Copyright (C) 2012 Isaku Yamahata <yamahata at valinux co jp> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3 of the License +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging +from collections import defaultdict + +from ryu import exception as ryu_exc +from ryu.app.rest_nw_id import NW_ID_VPORT_GRE +from ryu.app.rest_nw_id import RESERVED_NETWORK_IDS +from ryu.controller import dispatcher +from ryu.controller import dpset +from ryu.controller import event +from ryu.controller import handler +from ryu.controller import network +from ryu.controller import ofp_event +from ryu.controller import tunnels +from ryu.ofproto import nx_match +from ryu.lib import mac + + +LOG = logging.getLogger(__name__) + + +# Those events are higher level events than events of network tenant, +# tunnel ports as the race conditions are masked. +# Add event is generated only when all necessary informations are gathered, +# Del event is generated when any one of the informations are deleted. +# +# Example: ports for VMs +# there is a race condition between ofp port add/del event and +# register network_id for the port. + + +class EventTunnelKeyDel(event.EventBase): + def __init__(self, tunnel_key): + super(EventTunnelKeyDel, self).__init__() + self.tunnel_key = tunnel_key + + +class EventPortBase(event.EventBase): + def __init__(self, dpid, port_no): + super(EventPortBase, self).__init__() + self.dpid = dpid + self.port_no = port_no + + +class EventVMPort(EventPortBase): + def __init__(self, network_id, tunnel_key, + dpid, port_no, mac_address, add_del): + super(EventVMPort, self).__init__(dpid, port_no) + self.network_id = network_id + self.tunnel_key = tunnel_key + self.mac_address = mac_address + self.add_del = add_del + + def __str__(self): + return ('EventVMPort<dpid %x port_no %d ' + 'network_id %s tunnel_key %s mac %s add_del %s>' % + (self.dpid, self.port_no, + self.network_id, self.tunnel_key, self.mac_address, + self.add_del)) + + +class EventTunnelPort(EventPortBase): + def __init__(self, dpid, port_no, remote_dpid, add_del): + super(EventTunnelPort, self).__init__(dpid, port_no) + self.remote_dpid = remote_dpid + self.add_del = add_del + + def __str__(self): + return ('EventTunnelPort<dpid %x port_no %d remote_dpid %x ' + 'add_del %s>' % + (self.dpid, self.port_no, self.remote_dpid, self.add_del)) + + +QUEUE_NAME_PORT_SET_EV = 'port_set_event' +DISPATCHER_NAME_PORT_SET_EV = 'port_set_event' +PORT_SET_EV_DISPATCHER = dispatcher.EventDispatcher( + DISPATCHER_NAME_PORT_SET_EV) + + +def _link_is_up(dpset_, dp, port_no): + try: + state = dpset_.get_port_state(dp.id, port_no) + return not (state & dp.ofproto.OFPPS_LINK_DOWN) + except ryu_exc.PortNotFound: + return False + + +class PortSet(object): + def __init__(self, **kwargs): + super(PortSet, self).__init__() + self.nw = kwargs['network'] + self.tunnels = kwargs['tunnels'] + self.dpset = kwargs['dpset'] + self.ev_q = dispatcher.EventQueue(QUEUE_NAME_PORT_SET_EV, + PORT_SET_EV_DISPATCHER) + + def _check_link_state(self, dp, port_no, add_del): + if add_del: + # When adding port, the link should be UP. + return _link_is_up(self.dpset, dp, port_no) + else: + # When deleting port, the link status isn't cared. + return True + + # Tunnel port + # of connecting: self.dpids by (dpid, port_no) + # datapath: connected: EventDP event + # port status: UP: port add/delete/modify event + # remote dpid: self.tunnels by (dpid, port_no): tunnel port add/del even + def _tunnel_port_handler(self, dpid, port_no, add_del): + dp = self.dpset.get(dpid) + if dp is None: + return + if not self._check_link_state(dp, port_no, add_del): + return + try: + remote_dpid = self.tunnels.get_remote_dpid(dpid, port_no) + except ryu_exc.PortNotFound: + return + + self.ev_q.queue(EventTunnelPort(dpid, port_no, remote_dpid, add_del)) + + # VM port + # of connection: self.dpids by (dpid, port_no) + # datapath: connected: EventDP event + # port status: UP: Port add/delete/modify event + # network_id: self.nw by (dpid, port_no): network port add/del event + # mac_address: self.nw by (dpid, port_no): mac address add/del event + # tunnel key: from self.tunnels by network_id: tunnel key add/del event + def _vm_port_handler(self, dpid, port_no, + network_id, mac_address, add_del): + if network_id in RESERVED_NETWORK_IDS: + return + if mac_address is None: + return + dp = self.dpset.get(dpid) + if dp is None: + return + if not self._check_link_state(dp, port_no, add_del): + return + try: + tunnel_key = self.tunnels.get_key(network_id) + except ryu_exc.TunnelKeyNotFound: + return + + self.ev_q.queue(EventVMPort(network_id, tunnel_key, dpid, + port_no, mac_address, add_del)) + + def _vm_port_mac_handler(self, dpid, port_no, network_id, add_del): + try: + mac_address = self.nw.get_mac(dpid, port_no) + except ryu_exc.PortNotFound: + return + self._vm_port_handler(dpid, port_no, network_id, mac_address, + add_del) + + def _port_handler(self, dpid, port_no, add_del): + """ + :type add_del: bool + :param add_del: True for add, False for del + """ + try: + port = self.nw.get_port(dpid, port_no) + except ryu_exc.PortNotFound: + return + + if port.network_id is None: + return + + if port.network_id == NW_ID_VPORT_GRE: + self._tunnel_port_handler(dpid, port_no, add_del) + return + + self._vm_port_handler(dpid, port_no, port.network_id, + port.mac_address, add_del) + + def _tunnel_key_del(self, tunnel_key): + self.ev_q.queue(EventTunnelKeyDel(tunnel_key)) + + # nw: network del + # port add/del (vm/tunnel port) + # mac address add/del(only vm port) + # tunnels: tunnel key add/del + # tunnel port add/del + # dpset: eventdp + # port add/delete/modify + + @handler.set_ev_cls(network.EventNetworkDel, + network.NETWORK_TENANT_EV_DISPATCHER) + def network_del_handler(self, ev): + network_id = ev.network_id + if network_id in RESERVED_NETWORK_IDS: + return + try: + tunnel_key = self.tunnels.get_key(network_id) + except ryu_exc.TunnelKeyNotFound: + return + for (dpid, port_no) in self.nw.list_ports(network_id): + self._vm_port_mac_handler(dpid, port_no, network_id, False) + self._tunnel_key_del(tunnel_key) + + @handler.set_ev_cls(network.EventNetworkPort, + network.NETWORK_TENANT_EV_DISPATCHER) + def network_port_handler(self, ev): + self._vm_port_mac_handler(ev.dpid, ev.port_no, ev.network_id, + ev.add_del) + + @handler.set_ev_cls(network.EventMacAddress, + network.NETWORK_TENANT_EV_DISPATCHER) + def network_mac_address_handler(self, ev): + self._vm_port_handler(ev.dpid, ev.port_no, ev.network_id, + ev.mac_address, ev.add_del) + + @handler.set_ev_cls(tunnels.EventTunnelKeyAdd, + tunnels.TUNNEL_EV_DISPATCHER) + def tunnel_key_add_handler(self, ev): + for (dpid, port_no) in self.nw.list_ports(ev.network_id): + self._vm_port_mac_handler(dpid, port_no, ev.network_id, True) + + @handler.set_ev_cls(tunnels.EventTunnelKeyDel, + tunnels.TUNNEL_EV_DISPATCHER) + def tunnel_key_del_handler(self, ev): + network_id = ev.network_id + for (dpid, port_no) in self.nw.list_ports(network_id): + self._vm_port_mac_handler(dpid, port_no, network_id, False) + if self.nw.has_networks(network_id): + self._tunnel_key_del(ev.tunnel_key) + + @handler.set_ev_cls(tunnels.EventTunnelPort, tunnels.TUNNEL_EV_DISPATCHER) + def tunnel_port_handler(self, ev): + self._port_handler(ev.dpid, ev.port_no, ev.add_del) + + @handler.set_ev_cls(dpset.EventDP, dpset.DPSET_EV_DISPATCHER) + def dp_handler(self, ev): + if not ev.enter_leave: + # TODO:XXX + # What to do on datapath disconnection? + LOG.debug('dp disconnection ev:%s', ev) + + dpid = ev.dp.id + for port in self.nw.get_ports(dpid): + self._port_handler(dpid, port.port_no, ev.enter_leave) + + @handler.set_ev_cls(dpset.EventPortAdd, dpset.DPSET_EV_DISPATCHER) + def port_add_handler(self, ev): + self._port_handler(ev.dp.id, ev.port.port_no, True) + + @handler.set_ev_cls(dpset.EventPortDelete, dpset.DPSET_EV_DISPATCHER) + def port_del_handler(self, ev): + self._port_handler(ev.dp.id, ev.port.port_no, False) + + @handler.set_ev_cls(dpset.EventPortModify, dpset.DPSET_EV_DISPATCHER) + def port_modify_handler(self, ev): + # We don't know LINK status has been changed. + # So VM/TUNNEL port event can be triggered many times. + dp = ev.dp + port = ev.port + self._port_handler(dp.id, port.port_no, + not (port.state & dp.ofproto.OFPPS_LINK_DOWN)) + + +class PortSetDebug(object): + """app for debug class PortSet""" + def __init__(self, *_args, **kwargs): + super(PortSetDebug, self).__init__() + self.nw = kwargs['network'] + self.dpset = kwargs['dpset'] + self.tunnels = kwargs['tunnels'] + self.port_set = PortSet(**kwargs) + handler.register_instance(self.port_set) + + @handler.set_ev_cls(EventTunnelKeyDel, PORT_SET_EV_DISPATCHER) + def tunnel_key_del_handler(self, ev): + LOG.debug('tunnel_key_del ev %s', ev) + + @handler.set_ev_cls(EventVMPort, PORT_SET_EV_DISPATCHER) + def vm_port_handler(self, ev): + LOG.debug('vm_port ev %s', ev) + + @handler.set_ev_cls(EventTunnelPort, PORT_SET_EV_DISPATCHER) + def tunnel_port_handler(self, ev): + LOG.debug('tunnel_port ev %s', ev) + + +class GRETunnel(object): + """app for L2/L3 with gre tunneling""" + + TABLE_DEFAULT_PRPIRITY = 32768 # = ofproto.OFP_DEFAULT_PRIORITY + + SRC_TABLE = 0 + TUNNEL_OUT_TABLE = 1 + LOCAL_OUT_TABLE = 2 + FLOW_TABLES = [SRC_TABLE, TUNNEL_OUT_TABLE, LOCAL_OUT_TABLE] + + SRC_PRI_MAC = TABLE_DEFAULT_PRPIRITY + SRC_PRI_DROP = TABLE_DEFAULT_PRPIRITY / 2 + SRC_PRI_TUNNEL_PASS = TABLE_DEFAULT_PRPIRITY + SRC_PRI_TUNNEL_DROP = TABLE_DEFAULT_PRPIRITY / 2 + + TUNNEL_OUT_PRI_MAC = TABLE_DEFAULT_PRPIRITY + TUNNEL_OUT_PRI_BROADCAST = TABLE_DEFAULT_PRPIRITY / 2 + TUNNEL_OUT_PRI_PASS = TABLE_DEFAULT_PRPIRITY / 4 + TUNNEL_OUT_PRI_DROP = TABLE_DEFAULT_PRPIRITY / 8 + + LOCAL_OUT_PRI_MAC = TABLE_DEFAULT_PRPIRITY + LOCAL_OUT_PRI_BROADCAST = TABLE_DEFAULT_PRPIRITY / 2 + LOCAL_OUT_PRI_DROP = TABLE_DEFAULT_PRPIRITY / 4 + + def __init__(self, *_args, **kwargs): + super(GRETunnel, self).__init__() + self.nw = kwargs['network'] + self.dpset = kwargs['dpset'] + self.tunnels = kwargs['tunnels'] + + self.port_set = PortSet(**kwargs) + handler.register_instance(self.port_set) + + # TODO: track active vm/tunnel ports + + @handler.set_ev_cls(dpset.EventDP, dpset.DPSET_EV_DISPATCHER) + def dp_handler(self, ev): + if not ev.enter_leave: + return + + # enable nicira extension + # TODO:XXX error handling + dp = ev.dp + ofproto = dp.ofproto + + dp.send_nxt_set_flow_format(ofproto.NXFF_NXM) + flow_mod_table_id = dp.ofproto_parser.NXTFlowModTableId(dp, 1) + dp.send_msg(flow_mod_table_id) + dp.send_barrier() + + # delete all flows in all tables + # current controller.handlers takes care of only table = 0 + for table in self.FLOW_TABLES: + rule = nx_match.ClsRule() + self.send_flow_del(dp, rule, table, ofproto.OFPFC_DELETE, + None, None) + dp.send_barrier() + + @staticmethod + def _make_command(table, command): + return table << 8 | command + + def send_flow_mod(self, dp, rule, table, command, priority, actions): + command = self._make_command(table, command) + dp.send_flow_mod(rule=rule, cookie=0, command=command, idle_timeout=0, + hard_timeout=0, priority=priority, actions=actions) + + def send_flow_del(self, dp, rule, table, command, priority, out_port): + command = self._make_command(table, command) + dp.send_flow_mod(rule=rule, cookie=0, command=command, idle_timeout=0, + hard_timeout=0, priority=priority, out_port=out_port) + + def _list_tunnel_port(self, dp, remote_dpids): + dpid = dp.id + tunnel_ports = [] + for other_dpid in remote_dpids: + if other_dpid == dpid: + continue + other_dp = self.dpset.get(other_dpid) + if other_dp is None: + continue + try: + port_no = self.tunnels.get_port(dpid, other_dpid) + except ryu_exc.PortNotFound: + continue + if not self._link_is_up(dp, port_no): + continue + tunnel_ports.append(port_no) + + return tunnel_ports + + def _link_is_up(self, dp, port_no): + return _link_is_up(self.dpset, dp, port_no) + + def _vm_port_add(self, ev): + dpid = ev.dpid + dp = self.dpset.get(dpid) + assert dp is not None + ofproto = dp.ofproto + ofproto_parser = dp.ofproto_parser + mac_address = ev.mac_address + network_id = ev.network_id + tunnel_key = ev.tunnel_key + remote_dpids = self.nw.get_dpids(network_id) + remote_dpids.remove(dpid) + + # LOCAL_OUT_TABLE: unicast + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(mac_address) + actions = [ofproto_parser.OFPActionOutput(ev.port_no)] + self.send_flow_mod(dp, rule, self.LOCAL_OUT_TABLE, ofproto.OFPFC_ADD, + self.LOCAL_OUT_PRI_MAC, actions) + + # LOCAL_OUT_TABLE: broad cast + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(mac.BROADCAST) + + actions = [] + for port in self.nw.get_ports(dpid): + if (port.network_id != network_id or port.mac_address is None): + continue + if not self._link_is_up(dp, port.port_no): + continue + actions.append(ofproto_parser.OFPActionOutput(port.port_no)) + + first_instance = (len(actions) == 1) + assert actions + if first_instance: + command = ofproto.OFPFC_ADD + else: + command = ofproto.OFPFC_MODIFY_STRICT + self.send_flow_mod(dp, rule, self.LOCAL_OUT_TABLE, command, + self.LOCAL_OUT_PRI_BROADCAST, actions) + + # LOCAL_OUT_TABLE: multicast TODO:XXX + + # LOCAL_OUT_TABLE: catch-all drop + if first_instance: + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + self.send_flow_mod(dp, rule, self.LOCAL_OUT_TABLE, + ofproto.OFPFC_ADD, self.LOCAL_OUT_PRI_DROP, []) + + # TUNNEL_OUT_TABLE: unicast + for remote_dpid in remote_dpids: + remote_dp = self.dpset.get(remote_dpid) + if remote_dp is None: + continue + try: + tunnel_port_no = self.tunnels.get_port(dpid, remote_dpid) + except ryu_exc.PortNotFound: + continue + if not self._link_is_up(dp, tunnel_port_no): + continue + + for port in self.nw.get_ports(remote_dpid): + if port.network_id != network_id or port.mac_address is None: + continue + if not self._link_is_up(remote_dp, port.port_no): + continue + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(port.mac_address) + output = ofproto_parser.OFPActionOutput(tunnel_port_no) + resubmit_table = ofproto_parser.NXActionResubmitTable( + in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE) + actions = [output, resubmit_table] + self.send_flow_mod(dp, rule, self.TUNNEL_OUT_TABLE, + ofproto.OFPFC_ADD, self.TUNNEL_OUT_PRI_MAC, + actions) + + if first_instance: + rule = nx_match.ClsRule() + rule.set_in_port(tunnel_port_no) + rule.set_tun_id(tunnel_key) + resubmit_table = ofproto_parser.NXActionResubmitTable( + in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE) + actions = [resubmit_table] + self.send_flow_mod(dp, rule, self.SRC_TABLE, + ofproto.OFPFC_ADD, self.SRC_PRI_TUNNEL_PASS, + actions) + + if first_instance: + # TUNNEL_OUT_TABLE: catch-all drop(resubmit to LOCAL_OUT_TABLE) + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + resubmit_table = ofproto_parser.NXActionResubmitTable( + in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE) + actions = [resubmit_table] + self.send_flow_mod(dp, rule, self.TUNNEL_OUT_TABLE, + ofproto.OFPFC_ADD, + self.TUNNEL_OUT_PRI_PASS, actions) + + # TUNNEL_OUT_TABLE: broadcast + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(mac.BROADCAST) + actions = [ofproto_parser.OFPActionOutput(tunnel_port_no) + for tunnel_port_no + in self._list_tunnel_port(dp, remote_dpids)] + resubmit_table = ofproto_parser.NXActionResubmitTable( + in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE) + actions.append(resubmit_table) + self.send_flow_mod(dp, rule, self.TUNNEL_OUT_TABLE, + ofproto.OFPFC_ADD, + self.TUNNEL_OUT_PRI_BROADCAST, actions) + + # TUNNEL_OUT_TABLE: multicast TODO:XXX + + # SRC_TABLE: + dp.send_barrier() + rule = nx_match.ClsRule() + rule.set_in_port(ev.port_no) + rule.set_dl_src(mac_address) + set_tunnel = ofproto_parser.NXActionSetTunnel(tunnel_key) + resubmit_table = ofproto_parser.NXActionResubmitTable( + in_port=ofproto.OFPP_IN_PORT, table=self.TUNNEL_OUT_TABLE) + actions = [set_tunnel, resubmit_table] + self.send_flow_mod(dp, rule, self.SRC_TABLE, ofproto.OFPFC_ADD, + self.SRC_PRI_MAC, actions) + + rule = nx_match.ClsRule() + rule.set_in_port(ev.port_no) + self.send_flow_mod(dp, rule, self.SRC_TABLE, ofproto.OFPFC_ADD, + self.SRC_PRI_DROP, []) + + # remote dp + for remote_dpid in remote_dpids: + remote_dp = self.dpset.get(remote_dpid) + if remote_dp is None: + continue + try: + tunnel_port_no = self.tunnels.get_port(remote_dpid, dpid) + except ryu_exc.PortNotFound: + continue + if not self._link_is_up(remote_dp, tunnel_port_no): + continue + + remote_ofproto = remote_dp.ofproto + remote_ofproto_parser = remote_dp.ofproto_parser + + # TUNNEL_OUT_TABLE: unicast + rule = nx_match.ClsRule() + rule.set_tun_id(ev.tunnel_key) + rule.set_dl_dst(mac_address) + output = remote_ofproto_parser.OFPActionOutput(tunnel_port_no) + resubmit_table = remote_ofproto_parser.NXActionResubmitTable( + in_port=remote_ofproto.OFPP_IN_PORT, + table=self.LOCAL_OUT_TABLE) + actions = [output, resubmit_table] + self.send_flow_mod(remote_dp, rule, self.TUNNEL_OUT_TABLE, + remote_ofproto.OFPFC_ADD, + self.TUNNEL_OUT_PRI_MAC, actions) + + if first_instance: + # SRC_TABLE: + rule = nx_match.ClsRule() + rule.set_in_port(tunnel_port_no) + rule.set_tun_id(ev.tunnel_key) + resubmit_table = remote_ofproto_parser.NXActionResubmitTable( + in_port=remote_ofproto.OFPP_IN_PORT, + table=self.LOCAL_OUT_TABLE) + actions = [resubmit_table] + self.send_flow_mod(remote_dp, rule, self.SRC_TABLE, + remote_ofproto.OFPFC_ADD, + self.SRC_PRI_TUNNEL_PASS, actions) + else: + continue + + # TUNNEL_OUT_TABLE: broadcast + rule = nx_match.ClsRule() + rule.set_tun_id(ev.tunnel_key) + rule.set_dl_dst(mac.BROADCAST) + tunnel_ports = self._list_tunnel_port(remote_dp, remote_dpids) + tunnel_ports.append(tunnel_port_no) + actions = [remote_ofproto_parser.OFPActionOutput(port_no) + for port_no in tunnel_ports] + if len(actions) == 1: + command = remote_dp.ofproto.OFPFC_ADD + else: + command = remote_dp.ofproto.OFPFC_MODIFY_STRICT + resubmit_table = remote_ofproto_parser.NXActionResubmitTable( + in_port=remote_ofproto.OFPP_IN_PORT, + table=self.LOCAL_OUT_TABLE) + actions.append(resubmit_table) + self.send_flow_mod(remote_dp, rule, self.TUNNEL_OUT_TABLE, + command, self.TUNNEL_OUT_PRI_BROADCAST, actions) + + # TUNNEL_PORT_TABLE: multicast TODO:XXX + + def _vm_port_del(self, ev): + # TODO:XXX + dpid = ev.dpid + dp = self.dpset.get(dpid) + assert dp is not None + ofproto = dp.ofproto + ofproto_parser = dp.ofproto_parser + mac_address = ev.mac_address + network_id = ev.network_id + tunnel_key = ev.tunnel_key + + local_ports = [] + for port in self.nw.get_ports(dpid): + if port.port_no == ev.port_no: + continue + if (port.network_id != network_id or port.mac_address is None): + continue + if not self._link_is_up(dp, port.port_no): + continue + local_ports.append(port.port_no) + + last_instance = not local_ports + + # SRC_TABLE + rule = nx_match.ClsRule() + rule.set_in_port(ev.port_no) + self.send_flow_mod(dp, rule, self.SRC_TABLE, ofproto.OFPFC_DELETE, + ofproto.OFP_DEFAULT_PRIORITY, + []) # priority is ignored + + if last_instance: + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + self.send_flow_mod(dp, rule, self.SRC_TABLE, + ofproto.OFPFC_DELETE, + self.SRC_PRI_TUNNEL_DROP, + []) # priority is ignored + + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + self.send_flow_mod(dp, rule, self.TUNNEL_OUT_TABLE, + ofproto.OFPFC_DELETE, + ofproto.OFP_DEFAULT_PRIORITY, + []) # priority is ignored + + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + self.send_flow_mod(dp, rule, self.LOCAL_OUT_TABLE, + ofproto.OFPFC_DELETE, + ofproto.OFP_DEFAULT_PRIORITY, + []) # priority is ignored + else: + # LOCAL_OUT_TABLE: unicast + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_src(mac_address) + self.send_flow_del(dp, rule, self.LOCAL_OUT_TABLE, + ofproto.OFPFC_DELETE_STRICT, + self.LOCAL_OUT_PRI_MAC, ev.port_no) + + # LOCAL_OUT_TABLE: broadcast + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(mac.BROADCAST) + actions = [ofproto_parser.OFPActionOutput(port_no) + for port_no in local_ports] + self.send_flow_mod(dp, rule, self.LOCAL_OUT_TABLE, + ofproto.OFPFC_MODIFY_STRICT, + self.LOCAL_OUT_PRI_BROADCAST, actions) + + # LOCAL_OUT_TABLE: multicast TODO:XXX + + # remote dp + remote_dpids = self.nw.get_dpids(ev.network_id) + remote_dpids.remove(dpid) + for remote_dpid in remote_dpids: + remote_dp = self.dpset.get(remote_dpid) + if remote_dp is None: + continue + try: + tunnel_port_no = self.tunnels.get_port(remote_dpid, dpid) + except ryu_exc.PortNotFound: + continue + if not self._link_is_up(remote_dp, tunnel_port_no): + continue + + remote_ofproto = remote_dp.ofproto + remote_ofproto_parser = remote_dp.ofproto_parser + + if last_instance: + rule = nx_match.ClsRule() + rule.set_in_port(tunnel_port_no) + rule.set_tun_id(tunnel_key) + self.send_flow_del(remote_dp, rule, self.SRC_TABLE, + remote_ofproto.OFPFC_DELETE_STRICT, + remote_ofproto.OFP_DEFAULT_PRIORITY, None) + + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(mac.BROADCAST) + tunnel_ports = self._list_tunnel_port(remote_dp, + remote_dpids) + # broadcast + # tunnel_ports.remove(tunnel_port_no) + assert tunnel_port_no not in tunnel_ports + actions = [remote_ofproto_parser.OFPActionOutput(port_no) + for port_no in tunnel_ports] + if not actions: + command = remote_dp.ofproto.OFPFC_DELETE_STRICT + else: + command = remote_dp.ofproto.OFPFC_MODIFY_STRICT + resubmit_table = \ + remote_ofproto_parser.NXActionResubmitTable( + in_port=remote_ofproto.OFPP_IN_PORT, + table=self.LOCAL_OUT_TABLE) + actions.append(resubmit_table) + remote_dp.send_flow_mod(remote_dp, rule, self.TUNNEL_OUT_TABLE, + command, self.TUNNEL_OUT_PRI_BROADCAST, + actions) + + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(mac_address) + self.send_flow_del(remote_dp, rule, self.TUNNEL_OUT_TABLE, + remote_ofproto.OFPFC_DELETE_STRICT, + self.TUNNEL_OUT_PRI_MAC, tunnel_port_no) + + # TODO:XXX multicast + + def _get_vm_ports(self, dpid): + ports = defaultdict(list) + for port in self.nw.get_ports(dpid): + if port.network_id in RESERVED_NETWORK_IDS: + continue + ports[port.network_id].append(port) + return ports + + def _tunnel_port_add(self, ev): + dpid = ev.dpid + dp = self.dpset.get(dpid) + ofproto = dp.ofproto + ofproto_parser = dp.ofproto_parser + remote_dpid = ev.remote_dpid + + local_ports = self._get_vm_ports(dpid) + remote_ports = self._get_vm_ports(remote_dpid) + + # ingress flow from this tunnel port: remote -> tunnel port + # SRC_TABLE: drop if unknown tunnel_key + rule = nx_match.ClsRule() + rule.set_in_port(ev.port_no) + self.send_flow_mod(dp, rule, self.SRC_TABLE, ofproto.OFPFC_ADD, + self.SRC_PRI_TUNNEL_DROP, []) + + # SRC_TABLE: pass if known tunnel_key + for network_id in local_ports: + try: + tunnel_key = self.tunnels.get_key(network_id) + except ryu_exc.TunnelKeyNotFound: + continue + if network_id not in remote_ports: + continue + + rule = nx_match.ClsRule() + rule.set_in_port(ev.port_no) + rule.set_tun_id(tunnel_key) + resubmit_table = ofproto_parser.NXActionResubmitTable( + in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE) + actions = [resubmit_table] + self.send_flow_mod(dp, rule, self.SRC_TABLE, ofproto.OFPFC_ADD, + self.SRC_PRI_TUNNEL_PASS, actions) + + # egress flow into this tunnel port: vm port -> tunnel port -> remote + for network_id in local_ports: + try: + tunnel_key = self.tunnels.get_key(network_id) + except ryu_exc.TunnelKeyNotFound: + continue + ports = remote_ports.get(network_id) + if ports is None: + continue + + # TUNNEL_OUT_TABLE: unicast + for port in ports: + if port.mac_address is None: + continue + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(port.mac_address) + output = ofproto_parser.OFPActionOutput(ev.port_no) + resubmit_table = ofproto_parser.NXActionResubmitTable( + in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE) + actions = [output, resubmit_table] + self.send_flow_mod(dp, rule, self.TUNNEL_OUT_TABLE, + ofproto.OFPFC_ADD, self.TUNNEL_OUT_PRI_MAC, + actions) + + # TUNNEL_OUT_TABLE: broadcast + remote_dpids = self.nw.get_dpids(network_id) + remote_dpids.remove(dpid) + + rule = nx_match.ClsRule() + rule.set_tun_id(tunnel_key) + rule.set_dl_dst(mac.BROADCAST) + tunnel_ports = self._list_tunnel_port(dp, remote_dpids) + tunnel_ports.append(ev.port_no) + actions = [ofproto_parser.OFPActionOutput(port_no) + for port_no in tunnel_ports] + resubmit_table = ofproto_parser.NXActionResubmitTable( + in_port=ofproto.OFPP_IN_PORT, table=self.LOCAL_OUT_TABLE) + actions.append(resubmit_table) + if len(tunnel_ports) == 1: + command = ofproto.OFPFC_ADD + else: + command = ofproto.OFPFC_MODIFY_STRICT + self.send_flow_mod(dp, rule, self.TUNNEL_OUT_TABLE, + command, self.TUNNEL_OUT_PRI_BROADCAST, actions) + + # TUNNEL_OUT_TABLE: multicast TODO:XXX + + def _tunnel_port_del(self, ev): + # TODO:XXX there is no way to delete tunnel port at this moment. + LOG.debug('tunnel port deletion. %s TODO!', ev) + + @handler.set_ev_cls(EventTunnelKeyDel, PORT_SET_EV_DISPATCHER) + def tunnel_key_del_handler(self, ev): + LOG.debug('tunnel_key_del ev %s', ev) + + @handler.set_ev_cls(EventVMPort, PORT_SET_EV_DISPATCHER) + def vm_port_handler(self, ev): + LOG.debug('vm_port ev %s', ev) + if ev.add_del: + self._vm_port_add(ev) + else: + self._vm_port_del(ev) + + @handler.set_ev_cls(EventTunnelPort, PORT_SET_EV_DISPATCHER) + def tunnel_port_handler(self, ev): + LOG.debug('tunnel_port ev %s', ev) + if ev.add_del: + self._tunnel_port_add(ev) + else: + self._tunnel_port_del(ev) + + @handler.set_ev_cls(ofp_event.EventOFPPacketIn, handler.MAIN_DISPATCHER) + def packet_in_handler(self, ev): + # for debug + msg = ev.msg + LOG.debug('packet in ev %s msg %s', ev, ev.msg) + if msg.buffer_id != 0xffffffff: # TODO:XXX use constant instead of -1 + msg.datapath.send_packet_out(msg.buffer_id, msg.in_port, []) -- 1.7.1.1 -- yamahata ------------------------------------------------------------------------------ Better than sec? Nothing is better than sec when it comes to monitoring Big Data applications. Try Boundary one-second resolution app monitoring today. Free. http://p.sf.net/sfu/Boundary-dev2dev _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
