Hello everyone, I am new to python and trying to run an example code from mininet tests. Basically, I am trying to call a method in Hcontroller.py from base class Routing defined in DCRouting.py which runs and fetches all the required results in install_reactive_path() method, but it returns None when it is called from _GlobalFirstFit. I hope someone here could help me fix this bug..
I am attaching all the three files(DCRouting.py, HController.py, util.py) to have a look into. Thanks in advance for your time, help or suggestion. Thanks a lot! kind regards, David
''' Simple hashed-based routing @author: Milad Sharif (msha...@stanford.edu) ''' import logging from copy import copy class Routing(object): '''Base class for data center network routing. Routing engines must implement the get_route() method. ''' def __init__(self, topo): '''Create Routing object. @param topo Topo object from Net parent ''' self.topo = topo def get_route(self, src, dst, hash_): '''Return flow path. @param src source host @param dst destination host @param hash_ hash value @return flow_path list of DPIDs to traverse (including hosts) ''' raise NotImplementedError def routes(self, src, dst): ''' Return list of paths Only works for Fat-Tree topology @ param src source host @ param dst destination host @ return list of DPIDs (including inputs) ''' complete_paths = [] # List of complete dpid routes src_paths = { src : [[src]] } dst_paths = { dst : [[dst]] } dst_layer = self.topo.layer(dst) src_layer = self.topo.layer(src) lower_layer = src_layer if dst_layer > src_layer: lower_layer = dst_layer for front_layer in range(lower_layer-1, -1, -1): if src_layer > front_layer: # expand src frontier new_src_paths = {} for node in sorted(src_paths): path_list = src_paths[node] for path in path_list: last_node = path[-1] for frontier_node in self.topo.upper_nodes(last_node): new_src_paths[frontier_node] = [path + [frontier_node]] if frontier_node in dst_paths: dst_path_list = dst_paths[frontier_node] for dst_path in dst_path_list: dst_path_copy = copy ( dst_path ) dst_path_copy.reverse() complete_paths.append( path + dst_path_copy) src_paths = new_src_paths if dst_layer > front_layer: # expand dst frontier new_dst_paths = {} for node in sorted(dst_paths): path_list = dst_paths[node] for path in path_list: last_node = path[-1] for frontier_node in self.topo.upper_nodes(last_node): new_dst_paths[frontier_node] = [ path + [frontier_node]] if frontier_node in src_paths: src_path_list = src_paths[frontier_node] dst_path_copy = copy( path ) dst_path_copy.reverse() for src_path in src_path_list: complete_paths.append( src_path + dst_path_copy) dst_paths = new_dst_paths if complete_paths: return complete_paths class HashedRouting(Routing): ''' Hashed routing ''' def __init__(self, topo): self.topo = topo def get_route(self, src, dst, hash_): ''' Return flow path. ''' if src == dst: return [src] paths = self.routes(src,dst) if paths: #print 'hash_:', hash_ choice = hash_ % len(paths) #print 'choice:', choice path = sorted(paths)[choice] #print 'path:', path return path
#!/usr/bin/python ''' Fat tree topology for data center networking @author Milad Sharif (msha...@stanford.edu) ''' from mininet.topo import Topo class FatTreeNode(object): def __init__(self, pod = 0, sw = 0, host = 0, name = None, dpid = None): ''' Create FatTreeNode ''' if dpid: self.pod = ( dpid & 0xff0000 ) >> 16 self.sw = ( dpid & 0xff00 ) >> 8 self.host = ( dpid & 0xff ) self.dpid = dpid else: if name: pod, sw, host = [int(s) for s in name.split('h')] self.pod = pod self.sw = sw self.host = host self.dpid = (pod << 16) + (sw << 8) + host def name_str(self): ''' Return name ''' return "%ih%ih%i" % (self.pod, self.sw, self.host) def ip_str(self): ''' Return IP address ''' return "10.%i.%i.%i" % (self.pod, self.sw, self.host) def mac_str(self): ''' Return MAC address ''' return "00:00:00:%02x:%02x:%02x" % (self.pod, self.sw, self.host) class NonBlockingTopo(Topo): LAYER_CORE = 0 LAYER_HOST = 3 def __init__(self, k=4): ''' Create a non-bloking switch ''' super(NonBlockingTopo, self).__init__() self.k = k self.node_gen = FatTreeNode pods = range(0, k) edge_sw = range(0, k/2) agg_sw = range(k/2, k) hosts = range(2, k/2+2) core = self.node_gen(k, 1, 1) core_opts = self.def_opts(core.name_str()) self.addSwitch(core.name_str(), **core_opts) for p in pods: for e in edge_sw: for h in hosts: host = self.node_gen(p,e,h) host_opts = self.def_opts(host.name_str()) self.addHost(host.name_str(), **host_opts) self.addLink(host.name_str(), core.name_str()) def layer(self, name): ''' Return the layer of a node ''' node = self.node_gen(name = name) if (node.pod == self.k): layer = self.LAYER_CORE else: layer = self.LAYER_HOST return layer def def_opts(self, name): ''' return default dict for FatTree node ''' node = self.node_gen(name = name) d = {'layer': self.layer(name)} if d['layer'] == self.LAYER_HOST: d.update({'ip': node.ip_str()}) d.update({'mac': node.mac_str()}) d.update({'dpid': "%016x" % node.dpid}) return d class FatTreeTopo(Topo): LAYER_CORE = 0 LAYER_AGG = 1 LAYER_EDGE = 2 LAYER_HOST = 3 def __init__(self, k = 4): ''' Create FatTree topology k : Number of pods (can support upto k^3/4 hosts) ''' super(FatTreeTopo, self).__init__() self.k = k self.node_gen = FatTreeNode self.numPods = k self.aggPerPod = k / 2 pods = range(0, k) edge_sw = range(0, k/2) agg_sw = range(k/2, k) core_sw = range(1, k/2+1) hosts = range(2, k/2+2) for p in pods: for e in edge_sw: edge = self.node_gen(p, e, 1) edge_opts = self.def_opts(edge.name_str()) self.addSwitch(edge.name_str(), **edge_opts) for h in hosts: host = self.node_gen(p, e, h) host_opts = self.def_opts(host.name_str()) self.addHost(host.name_str(), **host_opts) self.addLink(edge.name_str(),host.name_str()) for a in agg_sw: agg = self.node_gen(p, a, 1) agg_opts = self.def_opts(agg.name_str()) self.addSwitch(agg.name_str(), **agg_opts) self.addLink(agg.name_str(),edge.name_str()) for a in agg_sw: agg = FatTreeNode(p, a, 1) for c in core_sw: core = self.node_gen(k, a-k/2+1, c) core_opts = self.def_opts(core.name_str()) self.addSwitch(core.name_str(), **core_opts) self.addLink(agg.name_str(),core.name_str()) def layer(self, name): ''' Return layer of node ''' node = self.node_gen(name = name) if (node.pod == self.k): layer = self.LAYER_CORE elif (node.host == 1): if (node.sw < self.k/2): layer = self.LAYER_EDGE else: layer = self.LAYER_AGG else: layer = self.LAYER_HOST return layer def isPortUp(self, port): if port > (self.k/2): return True else: return False def layer_nodes(self, layer): ''' Return nodes at the given layer ''' return [n for n in self.g.nodes() if self.layer(n) == layer] def upper_nodes(self, name): ''' Return nodes at one layer higher(closer to core) ''' layer = self.layer(name) - 1 return [n for n in self.g[name] if self.layer(n) == layer] def lower_nodes(self, name): '''Return edges one layer lower (closer to hosts) ''' layer = self.layer(name) + 1 return [n for n in self.g[name] if self.layer(n) == layer] def def_opts(self, name): ''' return default dict for FatTree node ''' node = self.node_gen(name = name) d = {'layer': self.layer(name)} if d['layer'] == self.LAYER_HOST: d.update({'ip': node.ip_str()}) d.update({'mac': node.mac_str()}) d.update({'dpid': "%016x" % node.dpid}) return d
'''the demand estimation algorithm from Hedera paper is implemented here''' import random #import pdb def demand_estimation(flows, hostsList): M ={} for i in hostsList: M[i] = {} for j in hostsList: M[i][j] = {'demand': 0, 'demandInit': 0, 'converged' : False, 'FlowNmbr' : 0} for flow in flows: M[flow['src']][flow['dst']]['FlowNmbr'] += 1 print 'Moujo: ', M demandChange = True while demandChange: demandChange = False for src in hostsList: Est_Src(M, flows, src) for dst in hostsList: Est_Dst(M, flows, dst) for i in hostsList: for j in hostsList: if M[i][j]['demandInit'] != M[i][j]['demand']: NoChange = True M[i][j]['demandInit'] = M[i][j]['demand'] print"********************estimated demands*********************\n", demandsPrinting(M,hostsList) return (M, flows) def Est_Src(M, flows, src): dF = 0 nU = 0 for flow in flows: if flow['src'] == src: if flow['converged']: dF += flow['demand'] else: nU += 1 if nU != 0: eS = (1.0 - dF) / nU for flow in flows: if flow['src'] == src and not flow['converged']: M[flow['src']][flow['dst']]['demand'] = eS #pdb.set_trace() flow['demand'] = eS def Est_Dst(M, flows, dst): dT = 0 dS = 0 nR = 0 for flow in flows: if flow['dst'] == dst: flow['recLimited'] = True dT += flow['demand'] nR += 1 if dT <= 1.0: return eS = 1.0 / nR flagFlip=True while flagFlip: flagFlip = False nR = 0 for flow in flows: if flow['dst'] == dst and flow['recLimited']: if flow['demand'] < eS: dS += flow['demand'] flow['recLimited'] = False flagFlip = True else: nR += 1 eS = (1.0-dS)/nR for flow in flows: if flow['dst'] == dst and flow['recLimited']: M[flow['src']][flow['dst']]['demand'] = eS M[flow['src']][flow['dst']]['converged'] = True flow['converged'] = True flow['demand'] = eS def demandsPrinting(M,hostsList): print hostsList, '\n', '_'*80 for row in hostsList: #pdb.set_trace() print row,'|', for col in hostsList: temp = M[row][col] print '%.2f' % temp['demand'], print def makeFlows(flows, src, dsts): demand = 0.2 / len(dsts) for dst in dsts: flows.append({'converged': False, 'demand': demand, 'src': src, 'dst': dst, 'recLimited': False}) if __name__ == '__main__': hostsList = range(15) flows = [] for i in range(15): dst = random.randint(0,14) if dst > 6: makeFlows(flows, i, [dst, dst]) else: makeFlows(flows, i, [dst, dst+1]) #pdb.set_trace() M, flows_estimated = demand_estimation(flows, hostsList) demandsPrinting(M,hostsList) #pdb.set_trace()
''' Hedera data center controller @author: Behnam Montazeri (behn...@stanford.edu) ''' import logging import sys sys.path.append('/home/juno/msharif-h/') from struct import pack from zlib import crc32 from pox.core import core import pox.openflow.libopenflow_01 as of from pox.lib.revent import EventMixin from pox.lib.util import dpidToStr from pox.lib.packet.ipv4 import ipv4 from pox.lib.packet.udp import udp from pox.lib.packet.tcp import tcp from util import buildTopo, getRouting from DemandEstimation import demand_estimation from threading import Timer, Lock log = core.getLogger() # Number of bytes to send for packet_ins MISS_SEND_LEN = 2000 class Switch(EventMixin): def __init__(self): self.connection = None self.dpid = None self.ports = None def connect(self, connection): if self.dpid is None: self.dpid = connection.dpid assert self.dpid == connection.dpid self.connection = connection def send_packet_data(self, outport, data = None): msg = of.ofp_packet_out(in_port=of.OFPP_NONE, data = data) msg.actions.append(of.ofp_action_output(port = outport)) self.connection.send(msg) def send_packet_bufid(self, outport, buffer_id = -1): msg = of.ofp_packet_out(in_port=of.OFPP_NONE) msg.actions.append(of.ofp_action_output(port = outport)) msg.buffer_id = buffer_id self.connection.send(msg) def install(self, port, match, buf = -1, deleteFlow=False, idle_timeout = 0 ): msg = of.ofp_flow_mod() msg.match = match msg.idle_timeout = idle_timeout msg.actions.append(of.ofp_action_output(port = port)) if deleteFlow: msg.command = of.OFPFC_DELETE #msg.buffer_id = buf msg.flags = of.OFPFF_SEND_FLOW_REM self.connection.send(msg) class HController(EventMixin): def __init__(self, t, r, bw): self.switches = {} # [dpid]->switch self.macTable = {} # [mac]->(dpid, port) self.t = t # Topo object self.r = r # Routng object self.all_switches_up = False core.openflow.addListeners(self) self.statCntr = 0 #sanity check for the flow stats self.HostNameList = [] #a dictionary of the host self.hostsList = [] #list of host dpid self.flows = [] #list of the collected stats self.bw = bw self.beReservation = {} #reservation for the elephant flows self.statMonitorLock = Lock() #to lock the multi access threads self.statMonitorLock.acquire() statMonitorTimer = Timer(10.0,self._collectFlowStats()) #timer to collect stats statMonitorTimer.start() self.matchDict = {} # dictioanary of the matches def _ecmp_hash(self, packet): ''' Return an ECMP-style 5-tuple hash for TCP/IP packets, otherwise 0. RFC2992 ''' hash_input = [0] * 5 if isinstance(packet.next, ipv4): ip = packet.next hash_input[0] = ip.srcip.toUnsigned() hash_input[1] = ip.dstip.toUnsigned() hash_input[2] = ip.protocol if isinstance(ip.next, tcp) or isinstance(ip.next, udp): l4 = ip.next hash_input[3] = l4.srcport hash_input[4] = l4.dstport return crc32(pack('LLHHH', *hash_input)) return 0 def _flood(self, event): ''' Broadcast to every output port ''' packet = event.parsed dpid = event.dpid in_port = event.port t = self.t # Broadcast to every output port except the input on the input switch. for sw_name in t.layer_nodes(t.LAYER_EDGE): for host_name in t.lower_nodes(sw_name): sw_port, host_port = t.port(sw_name, host_name) sw = t.node_gen(name = sw_name).dpid # Send packet out each non-input host port if sw != dpid or (sw == dpid and in_port != sw_port): self.switches[sw].send_packet_data(sw_port, event.data) def _install_reactive_path(self, event, out_dpid, final_out_port, packet): ''' Install entries on route between two switches. ''' in_name = self.t.node_gen(dpid = event.dpid).name_str() out_name = self.t.node_gen(dpid = out_dpid).name_str() hash_ = self._ecmp_hash(packet) route = self.r.get_route(in_name, out_name, hash_) print "Route:",route print '-'*80 if route == None: print None, "route between", in_name, "and", out_name return match = of.ofp_match.from_packet(packet) for i, node in enumerate(route): node_dpid = self.t.node_gen(name = node).dpid if i < len(route) - 1: next_node = route[i + 1] out_port, next_in_port = self.t.port(node, next_node) else: out_port = final_out_port self.switches[node_dpid].install(out_port, match, idle_timeout = 10) if isinstance(packet.next, of.ipv4) and isinstance(packet.next.next, of.tcp): self.matchDict[(packet.next.srcip, packet.next.dstip, packet.next.next.srcport, packet.next.next.dstport)] = (route, match) def _handle_PacketIn(self, event): if not self.all_switches_up: #log.info("Saw PacketIn before all switches were up - ignoring." ) return packet = event.parsed dpid = event.dpid in_port = event.port # Learn MAC address of the sender on every packet-in. self.macTable[packet.src] = (dpid, in_port) sw_name = self.t.node_gen(dpid = dpid).name_str() #print "Sw:", sw_name, packet.src, packet.dst,"port", in_port, packet.dst.isMulticast(),"macTable", packet.dst in self.macTable #print '-'*80 # Insert flow, deliver packet directly to destination. if packet.dst in self.macTable: out_dpid, out_port = self.macTable[packet.dst] self._install_reactive_path(event, out_dpid, out_port, packet) self.switches[out_dpid].send_packet_data(out_port, event.data) else: self._flood(event) def _handle_ConnectionUp(self, event): sw = self.switches.get(event.dpid) sw_str = dpidToStr(event.dpid) sw_name = self.t.node_gen(dpid = event.dpid).name_str() if sw_name not in self.t.switches(): log.warn("Ignoring unknown switch %s" % sw_str) return #log.info("A new switch came up: %s", sw_str) if sw is None: log.info("Added a new switch %s" % sw_name) sw = Switch() self.switches[event.dpid] = sw sw.connect(event.connection) sw.connection.send(of.ofp_set_config(miss_send_len=MISS_SEND_LEN)) if len(self.switches)==len(self.t.switches()): log.info("All of the switches are up") self.all_switches_up = True if self.statMonitorLock.locked(): self.statMonitorLock.release() def _collectFlowStats(self): log.info("attempt to capture STATS") ''' this function send the flow stat requests''' if not self.statMonitorLock.locked(): # log.info("here it goes to monitor flow stats") self.statMonitorLock.acquire() self.statCntr = 0 self.flows = [] self.HostNameList = [] self.hostsList = [] for sw_name in self.t.layer_nodes(self.t.LAYER_EDGE): sw_dpid = self.t.node_gen(name = sw_name).dpid #print 'sw_dpid',sw_dpid ,'sw_name',sw_name for port in range(1,self.t.k + 1): if not self.t.isPortUp(port): msg = of.ofp_stats_request() msg.type = of.OFPST_FLOW msg.body = of.ofp_flow_stats_request() msg.body.out_port = port self.switches[sw_dpid].connection.send(msg) self.statCntr += 1 self.statMonitorLock.release() statMonitorTimer = Timer(3.5, self._collectFlowStats) statMonitorTimer.start() def IP2name_dpid(self,IP): IP = str(IP) ten, p, e, h = (int(s) for s in IP.split('.')) node_name = self.t.node_gen(p,e,h).name_str() dpid_ = (p << 16) + (e << 8) + h return (node_name, dpid_) def _handle_FlowStatsReceived(self, event): '''handle function for collected stats ''' # log.info( "flow stat collected, process begins") #print 'event.stats', event.stats self.statCntr -= 1 for stat in event.stats: flowLivingTime = stat.duration_sec * 1e9 + stat.duration_nsec if flowLivingTime <= 1: flowLivingTime = 1 flowDemand = 8 * float(stat.byte_count) / flowLivingTime / self.bw #print 'stat.match.in_port:', stat.match.in_port,'flow byte_count',stat.byte_count,'flowLivingTime:', flowLivingTime, 'flowDemand:', flowDemand, 'stat.match.scrIP:', stat.match.nw_src, 'stat.match.dstIP', stat.match.nw_dst src_name, src = self.IP2name_dpid(stat.match.nw_src) dst_name, dst = self.IP2name_dpid(stat.match.nw_dst) #print 'src_name:',src_name,'dst_name:', dst_name,'src_dpid:', src,'dst_dpid:', dst #print stat.match.nw_src, stat.match.nw_dst, stat.match.tp_src, stat.match.tp_dst if flowDemand > 0.1: if src not in self.hostsList: self.hostsList.append(src) self.HostNameList.append({'node_name':src_name, 'dpid':src}) if dst not in self.hostsList: self.hostsList.append(dst) self.HostNameList.append({'node_name':dst_name, 'dpid':dst}) self.flows.append({ 'demand': flowDemand, 'converged':False, 'src': src, 'dst': dst, 'recLimited': False, 'match': stat.match}) if self.statCntr == 0: print "****flows processed, Estimating demands begins" self._demandEstimator() def _demandEstimator(self): '''estimate the actual flow demands here''' temp = self.flows temp = sorted(temp, key=lambda temp:temp['src']) self.flows = temp self.bwReservation = {} M, estFlows = demand_estimation(self.flows, sorted(self.hostsList)) for flow in estFlows: demand = flow['demand'] if demand >= 0.1: self._GlobalFirstFit(flow) def _GlobalFirstFit(self,flow): '''do the Hedera global first fit here''' src_name = self.t.node_gen(dpid = flow['src']).name_str() dst_name = self.t.node_gen(dpid = flow['dst']).name_str() print 'Global Fisrt Fit for the elephant flow from ',src_name,'to', dst_name paths = self.r.routes(src_name,dst_name) print 'all routes found for the big flow:\n',paths GFF_route = None if paths == None: return else: for path in paths: fitCheck = True for i in range(1,len(path)): fitCheck = False if self.bwReservation.has_key(path[i-1]) and self.bwReservation[path[i-1]].has_key(path[i]): if self.bwReservation[path[i-1]][path[i]]['reserveDemand'] + flow['demand'] > 1 : break else: #self.bwReservation[path[i-1]][path[i]]['reserveDemand'] += flow['demand'] fitCheck = True else: self.bwReservation[path[i-1]]={} self.bwReservation[path[i-1]][path[i]]={'reserveDemand':0} fitCheck = True if fitCheck == True: for i in range(1,len(path)): self.bwReservation[path[i-1]][path[i]]['reserveDemand'] += flow['demand'] GFF_route = path print "GFF route found:", path break if GFF_route != None: """install new GFF_path between source and destintaion""" self. _install_GFF_path(GFF_route,flow['match']) def _install_GFF_path(self,GFF_route, match): '''installing the global first fit path here''' flow_match = match _route, match = self.matchDict[match.nw_src, match.nw_dst, match.tp_src, match.tp_dst] if _route != GFF_route[1:-1] and not self.statMonitorLock.locked(): print "old route", _route print "match info:", match.nw_src, match.nw_dst, match.tp_src, match.tp_dst self.statMonitorLock.acquire() ''' Install entries on route between two switches. ''' route = GFF_route[1:-1] print"GFF route to be installed between switches:", route for i, node in enumerate(route): node_dpid = self.t.node_gen(name = node).dpid if i < len(route) - 1: next_node = route[i + 1] out_port, next_in_port = self.t.port(node, next_node) else: dpid_out, out_port = self.macTable[match.dl_dst] #print 'out_dpid', dpid_out,self.t.node_gen(name = GFF_route[-1]).dpid #print 'outPort', out_port self.switches[node_dpid].install(out_port, match,idle_timeout = 10) self.statMonitorLock.release() self.matchDict[flow_match.nw_src, flow_match.nw_dst, flow_match.tp_src, flow_match.tp_dst] = (route, match) print '_'*20 def launch(topo = None, routing = None, bw = None ): #print topo if not topo: raise Exception ("Please specify the topology") else: t = buildTopo(topo) r = getRouting(routing, t) if bw == None: bw = 10.0 #Mb/s bw = float(bw/1000) #Gb/s else: bw = float(bw)/1000 core.registerNew(HController, t, r, bw) log.info("** HController is running")
# utility functions from DCTopo import FatTreeTopo from mininet.util import makeNumeric from DCRouting import HashedRouting, Routing TOPOS = {'ft': FatTreeTopo} ROUTING = {'ECMP' : HashedRouting} def buildTopo(topo): topo_name, topo_param = topo.split( ',' ) return TOPOS[topo_name](makeNumeric(topo_param)) def getRouting(routing, topo): return ROUTING[routing](topo)
-- https://mail.python.org/mailman/listinfo/python-list