On Wed, 8 Jul 2015 14:19:30 +0800
"muzixing.com" <[email protected]> wrote:
> To whom it may concern,
> This patch is about network aware service.
Can you explain the details of this? What exactly this does?
> You can test it by running shortest_router.py: ryu-manager
> shortest_route.py --observe-links
>
> If this patch can be applied, please delete the test.py and tree.py.
>
>
> From 903e7872c2d2f47707ea3f80072f301655f1402a Mon Sep 17 00:00:00 2001
> From: None <None>
> Date: Wed, 8 Jul 2015 13:12:26 +0800
> Subject: [PATCH] add files of network aware
>
>
> ---
> ryu/app/network_aware/network_aware.py | 211 +++++++++++++++++
> ryu/app/network_aware/network_monitor.py | 259 +++++++++++++++++++++
> ryu/app/network_aware/shortest_route.py | 360
> ++++++++++++++++++++++++++++++
> ryu/app/network_aware/test.py | 20 ++
> ryu/app/network_aware/tree.py | 69 ++++++
> 5 files changed, 919 insertions(+)
> create mode 100644 ryu/app/network_aware/network_aware.py
> create mode 100644 ryu/app/network_aware/network_monitor.py
> create mode 100644 ryu/app/network_aware/shortest_route.py
> create mode 100644 ryu/app/network_aware/test.py
> create mode 100644 ryu/app/network_aware/tree.py
>
>
> diff --git a/ryu/app/network_aware/network_aware.py
> b/ryu/app/network_aware/network_aware.py
> new file mode 100644
> index 0000000..2b579e4
> --- /dev/null
> +++ b/ryu/app/network_aware/network_aware.py
> @@ -0,0 +1,211 @@
> +# conding=utf-8
> +import logging
> +import struct
> +from operator import attrgetter
> +from ryu.base import app_manager
> +from ryu.controller import ofp_event
> +from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
> +from ryu.controller.handler import CONFIG_DISPATCHER
> +from ryu.controller.handler import set_ev_cls
> +from ryu.ofproto import ofproto_v1_3
> +from ryu.lib.packet import packet
> +from ryu.lib.packet import ethernet
> +from ryu.lib.packet import ipv4
> +from ryu.lib.packet import arp
> +from ryu.lib import hub
> +
> +from ryu.topology import event, switches
> +from ryu.topology.api import get_switch, get_link
> +
> +SLEEP_PERIOD = 10
> +IS_UPDATE = True
> +
> +
> +class Network_Aware(app_manager.RyuApp):
> + OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
> + _NAME = 'network_aware'
> +
> + def __init__(self, *args, **kwargs):
> + super(Network_Aware, self).__init__(*args, **kwargs)
> + self.name = "Network_Aware"
> + self.topology_api_app = self
> + self.link_list = []
> +
> + # links :(src_dpid,dst_dpid)->(src_port,dst_port)
> + self.link_to_port = {}
> +
> + # {(sw,port) :[host1_ip,host2_ip,host3_ip,host4_ip]}
> + self.access_table = {}
> +
> + # ports
> + self.switch_port_table = {} # dpid->port_num
> +
> + # dpid->port_num (ports without link)
> + self.access_ports = {}
> +
> + # dpid->port_num(ports with contain link `s port)
> + self.interior_ports = {}
> + self.graph = {}
> +
> + self.pre_link_to_port = {}
> + self.pre_graph = {}
> + self.pre_access_table = {}
> +
> + self.monitor_thread = hub.spawn(self._monitor)
> +
> + # show topo ,and get topo again
> + def _monitor(self):
> + i = 0
> + while True:
> + self.show_topology()
> + if i == 5:
> + self.get_topology(None)
> + i = 0
> + hub.sleep(SLEEP_PERIOD)
> + i = i + 1
> +
> + @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
> + def switch_features_handler(self, ev):
> + datapath = ev.msg.datapath
> + ofproto = datapath.ofproto
> + parser = datapath.ofproto_parser
> + msg = ev.msg
> + self.logger.info("switch:%s connected", datapath.id)
> +
> + # install table-miss flow entry
> + match = parser.OFPMatch()
> + actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
> + ofproto.OFPCML_NO_BUFFER)]
> + self.add_flow(datapath, 0, match, actions)
> +
> + def add_flow(self, dp, p, match, actions, idle_timeout=0,
> hard_timeout=0):
> + ofproto = dp.ofproto
> + parser = dp.ofproto_parser
> +
> + inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
> + actions)]
> +
> + mod = parser.OFPFlowMod(datapath=dp, priority=p,
> + idle_timeout=idle_timeout,
> + hard_timeout=hard_timeout,
> + match=match, instructions=inst)
> + dp.send_msg(mod)
> +
> + def get_switches(self):
> + return self.switches
> +
> + def get_links(self):
> + return self.link_to_port
> +
> + # get Adjacency matrix from link_to_port
> + def get_graph(self, link_list):
> + for src in self.switches:
> + for dst in self.switches:
> + self.graph.setdefault(src, {dst: float('inf')})
> + if src == dst:
> + self.graph[src][src] = 0
> + elif (src, dst) in link_list:
> + self.graph[src][dst] = 1
> + else:
> + self.graph[src][dst] = float('inf')
> + return self.graph
> +
> + for sw in switch_list:
> + dpid = sw.dp.id
> + self.switch_port_table.setdefault(dpid, set())
> + self.interior_ports.setdefault(dpid, set())
> + self.access_ports.setdefault(dpid, set())
> +
> + for p in sw.ports:
> + self.switch_port_table[dpid].add(p.port_no)
> +
> + def create_port_map(self, switch_list):
> + for sw in switch_list:
> + dpid = sw.dp.id
> + self.switch_port_table.setdefault(dpid, set())
> + self.interior_ports.setdefault(dpid, set())
> + self.access_ports.setdefault(dpid, set())
> +
> + for p in sw.ports:
> + self.switch_port_table[dpid].add(p.port_no)
> +
> + # get links`srouce port to dst port from link_list,
> + # link_to_port:(src_dpid,dst_dpid)->(src_port,dst_port)
> + def create_interior_links(self, link_list):
> + for link in link_list:
> + src = link.src
> + dst = link.dst
> + self.link_to_port[
> + (src.dpid, dst.dpid)] = (src.port_no, dst.port_no)
> +
> + # find the access ports and interiorior ports
> + if link.src.dpid in self.switches:
> + self.interior_ports[link.src.dpid].add(link.src.port_no)
> + if link.dst.dpid in self.switches:
> + self.interior_ports[link.dst.dpid].add(link.dst.port_no)
> +
> + # get ports without link into access_ports
> + def create_access_ports(self):
> + for sw in self.switch_port_table:
> + self.access_ports[sw] = self.switch_port_table[
> + sw] - self.interior_ports[sw]
> +
> + events = [event.EventSwitchEnter,
> + event.EventSwitchLeave, event.EventPortAdd,
> + event.EventPortDelete, event.EventPortModify,
> + event.EventLinkAdd, event.EventLinkDelete]
> +
> + @set_ev_cls(events)
> + def get_topology(self, ev):
> + switch_list = get_switch(self.topology_api_app, None)
> + self.create_port_map(switch_list)
> + self.switches = self.switch_port_table.keys()
> + links = get_link(self.topology_api_app, None)
> + self.create_interior_links(links)
> + self.create_access_ports()
> + self.get_graph(self.link_to_port.keys())
> + # self.show_topology()
> +
> + # show topo
> + def show_topology(self):
> + switch_num = len(self.graph)
> + if self.pre_graph != self.graph or IS_UPDATE:
> + print "---------------------Topo Link---------------------"
> + print '%10s' % ("switch"),
> + for i in xrange(1, switch_num + 1):
> + print '%10d' % i,
> + print ""
> + for i in self.graph.keys():
> + print '%10d' % i,
> + for j in self.graph[i].values():
> + print '%10.0f' % j,
> + print ""
> + self.pre_graph = self.graph
> + # show link
> + if self.pre_link_to_port != self.link_to_port or IS_UPDATE:
> + print "---------------------Link Port---------------------"
> + print '%10s' % ("switch"),
> + for i in xrange(1, switch_num + 1):
> + print '%10d' % i,
> + print ""
> + for i in xrange(1, switch_num + 1):
> + print '%10d' % i,
> + for j in xrange(1, switch_num + 1):
> + if (i, j) in self.link_to_port.keys():
> + print '%10s' % str(self.link_to_port[(i, j)]),
> + else:
> + print '%10s' % "No-link",
> + print ""
> + self.pre_link_to_port = self.link_to_port
> +
> + # each dp access host
> + # {(sw,port) :[host1_ip,host2_ip,host3_ip,host4_ip]}
> + if self.pre_access_table != self.access_table or IS_UPDATE:
> + print "----------------Access Host-------------------"
> + print '%10s' % ("switch"), '%12s' % "Host"
> + if not self.access_table.keys():
> + print " NO found host"
> + else:
> + for tup in self.access_table:
> + print '%10d: ' % tup[0], self.access_table[tup]
> + self.pre_access_table = self.access_table
> diff --git a/ryu/app/network_aware/network_monitor.py
> b/ryu/app/network_aware/network_monitor.py
> new file mode 100644
> index 0000000..f6b5cba
> --- /dev/null
> +++ b/ryu/app/network_aware/network_monitor.py
> @@ -0,0 +1,259 @@
> +from __future__ import division
> +from operator import attrgetter
> +from ryu.base import app_manager
> +from ryu.controller import ofp_event
> +from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
> +from ryu.controller.handler import CONFIG_DISPATCHER
> +from ryu.controller.handler import set_ev_cls
> +from ryu.ofproto import ofproto_v1_3
> +from ryu.lib import hub
> +from ryu.lib.packet import packet
> +
> +SLEEP_PERIOD = 10
> +
> +
> +class network_monitor(app_manager.RyuApp):
> + OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
> + _NAME = 'network_monitor'
> +
> + def __init__(self, *args, **kwargs):
> + super(network_monitor, self).__init__(*args, **kwargs)
> +
> + self.datapaths = {}
> + self.port_stats = {}
> + self.port_speed = {}
> + self.flow_stats = {}
> + self.flow_speed = {}
> + # {"port":{dpid:{port:body,..},..},"flow":{dpid:body,..}
> + self.stats = {}
> + self.port_link = {} # {dpid:{port_no:(config,state,cur),..},..}
> + self.monitor_thread = hub.spawn(self._monitor)
> +
> + @set_ev_cls(ofp_event.EventOFPStateChange,
> + [MAIN_DISPATCHER, DEAD_DISPATCHER])
> + def _state_change_handler(self, ev):
> + datapath = ev.datapath
> + if ev.state == MAIN_DISPATCHER:
> + if not datapath.id in self.datapaths:
> + self.logger.debug('register datapath: %016x', datapath.id)
> + self.datapaths[datapath.id] = datapath
> + elif ev.state == DEAD_DISPATCHER:
> + if datapath.id in self.datapaths:
> + self.logger.debug('unregister datapath: %016x', datapath.id)
> + del self.datapaths[datapath.id]
> +
> + def _monitor(self):
> + while True:
> + self.stats['flow'] = {}
> + self.stats['port'] = {}
> + for dp in self.datapaths.values():
> + self.port_link.setdefault(dp.id, {})
> + self._request_stats(dp)
> + hub.sleep(SLEEP_PERIOD)
> + if self.stats['flow'] or self.stats['port']:
> + self.show_stat('flow', self.stats['flow'])
> + self.show_stat('port', self.stats['port'])
> + hub.sleep(1)
> +
> + def _request_stats(self, datapath):
> + self.logger.debug('send stats request: %016x', datapath.id)
> + ofproto = datapath.ofproto
> + parser = datapath.ofproto_parser
> +
> + req = parser.OFPFlowStatsRequest(datapath)
> + datapath.send_msg(req)
> +
> + req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
> + datapath.send_msg(req)
> +
> + req = parser.OFPPortDescStatsRequest(datapath, 0)
> + datapath.send_msg(req)
> +
> + def _save_stats(self, dist, key, value, length):
> + if key not in dist:
> + dist[key] = []
> + dist[key].append(value)
> +
> + if len(dist[key]) > length:
> + dist[key].pop(0)
> +
> + def _get_speed(self, now, pre, period):
> + if period:
> + return (now - pre) / (period)
> + else:
> + return 0
> +
> + def _get_time(self, sec, nsec):
> + return sec + nsec / (10 ** 9)
> +
> + def _get_period(self, n_sec, n_nsec, p_sec, p_nsec):
> + return self._get_time(n_sec, n_nsec) - self._get_time(p_sec, p_nsec)
> +
> + def show_stat(self, type, bodys):
> + '''
> + type: 'port' 'flow'
> + bodys: port or flow `s information :{dpid:body}
> + '''
> + if(type == 'flow'):
> +
> + print('datapath '' in-port ip-dst '
> + 'out-port packets bytes flow-speed(B/s)')
> + print('---------------- '' -------- ----------------- '
> + '-------- -------- -------- -----------')
> + for dpid in bodys.keys():
> + for stat in sorted([flow for flow in bodys[dpid]
> + if flow.priority == 1],
> + key=lambda flow: (flow.match['in_port'],
> +
> flow.match['ipv4_dst'])):
> + print('%016x %8x %17s %8x %8d %8d %8.1f' % (
> + dpid,
> + stat.match['in_port'], stat.match['ipv4_dst'],
> + stat.instructions[0].actions[0].port,
> + stat.packet_count, stat.byte_count,
> + abs(self.flow_speed[
> + (stat.match['in_port'],
> + stat.match['ipv4_dst'],
> + stat.instructions[0].actions[0].port)][-1])))
> + print '\n'
> +
> + if(type == 'port'):
> + print('datapath port ''rx-pkts rx-bytes rx-error '
> + 'tx-pkts tx-bytes tx-error port-speed(B/s)'
> + ' current-capacity(Kbps) '
> + 'port-stat link-stat')
> + print('---------------- -------- ''-------- -------- -------- '
> + '-------- -------- -------- '
> + '---------------- ---------------- '
> + ' ----------- -----------')
> + format = '%016x %8x %8d %8d %8d %8d %8d %8d %8.1f %16d %16s %16s'
> + for dpid in bodys.keys():
> + for stat in sorted(bodys[dpid], key=attrgetter('port_no')):
> + if stat.port_no != ofproto_v1_3.OFPP_LOCAL:
> + print(format % (
> + dpid, stat.port_no,
> + stat.rx_packets, stat.rx_bytes, stat.rx_errors,
> + stat.tx_packets, stat.tx_bytes, stat.tx_errors,
> + abs(self.port_speed[(dpid, stat.port_no)][-1]),
> + self.port_link[dpid][stat.port_no][2],
> + self.port_link[dpid][stat.port_no][0],
> + self.port_link[dpid][stat.port_no][1]))
> + print '\n'
> +
> + @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
> + def _flow_stats_reply_handler(self, ev):
> + body = ev.msg.body
> + self.stats['flow'][ev.msg.datapath.id] = body
> + for stat in sorted([flow for flow in body if flow.priority == 1],
> + key=lambda flow: (flow.match['in_port'],
> + flow.match['ipv4_dst'])):
> + key = (
> + stat.match['in_port'], stat.match['ipv4_dst'],
> + stat.instructions[0].actions[0].port)
> + value = (
> + stat.packet_count, stat.byte_count,
> + stat.duration_sec, stat.duration_nsec)
> + self._save_stats(self.flow_stats, key, value, 5)
> +
> + # Get flow's speed.
> + pre = 0
> + period = SLEEP_PERIOD
> + tmp = self.flow_stats[key]
> + if len(tmp) > 1:
> + pre = tmp[-2][1]
> + period = self._get_period(
> + tmp[-1][2], tmp[-1][3],
> + tmp[-2][2], tmp[-2][3])
> +
> + speed = self._get_speed(
> + self.flow_stats[key][-1][1], pre, period)
> +
> + self._save_stats(self.flow_speed, key, speed, 5)
> +
> + @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
> + def _port_stats_reply_handler(self, ev):
> + body = ev.msg.body
> + self.stats['port'][ev.msg.datapath.id] = body
> + for stat in sorted(body, key=attrgetter('port_no')):
> + if stat.port_no != ofproto_v1_3.OFPP_LOCAL:
> + key = (ev.msg.datapath.id, stat.port_no)
> + value = (
> + stat.tx_bytes, stat.rx_bytes, stat.rx_errors,
> + stat.duration_sec, stat.duration_nsec)
> +
> + self._save_stats(self.port_stats, key, value, 5)
> +
> + # Get port speed.
> + pre = 0
> + period = SLEEP_PERIOD
> + tmp = self.port_stats[key]
> + if len(tmp) > 1:
> + pre = tmp[-2][0] + tmp[-2][1]
> + period = self._get_period(
> + tmp[-1][3], tmp[-1][4],
> + tmp[-2][3], tmp[-2][4])
> +
> + speed = self._get_speed(
> + self.port_stats[key][-1][0] +
> self.port_stats[key][-1][1],
> + pre, period)
> +
> + self._save_stats(self.port_speed, key, speed, 5)
> +
> + @set_ev_cls(ofp_event.EventOFPPortDescStatsReply, MAIN_DISPATCHER)
> + def port_desc_stats_reply_handler(self, ev):
> + msg = ev.msg
> + dpid = msg.datapath.id
> + ofproto = msg.datapath.ofproto
> +
> + config_dist = {ofproto.OFPPC_PORT_DOWN: "Down",
> + ofproto.OFPPC_NO_RECV: "No Recv",
> + ofproto.OFPPC_NO_FWD: "No Farward",
> + ofproto.OFPPC_NO_PACKET_IN: "No Packet-in"}
> +
> + state_dist = {ofproto.OFPPS_LINK_DOWN: "Down",
> + ofproto.OFPPS_BLOCKED: "Blocked",
> + ofproto.OFPPS_LIVE: "Live"}
> +
> + ports = []
> + for p in ev.msg.body:
> + ports.append('port_no=%d hw_addr=%s name=%s config=0x%08x '
> + 'state=0x%08x curr=0x%08x advertised=0x%08x '
> + 'supported=0x%08x peer=0x%08x curr_speed=%d '
> + 'max_speed=%d' %
> + (p.port_no, p.hw_addr,
> + p.name, p.config,
> + p.state, p.curr, p.advertised,
> + p.supported, p.peer, p.curr_speed,
> + p.max_speed))
> +
> + if p.config in config_dist:
> + config = config_dist[p.config]
> + else:
> + config = "up"
> +
> + if p.state in state_dist:
> + state = state_dist[p.state]
> + else:
> + state = "up"
> +
> + port_feature = (config, state, p.curr_speed)
> + self.port_link[dpid][p.port_no] = port_feature
> +
> + #self.logger.debug('OFPPortDescStatsReply received: %s', ports)
> +
> + @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
> + def _port_status_handler(self, ev):
> + msg = ev.msg
> + reason = msg.reason
> + port_no = msg.desc.port_no
> + dpid = msg.datapath.id
> + ofproto = msg.datapath.ofproto
> +
> + reason_dict = {ofproto.OFPPR_ADD: "added",
> + ofproto.OFPPR_DELETE: "deleted",
> + ofproto.OFPPR_MODIFY: "modified", }
> +
> + if reason in reason_dict:
> +
> + print "switch%d: port %s %s" % (dpid, reason_dict[reason],
> port_no)
> + else:
> + print "switch%d: Illeagal port state %s %s" % (port_no, reason)
> diff --git a/ryu/app/network_aware/shortest_route.py
> b/ryu/app/network_aware/shortest_route.py
> new file mode 100644
> index 0000000..f692cf0
> --- /dev/null
> +++ b/ryu/app/network_aware/shortest_route.py
> @@ -0,0 +1,360 @@
> +# conding=utf-8
> +import logging
> +import struct
> +from operator import attrgetter
> +from ryu.base import app_manager
> +from ryu.controller import ofp_event
> +from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
> +from ryu.controller.handler import CONFIG_DISPATCHER
> +from ryu.controller.handler import set_ev_cls
> +from ryu.ofproto import ofproto_v1_3
> +from ryu.lib.packet import packet
> +from ryu.lib.packet import ethernet
> +from ryu.lib.packet import ipv4
> +from ryu.lib.packet import arp
> +
> +from ryu.topology import event, switches
> +from ryu.topology.api import get_switch, get_link
> +
> +import network_aware
> +import network_monitor
> +
> +
> +class Shortest_Route(app_manager.RyuApp):
> + OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
> + _CONTEXTS = {
> + "Network_Aware": network_aware.Network_Aware,
> + "monitor": network_monitor.network_monitor,
> + }
> +
> + def __init__(self, *args, **kwargs):
> + super(Shortest_Route, self).__init__(*args, **kwargs)
> + self.network_aware = kwargs["Network_Aware"]
> + self.network_monitor = kwargs["monitor"]
> + self.mac_to_port = {}
> + self.datapaths = {}
> +
> + # links :(src_dpid,dst_dpid)->(src_port,dst_port)
> + self.link_to_port = self.network_aware.link_to_port
> +
> + # {sw :[host1_ip,host2_ip,host3_ip,host4_ip]}
> + self.access_table = self.network_aware.access_table
> +
> + # dpid->port_num (ports without link)
> + self.access_ports = self.network_aware.access_ports
> + self.graph = self.network_aware.graph
> +
> + @set_ev_cls(ofp_event.EventOFPStateChange,
> + [MAIN_DISPATCHER, DEAD_DISPATCHER])
> + def _state_change_handler(self, ev):
> + datapath = ev.datapath
> + if ev.state == MAIN_DISPATCHER:
> + if not datapath.id in self.datapaths:
> + self.logger.debug('register datapath: %016x', datapath.id)
> + self.datapaths[datapath.id] = datapath
> + elif ev.state == DEAD_DISPATCHER:
> + if datapath.id in self.datapaths:
> + self.logger.debug('unregister datapath: %016x', datapath.id)
> + del self.datapaths[datapath.id]
> +
> + def add_flow(self, dp, p, match, actions, idle_timeout=0,
> hard_timeout=0):
> + ofproto = dp.ofproto
> + parser = dp.ofproto_parser
> +
> + inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
> + actions)]
> +
> + mod = parser.OFPFlowMod(datapath=dp, priority=p,
> + idle_timeout=idle_timeout,
> + hard_timeout=hard_timeout,
> + match=match, instructions=inst)
> + dp.send_msg(mod)
> +
> + def install_flow(self, path, flow_info, buffer_id, data):
> + '''
> + path=[dpid1, dpid2, dpid3...]
> + flow_info=(eth_type, src_ip, dst_ip, in_port)
> + '''
> + # first flow entry
> + in_port = flow_info[3]
> + assert path
> + datapath_first = self.datapaths[path[0]]
> + ofproto = datapath_first.ofproto
> + parser = datapath_first.ofproto_parser
> + out_port = ofproto.OFPP_LOCAL
> +
> + # inter_link
> + if len(path) > 2:
> + for i in xrange(1, len(path) - 1):
> + port = self.get_link2port(path[i - 1], path[i])
> + port_next = self.get_link2port(path[i], path[i + 1])
> + if port:
> + src_port, dst_port = port[1], port_next[0]
> + datapath = self.datapaths[path[i]]
> + ofproto = datapath.ofproto
> + parser = datapath.ofproto_parser
> + actions = []
> +
> + actions.append(parser.OFPActionOutput(dst_port))
> + match = parser.OFPMatch(
> + in_port=src_port,
> + eth_type=flow_info[0],
> + ipv4_src=flow_info[1],
> + ipv4_dst=flow_info[2])
> + self.add_flow(
> + datapath, 1, match, actions,
> + idle_timeout=10, hard_timeout=30)
> +
> + # inter links pkt_out
> + msg_data = None
> + if buffer_id == ofproto.OFP_NO_BUFFER:
> + msg_data = data
> +
> + out = parser.OFPPacketOut(
> + datapath=datapath, buffer_id=buffer_id,
> + data=msg_data, in_port=src_port, actions=actions)
> +
> + datapath.send_msg(out)
> +
> + if len(path) > 1:
> + # the first flow entry
> + port_pair = self.get_link2port(path[0], path[1])
> + out_port = port_pair[0]
> +
> + actions = []
> + actions.append(parser.OFPActionOutput(out_port))
> + match = parser.OFPMatch(
> + in_port=in_port,
> + eth_type=flow_info[0],
> + ipv4_src=flow_info[1],
> + ipv4_dst=flow_info[2])
> + self.add_flow(datapath_first,
> + 1, match, actions, idle_timeout=10,
> hard_timeout=30)
> +
> + # the last hop: tor -> host
> + datapath = self.datapaths[path[-1]]
> + ofproto = datapath.ofproto
> + parser = datapath.ofproto_parser
> + actions = []
> + src_port = self.get_link2port(path[-2], path[-1])[1]
> + dst_port = None
> +
> + for key in self.access_table.keys():
> + if flow_info[2] == self.access_table[key]:
> + dst_port = key[1]
> + break
> + actions.append(parser.OFPActionOutput(dst_port))
> + match = parser.OFPMatch(
> + in_port=src_port,
> + eth_type=flow_info[0],
> + ipv4_src=flow_info[1],
> + ipv4_dst=flow_info[2])
> +
> + self.add_flow(
> + datapath, 1, match, actions, idle_timeout=10,
> hard_timeout=30)
> +
> + # first pkt_out
> + actions = []
> +
> + actions.append(parser.OFPActionOutput(out_port))
> + msg_data = None
> + if buffer_id == ofproto.OFP_NO_BUFFER:
> + msg_data = data
> +
> + out = parser.OFPPacketOut(
> + datapath=datapath_first, buffer_id=buffer_id,
> + data=msg_data, in_port=in_port, actions=actions)
> +
> + datapath_first.send_msg(out)
> +
> + # last pkt_out
> + actions = []
> + actions.append(parser.OFPActionOutput(dst_port))
> + msg_data = None
> + if buffer_id == ofproto.OFP_NO_BUFFER:
> + msg_data = data
> +
> + out = parser.OFPPacketOut(
> + datapath=datapath, buffer_id=buffer_id,
> + data=msg_data, in_port=src_port, actions=actions)
> +
> + datapath.send_msg(out)
> +
> + else: # src and dst on the same
> + out_port = None
> + actions = []
> + for key in self.access_table.keys():
> + if flow_info[2] == self.access_table[key]:
> + out_port = key[1]
> + break
> +
> + actions.append(parser.OFPActionOutput(out_port))
> + match = parser.OFPMatch(
> + in_port=in_port,
> + eth_type=flow_info[0],
> + ipv4_src=flow_info[1],
> + ipv4_dst=flow_info[2])
> + self.add_flow(
> + datapath_first, 1, match, actions,
> + idle_timeout=10, hard_timeout=30)
> +
> + # pkt_out
> + msg_data = None
> + if buffer_id == ofproto.OFP_NO_BUFFER:
> + msg_data = data
> +
> + out = parser.OFPPacketOut(
> + datapath=datapath_first, buffer_id=buffer_id,
> + data=msg_data, in_port=in_port, actions=actions)
> +
> + datapath_first.send_msg(out)
> +
> + def get_host_location(self, host_ip):
> + for key in self.access_table:
> + if self.access_table[key] == host_ip:
> + return key
> + self.logger.debug("%s location is not found." % host_ip)
> + return None
> +
> + def get_path(self, graph, src):
> + result = self.dijkstra(graph, src)
> + if result:
> + path = result[1]
> + return path
> + self.logger.debug("Path is not found.")
> + return None
> +
> + def get_link2port(self, src_dpid, dst_dpid):
> + if (src_dpid, dst_dpid) in self.link_to_port:
> + return self.link_to_port[(src_dpid, dst_dpid)]
> + else:
> + self.logger.debug("Link to port is not found.")
> + return None
> +
> + def dijkstra(self, graph, src):
> + if graph is None:
> + self.logger.debug("Graph is empty.")
> + return None
> + length = len(graph)
> + type_ = type(graph)
> +
> + # Initiation
> + if type_ == list:
> + nodes = [i for i in xrange(length)]
> + elif type_ == dict:
> + nodes = graph.keys()
> + visited = [src]
> + path = {src: {src: []}}
> + if src not in nodes:
> + self.logger.debug("Src is not in nodes.")
> + return None
> + else:
> + nodes.remove(src)
> + distance_graph = {src: 0}
> + pre = next = src
> + no_link_value = 100000
> +
> + while nodes:
> + distance = no_link_value
> + for v in visited:
> + for d in nodes:
> + new_dist = graph[src][v] + graph[v][d]
> + if new_dist <= distance:
> + distance = new_dist
> + next = d
> + pre = v
> + graph[src][d] = new_dist
> +
> + if distance < no_link_value:
> + path[src][next] = [i for i in path[src][pre]]
> + path[src][next].append(next)
> + distance_graph[next] = distance
> + visited.append(next)
> + nodes.remove(next)
> + else:
> + self.logger.debug("Next node is not found.")
> + return None
> +
> + return distance_graph, path
> +
> + '''
> + In packet_in handler, we need to learn access_table by ARP.
> + Therefore, the first packet from UNKOWN host MUST be ARP.
> + '''
> +
> + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
> + def _packet_in_handler(self, ev):
> + msg = ev.msg
> + datapath = msg.datapath
> + ofproto = datapath.ofproto
> + parser = datapath.ofproto_parser
> + in_port = msg.match['in_port']
> + pkt = packet.Packet(msg.data)
> +
> + eth_type = pkt.get_protocols(ethernet.ethernet)[0].ethertype
> + arp_pkt = pkt.get_protocol(arp.arp)
> + ip_pkt = pkt.get_protocol(ipv4.ipv4)
> +
> + if arp_pkt:
> + arp_src_ip = arp_pkt.src_ip
> + arp_dst_ip = arp_pkt.dst_ip
> +
> + # record the access info
> + if in_port in self.access_ports[datapath.id]:
> + self.access_table[(datapath.id, in_port)] = arp_src_ip
> +
> + result = self.get_host_location(arp_dst_ip)
> + if result: # host record in access table.
> + datapath_dst, out_port = result[0], result[1]
> + actions = [parser.OFPActionOutput(out_port)]
> + datapath = self.datapaths[datapath_dst]
> +
> + out = parser.OFPPacketOut(
> + datapath=datapath,
> + buffer_id=ofproto.OFP_NO_BUFFER,
> + in_port=ofproto.OFPP_CONTROLLER,
> + actions=actions, data=msg.data)
> + datapath.send_msg(out)
> + else: # access info is not existed. send to all host.
> + for dpid in self.access_ports:
> + for port in self.access_ports[dpid]:
> + if (dpid, port) not in self.access_table.keys():
> + actions = [parser.OFPActionOutput(port)]
> + datapath = self.datapaths[dpid]
> + out = parser.OFPPacketOut(
> + datapath=datapath,
> + buffer_id=ofproto.OFP_NO_BUFFER,
> + in_port=ofproto.OFPP_CONTROLLER,
> + actions=actions, data=msg.data)
> + datapath.send_msg(out)
> +
> + if isinstance(ip_pkt, ipv4.ipv4):
> +
> + ip_src = ip_pkt.src
> + ip_dst = ip_pkt.dst
> +
> + result = None
> + src_sw = None
> + dst_sw = None
> +
> + src_location = self.get_host_location(ip_src)
> + dst_location = self.get_host_location(ip_dst)
> +
> + if src_location:
> + src_sw = src_location[0]
> +
> + if dst_location:
> + dst_sw = dst_location[0]
> + result = self.dijkstra(self.graph, src_sw)
> +
> + if result:
> + path = result[1][src_sw][dst_sw]
> + path.insert(0, src_sw)
> + self.logger.info(
> + " PATH[%s --> %s]:%s\n" % (ip_src, ip_dst, path))
> +
> + flow_info = (eth_type, ip_src, ip_dst, in_port)
> + self.install_flow(path, flow_info, msg.buffer_id, msg.data)
> + else:
> + # Reflesh the topology database.
> + self.network_aware.get_topology(None)
> diff --git a/ryu/app/network_aware/test.py b/ryu/app/network_aware/test.py
> new file mode 100644
> index 0000000..b31b11d
> --- /dev/null
> +++ b/ryu/app/network_aware/test.py
> @@ -0,0 +1,20 @@
> +import time
> +
> +
> +def _monitor():
> + i = 0
> + while True:
> + print "show topo"
> + if i == 5:
> + print "get_topo"
> + i = 0
> + time.sleep(5)
> + i = i + 1
> +
> +def monitor():
> + while True:
> + print "show topo"
> + print "get_topo"
> + time.sleep(5)
> +
> +monitor()
> diff --git a/ryu/app/network_aware/tree.py b/ryu/app/network_aware/tree.py
> new file mode 100644
> index 0000000..3d5d287
> --- /dev/null
> +++ b/ryu/app/network_aware/tree.py
> @@ -0,0 +1,69 @@
> +from mininet.net import Mininet
> +from mininet.node import Controller, RemoteController
> +from mininet.cli import CLI
> +from mininet.log import setLogLevel, info
> +from mininet.link import Link, Intf, TCLink
> +from mininet.topo import Topo
> +import logging
> +import os
> +
> +class CustomTopo(Topo):
> + "Simple Data Center Topology"
> +
> + "linkopts - (1:core, 2:aggregation, 3: edge) parameters"
> + "fanout - number of child switch per parent switch"
> + def __init__(self,**opts):
> + Topo.__init__(self, **opts)
> + s=[]
> +
> + s.append(self.addSwitch('s1'))
> + s.append(self.addSwitch('s2'))
> + s.append(self.addSwitch('s3'))
> + s.append(self.addSwitch('s4'))
> + s.append(self.addSwitch('s5'))
> + s.append(self.addSwitch('s6'))
> + self.addLink(s[0],s[1],bw=10)
> + self.addLink(s[1],s[2],bw=10)
> + self.addLink(s[2],s[0],bw=10)
> + self.addLink(s[0],s[3],bw=5)
> + self.addLink(s[1],s[4],bw=5)
> + self.addLink(s[2],s[5],bw=5)
> +
> + h1=self.addHost('h1',mac='00:00:00:00:00:01')
> + h2=self.addHost('h2',mac='00:00:00:00:00:02')
> + h3=self.addHost('h3',mac='00:00:00:00:00:03')
> + h4=self.addHost('h4',mac='00:00:00:00:00:04')
> + h5=self.addHost('h5',mac='00:00:00:00:00:05')
> + h6=self.addHost('h6',mac='00:00:00:00:00:06')
> +
> + self.addLink(s[3],h1,bw=1)
> + self.addLink(s[3],h2,bw=1)
> + self.addLink(s[4],h3,bw=1)
> + self.addLink(s[4],h4,bw=1)
> + self.addLink(s[5],h5,bw=1)
> + self.addLink(s[5],h6,bw=1)
> +
> +topos = {'custom': (lambda: CustomTopo())}
> +
> +def createTopo():
> + logging.debug("Create Topo")
> + topo = CustomTopo()
> +
> + logging.debug("Start Mininet")
> + CONTROLLER_IP = "127.0.0.1"
> + CONTROLLER_PORT = 6633
> + net = Mininet(topo=topo, link=TCLink, controller=None)
> + net.addController(
> + 'controller', controller=RemoteController,
> + ip=CONTROLLER_IP, port=CONTROLLER_PORT)
> +
> + net.start()
> + CLI(net)
> + net.stop()
> +
> +if __name__ == '__main__':
> + setLogLevel('info')
> + if os.getuid() != 0:
> + logger.debug("You are NOT root")
> + elif os.getuid() == 0:
> + createTopo()
> --
> 1.7.9.5
>
>
>
>
>
> ------------------
> Distance 川格
> _____________________________________________________
> School of Information and Communication Engineering
> Beijing University of Posts and Telecommunications
> Beijing 100876, PR China
>
> 臼奨喨窮寄僥,佚連嚥宥佚垢殻僥垪,宥佚垢殻
> _____________________________________________________
> Mobile Phone:
> (+86) 151-1698-3550 Beijing
> E-mail:
> [email protected]
> [email protected]
> Homepage:
> http://www.muzixing.com
------------------------------------------------------------------------------
Don't Limit Your Business. Reach for the Cloud.
GigeNET's Cloud Solutions provide you with the tools and support that
you need to offload your IT needs and focus on growing your business.
Configured For All Businesses. Start Your Cloud Today.
https://www.gigenetcloud.com/
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel