General comment. Some of logic could be generic, not specific to
datapath tracking.


On Tue, Feb 19, 2013 at 08:41:01PM +0900, YAMADA Hideki wrote:
> This is alternative of dpset.
> * Don't call other app's method directly.
> * Use Event request/reply
> 
> event.py: event class modules
> datapath.py: datapath discovery app using ofp_event
> dumper.py: test app using discovery event
> 
> TODO: support link discovery using LLDP
> 
> Signed-off-by: YAMADA Hideki <[email protected]>
> ---
>  ryu/physicaltopology/datapath.py |  142 
> ++++++++++++++++++++++++++++++++++++++
>  ryu/physicaltopology/dumper.py   |   79 +++++++++++++++++++++
>  ryu/physicaltopology/event.py    |   97 ++++++++++++++++++++++++++
>  3 files changed, 318 insertions(+), 0 deletions(-)
>  create mode 100644 ryu/physicaltopology/__init__.py
>  create mode 100644 ryu/physicaltopology/datapath.py
>  create mode 100644 ryu/physicaltopology/dumper.py
>  create mode 100644 ryu/physicaltopology/event.py
> 
> diff --git a/ryu/physicaltopology/__init__.py 
> b/ryu/physicaltopology/__init__.py
> new file mode 100644
> index 0000000..e69de29
> diff --git a/ryu/physicaltopology/datapath.py 
> b/ryu/physicaltopology/datapath.py
> new file mode 100644
> index 0000000..6cb3767
> --- /dev/null
> +++ b/ryu/physicaltopology/datapath.py
> @@ -0,0 +1,142 @@
> +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at
> +#
> +#    http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> +# implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +
> +import logging
> +from ryu.base import app_manager
> +from ryu.controller import ofp_event, handler
> +
> +from .event import EventDPEnter, EventDPLeave
> +from .event import EventDPPortAdd, EventDPPortDelete, EventDPPortModify
> +from .event import EventDPListRequest, EventDPListReply
> +LOG = logging.getLogger(__name__)
> +
> +
> +class DatapathDiscovery(app_manager.RyuApp):
> +    _EVENTS = [EventDPEnter, EventDPLeave,
> +               EventDPPortAdd, EventDPPortDelete, EventDPPortModify]
> +
> +    def __init__(self):
> +        super(DatapathDiscovery, self).__init__()
> +
> +        self.dpids = []   # datapath_id
> +        self.ports = {}  # datapath_id => ports
> +        # OFPP_MAX might be different when using multiple version of 
> OpenFlow?
> +        self._OFPP_MAX = {}  # datapath_id => OFPP_MAX

Why not whole ofproto? How about
self.dpids = dict: dpid -> the pair (ofproto, ports)?
Thus the logic of _unregister() will be simplified.
Possibly we'd like to introduce another class to describe the pair.

        
> +
> +    def _register(self, dp):
> +        assert dp.id not in self.dpids
> +        dpid = dp.id
> +
> +        self.dpids.append(dpid)
> +
> +        self._OFPP_MAX[dpid] = dp.ofproto.OFPP_MAX
> +
> +        # LOG.debug('dp %s has ports %s', dpid, dp.ports)
> +        for port_no in dp.ports.iterkeys():
> +            self._add_port(dpid, port_no)
> +
> +    def _unregister(self, dp):
> +        assert dp.id in self.dpids
> +        dpid = dp.id
> +
> +        self.dpids.remove(dpid)
> +
> +        assert dpid in self.ports
> +        del self.ports[dpid]
> +
> +        assert dpid in self._OFPP_MAX
> +        del self._OFPP_MAX
> +
> +    def _add_port(self, dpid, port_no):
> +        if port_no > self._OFPP_MAX[dpid]:
> +            # LOG.debug('port %s is reserved', port_no)
> +            return
> +
> +        self.ports.setdefault(dpid, [])
> +        self.ports[dpid].append(port_no)

           self.ports.setdefault(dpid, []).append(port_no)
dict.setdefault() returns value.


> +
> +    def _del_port(self, dpid, port_no):
> +        assert dpid in self.ports
> +
> +        self.ports[dpid].remove(port_no)
> +
> +    def _get_ports(self, dpid):
> +        return self.ports[dpid]
> +
> +    @handler.set_ev_cls(ofp_event.EventOFPStateChange,
> +                        [handler.MAIN_DISPATCHER, handler.DEAD_DISPATCHER])
> +    def state_change_handler(self, ev):
> +        dp = ev.datapath
> +        dpid = dp.id
> +        assert dp is not None
> +
> +        if ev.state == handler.MAIN_DISPATCHER:
> +            self._register(dp)
> +            LOG.debug('register datapath %s, ports %s',
> +                      dpid, self._get_ports(dpid))
> +            self.send_event_to_observers(
> +                EventDPEnter(dpid, self._get_ports(dpid)))
> +
> +        elif ev.state == handler.DEAD_DISPATCHER:
> +            LOG.debug('unregister datapath %s', dp.id)
> +            self._unregister(dp)
> +            self.send_event_to_observers(
> +                EventDPLeave(dpid, self._get_ports(dpid)))
> +
> +    @handler.set_ev_cls(ofp_event.EventOFPPortStatus, 
> handler.MAIN_DISPATCHER)
> +    def port_status_handler(self, ev):
> +        msg = ev.msg
> +        reason = msg.reason
> +        dpid = msg.datapath.id
> +        port_no = msg.desc.port_no
> +        ofproto = msg.datapath.ofproto
> +
> +        if reason == ofproto.OFPPR_ADD:
> +            LOG.debug('port was added.' +
> +                      '(datapath id = %s, port number = %s)',
> +                      dpid, port_no)
> +            self._add_port(dpid, port_no)
> +            self.send_event_to_observers(EventDPPortAdd(dpid, port_no))
> +
> +        elif reason == ofproto.OFPPR_DELETE:
> +            LOG.debug('port was deleted.' +
> +                      '(datapath id = %s, port number = %s)',
> +                      dpid, port_no)
> +            self._del_port(dpid, port_no)
> +            self.send_event_to_observers(EventDPPortDelete(dpid, port_no))
> +
> +        else:
> +            assert reason == ofproto.OFPPR_MODIFY
> +            LOG.debug('port was modified.' +
> +                      '(datapath id = %s, port number = %s)',
> +                      dpid, port_no)
> +            self.send_event_to_observers(EventDPPortModify(dpid, port_no))

Doesn't this track port state like link up/down.


> +
> +    @handler.set_ev_cls(EventDPListRequest)
> +    def request_handler(self, ev):
> +        dpid = ev.dpid
> +        # LOG.debug(ev)
> +        if dpid is None:
> +            dpids = self.dpids
> +            ports = self.ports
> +        elif dpid in self.dpids:
> +            dpids = [dpid]
> +            ports = [self.ports[dpid]]
> +        else:
> +            dpids = []
> +            ports = {}
> +
> +        reply = EventDPListReply(self.name, ev, dpids, ports)
> +        self.send_event(ev.src, reply)

This logic can be generalized and can be a method in AppManager or RyuApp.
This needs to be blocked and wait request.
gevent.event (or equivalent primitive) can be used to blocking.


> diff --git a/ryu/physicaltopology/dumper.py b/ryu/physicaltopology/dumper.py
> new file mode 100644
> index 0000000..f5dcd5f
> --- /dev/null
> +++ b/ryu/physicaltopology/dumper.py
> @@ -0,0 +1,79 @@
> +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at
> +#
> +#    http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> +# implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +
> +
> +import logging
> +import gevent
> +import gevent.queue
> +
> +from ryu.base import app_manager
> +from ryu.controller import handler
> +
> +from .event import EventDPEnter, EventDPLeave
> +from .event import EventDPPortAdd, EventDPPortDelete, EventDPPortModify
> +from .event import EventDPListRequest, EventDPListReply
> +import datapath
> +
> +LOG = logging.getLogger(__name__)
> +
> +
> +class DiscoveryEventDumper(app_manager.RyuApp):
> +    _REQUEST_TIMEOUT = 30
> +
> +    def __init__(self):
> +        super(DiscoveryEventDumper, self).__init__()
> +        self.replys = gevent.queue.Queue()
> +        self.threads.append(gevent.spawn_later(0, self._request_loop))
> +        self.is_active = True
> +
> +    @handler.set_ev_cls(EventDPEnter)
> +    def dp_enter_handler(self, ev):
> +        LOG.debug('datapath(%s) entered', ev)
> +
> +    @handler.set_ev_cls(EventDPLeave)
> +    def dp_leave_handler(self, ev):
> +        LOG.debug('datapath(%s) leaved', ev)
> +
> +    @handler.set_ev_cls(EventDPPortAdd)
> +    def dp_port_add_handler(self, ev):
> +        LOG.debug('port(%s) added', ev)
> +
> +    @handler.set_ev_cls(EventDPPortDelete)
> +    def dp_port_delete_handler(self, ev):
> +        LOG.debug('port(%s) deleted', ev)
> +
> +    @handler.set_ev_cls(EventDPPortModify)
> +    def dp_port_modify_handler(self, ev):
> +        LOG.debug('port(%s) modified', ev)
> +
> +    @handler.set_ev_cls(EventDPListReply)
> +    def reply_handler(self, ev):
> +        self.replys.put(ev)
> +
> +    def _get_dp_list(self, dpid=None):
> +        request = EventDPListRequest(self.name, dpid)
> +        self.send_event(datapath.DatapathDiscovery.__name__, request)
> +        # assume that replys are in right order
> +        reply = self.replys.get(timeout=self._REQUEST_TIMEOUT)
> +        assert reply.request == request
> +        return reply
> +
> +    def _request_loop(self):
> +        while self.is_active:
> +            dp_list_all = self._get_dp_list()
> +            LOG.debug('ALL DPList: %s', dp_list_all)
> +            dp1 = self._get_dp_list(1)
> +            LOG.debug('DP1: %s', dp1)
> +            gevent.sleep(10)

I thinks this is just for debug/showing how to use, so it's just ok for now.
But we need more generic way to snoop event queuing.
Probably registering observer with the list of (brick name, event class).


> diff --git a/ryu/physicaltopology/event.py b/ryu/physicaltopology/event.py
> new file mode 100644
> index 0000000..4b85cff
> --- /dev/null
> +++ b/ryu/physicaltopology/event.py
> @@ -0,0 +1,97 @@
> +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at
> +#
> +#    http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> +# implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +
> +import logging
> +from ryu.controller import event
> +
> +
> +LOG = logging.getLogger(__name__)
> +
> +
> +class EventDPBase(event.EventBase):
> +    def __init__(self, dpid, ports):
> +        super(EventDPBase, self).__init__()
> +        self.dpid = dpid
> +        self.ports = ports  # port list when enter or leave
> +
> +    def __str__(self):
> +        return 'dpid = %s, ports = %s' % (self.dpid, self.ports)
> +
> +
> +class EventDPEnter(EventDPBase):
> +    def __init__(self, dpid, ports):
> +        super(EventDPEnter, self).__init__(dpid, ports)
> +
> +
> +class EventDPLeave(EventDPBase):
> +    def __init__(self, dpid, ports):
> +        super(EventDPLeave, self).__init__(dpid, ports)
> +
> +
> +class EventDPPortBase(event.EventBase):
> +    def __init__(self, dpid, port_no):
> +        super(EventDPPortBase, self).__init__()
> +        self.dpid = dpid
> +        self.port_no = port_no
> +
> +    def __str__(self):
> +        return 'dpid = %s, port_no =%s' % (self.dpid, self.port_no)
> +
> +
> +class EventDPPortAdd(EventDPPortBase):
> +    def __init__(self, dpid, port_no):
> +        super(EventDPPortAdd, self).__init__(dpid, port_no)
> +
> +
> +class EventDPPortDelete(EventDPPortBase):
> +    def __init__(self, dpid, port_no):
> +        super(EventDPPortDelete, self).__init__(dpid, port_no)
> +
> +
> +class EventDPPortModify(EventDPPortBase):
> +    def __init__(self, dpid, port_no):
> +        super(EventDPPortModify, self).__init__(dpid, port_no)
> +
> +
> +class EventRequestBase(event.EventBase):
> +    def __init__(self, src):
> +        self.src = src  # brick name of event source

EventRequestBase and EventReplyBase is generic enough to be in event.py

> +
> +
> +class EventDPListRequest(EventRequestBase):
> +    def __init__(self, src, dpid=None):
> +        super(EventDPListRequest, self).__init__(src)
> +        self.dpid = dpid
> +
> +    def __str__(self):
> +        return 'EventDPListRequest<src=%s, dpid=%s>' % (self.src, self.dpid)
> +
> +
> +class EventReplyBase(event.EventBase):
> +    def __init__(self, src, request):
> +        self.src = src  # brick name of event source
> +        assert issubclass(request.__class__, EventRequestBase)
                  isinstance(request, EventRequestBase)

> +        self.request = request

request is an object which is not suitable to be encoded in event.
How about Event{Request, Reply}Base.xid and use id(request)?
And more we'd like to a framework to snoop into request/reply.

> +
> +
> +class EventDPListReply(EventReplyBase):
> +    def __init__(self, src, request, dpids, ports):
> +        super(EventDPListReply, self).__init__(src, request)
> +        self.dpids = dpids
> +        self.ports = ports
> +
> +    def __str__(self):
> +        return 'EventDPListReply<dpids=%s, ports=%s>' % \
> +            (self.dpids, self.ports)
> -- 
> 1.7.1
> 
> 
> 
> ------------------------------------------------------------------------------
> Everyone hates slow websites. So do we.
> Make your web apps faster with AppDynamics
> Download AppDynamics Lite for free today:
> http://p.sf.net/sfu/appdyn_d2d_feb
> _______________________________________________
> Ryu-devel mailing list
> [email protected]
> https://lists.sourceforge.net/lists/listinfo/ryu-devel
> 

-- 
yamahata

------------------------------------------------------------------------------
Everyone hates slow websites. So do we.
Make your web apps faster with AppDynamics
Download AppDynamics Lite for free today:
http://p.sf.net/sfu/appdyn_d2d_feb
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to