On Tue, 29 Jan 2013 12:18:58 +0900
Isaku Yamahata <[email protected]> wrote:

> On Mon, Jan 21, 2013 at 12:31:28PM +0900, FUJITA Tomonori wrote:
>> This is purely internal change and no API for applications is
>> changed. At least, I confirmed that folsom OpenStack plugin works.
>> 
>> With the current dispatcher mechanism, multiple greenlets call
>> applications' handlers and might be blocked anywhere so we need
>> various locks to handle that concurrency. This makes things difficult
>> for application developers.
>> 
>> With this patch, software components are connected by events.
>> Each component has its own greenlet(s) to handle events and might send
>> events to other components.
>> 
>> For compatibility, RyuApp class instances are currently converted
>> into the above components. If an application registers handlers for
>> some OF events, it subscribes to the OF component and registers the OF
>> events that it's interested in. The OF component delivers such OF events
>> to the application, and the application's greenlet executes the handlers.
>> 
>> With this, we can completely remove dispatcher.py and its friends.
>> 
>> Signed-off-by: FUJITA Tomonori <[email protected]>
>> ---
>>  ryu/base/app_manager.py       |   75 
>> +++++++++++++++++++++++++++++++++-------
>>  ryu/controller/controller.py  |   36 ++++++++++----------
>>  ryu/controller/dpset.py       |   31 ++++++-----------
>>  ryu/controller/handler.py     |   43 +++++++++++------------
>>  ryu/controller/ofp_event.py   |    6 +++
>>  ryu/controller/ofp_handler.py |   21 ++++++-----
>>  6 files changed, 129 insertions(+), 83 deletions(-)
>> 
>> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
>> index 4a29c95..eedae11 100644
>> --- a/ryu/base/app_manager.py
>> +++ b/ryu/base/app_manager.py
>> @@ -17,6 +17,9 @@
>>  import inspect
>>  import itertools
>>  import logging
>> +import gevent
>> +
>> +from gevent.queue import Queue
>>  
>>  from ryu import utils
>>  from ryu.controller.handler import register_instance
>> @@ -24,20 +27,13 @@ from ryu.controller.controller import Datapath
>>  
>>  LOG = logging.getLogger('ryu.base.app_manager')
>>  
>> +SERVICE_BRICKS = {}
>>  
>> -class RyuAppContext(object):
>> -    """
>> -    Base class for Ryu application context
>> -    """
>> -    def __init__(self):
>> -        super(RyuAppContext, self).__init__()
>>  
>> -    def close(self):
>> -        """
>> -        teardown method
>> -        The method name, close, is chosen for python context manager
>> -        """
>> -        pass
>> +def lookup_service_brick(name):
>> +    if name in SERVICE_BRICKS:
>> +        return SERVICE_BRICKS[name]
>> +    return None
> 
>        return SERVICE_BRICKS.get(name)

Fixed.

>>  
>>  class RyuApp(object):
>> @@ -55,6 +51,45 @@ class RyuApp(object):
>>  
>>      def __init__(self, *_args, **_kwargs):
>>          super(RyuApp, self).__init__()
>> +        self.name = self.__class__.__name__
>> +        self.event_handlers = {}
>> +        self.observers = {}
>> +        self.threads = []
>> +        self.events = Queue()
>> +        self.threads.append(gevent.spawn(self._event_loop))
>> +
>> +    def register_handler(self, ev_cls, handler):
>> +        assert callable(handler)
>> +        self.event_handlers.setdefault(ev_cls, [])
>> +        self.event_handlers[ev_cls].append(handler)
>> +
>> +    def register_observer(self, ev_cls, name):
>> +        self.observers.setdefault(ev_cls, [])
>> +        self.observers[ev_cls].append(name)
>> +
>> +    def get_handlers(self, ev):
>> +        return self.event_handlers.get(ev.__class__, [])
>> +
>> +    def get_observers(self, ev):
>> +        return self.observers.get(ev.__class__, [])
>> +
>> +    def _event_loop(self):
>> +        while True:
>> +            ev = self.events.get()
>> +            handlers = self.get_handlers(ev)
>> +            for handler in handlers:
>> +                handler(ev)
>> +
>> +    def _send_event(self, ev):
>> +        self.events.put(ev)
>> +
>> +    def send_event(self, name, ev):
>> +        if name in SERVICE_BRICKS:
>> +            SERVICE_BRICKS[name]._send_event(ev)
>> +
>> +    def send_event_to_observers(self, ev):
>> +        for observer in self.get_observers(ev):
>> +            self.send_event(observer, ev)
>>  
>>      def close(self):
>>          """
>> @@ -102,7 +137,12 @@ class AppManager(object):
>>  
>>      def create_contexts(self):
>>          for key, cls in self.contexts_cls.items():
>> -            self.contexts[key] = cls()
>> +            context = cls()
>> +            self.contexts[key] = context
>> +            # hack for dpset
>> +            if context.__class__.__base__ == RyuApp:
>> +                SERVICE_BRICKS[context.name] = context
>> +                register_instance(context)
>>          return self.contexts
>>  
>>      def instantiate_apps(self, *args, **kwargs):
>> @@ -124,6 +164,15 @@ class AppManager(object):
>>              app = cls(*args, **kwargs)
>>              register_instance(app)
>>              self.applications[app_name] = app
>> +            SERVICE_BRICKS[app.name] = app
>> +
>> +        for key, i in SERVICE_BRICKS.items():
>> +            for _k, m in inspect.getmembers(i, inspect.ismethod):
>> +                if hasattr(m, 'observer'):
>> +                    name = m.observer.split('.')[-1]
>> +                    if name in SERVICE_BRICKS:
>> +                        brick = SERVICE_BRICKS[name]
>> +                        brick.register_observer(m.ev_cls, i.name)
>>  
>>      def close(self):
>>          def close_all(close_dict):
>> diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
>> index 1811d8c..7c33e4a 100644
>> --- a/ryu/controller/controller.py
>> +++ b/ryu/controller/controller.py
>> @@ -25,6 +25,8 @@ import ssl
>>  from gevent.server import StreamServer
>>  from gevent.queue import Queue
>>  
>> +import ryu.base.app_manager
>> +
>>  from ryu.ofproto import ofproto_common
>>  from ryu.ofproto import ofproto_parser
>>  from ryu.ofproto import ofproto_v1_0
>> @@ -35,7 +37,6 @@ from ryu.ofproto import ofproto_v1_3
>>  from ryu.ofproto import ofproto_v1_3_parser
>>  from ryu.ofproto import nx_match
>>  
>> -from ryu.controller import dispatcher
>>  from ryu.controller import handler
>>  from ryu.controller import ofp_event
>>  
>> @@ -123,25 +124,22 @@ class Datapath(object):
>>          # prevent it from eating memory up
>>          self.send_q = Queue(16)
>>  
>> -        # circular reference self.ev_q.aux == self
>> -        self.ev_q = dispatcher.EventQueue(handler.QUEUE_NAME_OFP_MSG,
>> -                                          handler.HANDSHAKE_DISPATCHER,
>> -                                          self)
>> -
>>          self.set_version(max(self.supported_ofp_version))
>>          self.xid = random.randint(0, self.ofproto.MAX_XID)
>>          self.id = None  # datapath_id is unknown yet
>>          self.ports = None
>>          self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
>> +        self.ofp_brick = 
>> ryu.base.app_manager.lookup_service_brick('ofp_event')
>> +        self.set_state(handler.HANDSHAKE_DISPATCHER)
>>  
>>      def close(self):
>> -        """
>> -        Call this before discarding this datapath object
>> -        The circular refernce as self.ev_q.aux == self must be broken.
>> -        """
>> -        # tell this datapath is dead
>> -        self.ev_q.set_dispatcher(handler.DEAD_DISPATCHER)
>> -        self.ev_q.close()
>> +        self.set_state(handler.DEAD_DISPATCHER)
>> +
>> +    def set_state(self, state):
>> +        self.state = state
>> +        ev = ofp_event.EventOFPStateChange(self)
>> +        ev.state = state
>> +        self.ofp_brick.send_event_to_observers(ev)
>>  
>>      def set_version(self, version):
>>          assert version in self.supported_ofp_version
>> @@ -169,7 +167,13 @@ class Datapath(object):
>>                  msg = ofproto_parser.msg(self,
>>                                           version, msg_type, msg_len, xid, 
>> buf)
>>                  #LOG.debug('queue msg %s cls %s', msg, msg.__class__)
>> -                self.ev_q.queue(ofp_event.ofp_msg_to_ev(msg))
>> +                ev = ofp_event.ofp_msg_to_ev(msg)
>> +                handlers = self.ofp_brick.get_handlers(ev)
>> +                for handler in handlers:
>> +                    if self.state in handler.dispatchers:
>> +                        handler(ev)
> 
> This is called every time an event is sent, so this cost could instead be
> paid at handler registration. event_handlers could be
> self.event_handlers[ev.__class__][self.state] = list of handlers

Right, but I preferred to leave it alone for now.

The state argument is auxiliary. Some use it but some don't. An application
would need to override some functions in RyuApp to do the above.


>> +
>> +                self.ofp_brick.send_event_to_observers(ev)
>>  
>>                  buf = buf[required_len:]
>>                  required_len = ofproto_common.OFP_HEADER_SIZE
>> @@ -219,10 +223,6 @@ class Datapath(object):
>>              gevent.kill(send_thr)
>>              gevent.joinall([send_thr])
>>  
>> -    def send_ev(self, ev):
>> -        #LOG.debug('send_ev %s', ev)
>> -        self.ev_q.queue(ev)
>> -
>>      #
>>      # Utility methods for convenience
>>      #
>> diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py
>> index 80ffb6f..dccca27 100644
>> --- a/ryu/controller/dpset.py
>> +++ b/ryu/controller/dpset.py
>> @@ -16,8 +16,9 @@
>>  
>>  import logging
>>  
>> +from ryu.base import app_manager
>>  from ryu.controller import event
>> -from ryu.controller import dispatcher
>> +from ryu.controller import ofp_event
>>  from ryu.controller import dp_type
>>  from ryu.controller import handler
>>  from ryu.controller.handler import set_ev_cls
>> @@ -25,9 +26,7 @@ from ryu.controller.handler import set_ev_cls
>>  LOG = logging.getLogger('ryu.controller.dpset')
>>  
>>  
>> -QUEUE_NAME_DPSET = 'dpset'
>> -DISPATCHER_NAME_DPSET = 'dpset'
>> -DPSET_EV_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_DPSET)
>> +DPSET_EV_DISPATCHER = 'dpset'
>>  
>>  
>>  class EventDPBase(event.EventBase):
>> @@ -46,18 +45,16 @@ class EventDP(EventDPBase):
>>  
>>  
>>  # this depends on controller::Datapath and dispatchers in handler
>> -class DPSet(object):
>> +class DPSet(app_manager.RyuApp):
>>      def __init__(self):
>>          super(DPSet, self).__init__()
>> +        self.name = 'dpset'
>>  
>>          # dp registration and type setting can be occur in any order
>>          # Sometimes the sw_type is set before dp connection
>>          self.dp_types = {}
>>  
>>          self.dps = {}   # datapath_id => class Datapath
>> -        self.ev_q = dispatcher.EventQueue(QUEUE_NAME_DPSET,
>> -                                          DPSET_EV_DISPATCHER)
>> -        handler.register_instance(self)
>>  
>>      def register(self, dp):
>>          assert dp.id is not None
>> @@ -67,7 +64,7 @@ class DPSet(object):
>>          if dp_type_ is not None:
>>              dp.dp_type = dp_type_
>>  
>> -        self.ev_q.queue(EventDP(dp, True))
>> +        self.send_event_to_observers(EventDP(dp, True))
>>          self.dps[dp.id] = dp
>>  
>>      def unregister(self, dp):
>> @@ -76,7 +73,7 @@ class DPSet(object):
>>              assert dp.id not in self.dp_types
>>              self.dp_types[dp.id] = getattr(dp, 'dp_type', dp_type.UNKNOWN)
>>  
>> -            self.ev_q.queue(EventDP(dp, False))
>> +            self.send_event_to_observers(EventDP(dp, False))
>>  
>>      def set_type(self, dp_id, dp_type_=dp_type.UNKNOWN):
>>          if dp_id in self.dps:
>> @@ -92,19 +89,13 @@ class DPSet(object):
>>      def get_all(self):
>>          return self.dps.items()
>>  
>> -    @set_ev_cls(dispatcher.EventDispatcherChange,
>> -                dispatcher.QUEUE_EV_DISPATCHER)
>> +    @set_ev_cls(ofp_event.EventOFPStateChange, handler.MAIN_DISPATCHER)
> 
> Probably this needs to contain both MAIN_DISPATCHER and DEAD_DISPATCHER

Oops, fixed.

btw, I plan to rename the *_DISPATCHER names, variable types, etc. since they
just represent OF dp states.


>>      def dispacher_change(self, ev):
>> -        LOG.debug('dispatcher change q %s dispatcher %s',
>> -                  ev.ev_q.name, ev.new_dispatcher.name)
>> -        if ev.ev_q.name != handler.QUEUE_NAME_OFP_MSG:
>> -            return
>> -
>> -        datapath = ev.ev_q.aux
>> +        datapath = ev.datapath
>>          assert datapath is not None
>> -        if ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_MAIN:
>> +        if ev.state == handler.MAIN_DISPATCHER:
>>              LOG.debug('DPSET: register datapath %s', datapath)
>>              self.register(datapath)
>> -        elif ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_DEAD:
>> +        elif ev.state == handler.DEAD_DISPATCHER:
>>              LOG.debug('DPSET: unregister datapath %s', datapath)
>>              self.unregister(datapath)
>> diff --git a/ryu/controller/handler.py b/ryu/controller/handler.py
>> index ede27df..91c448d 100644
>> --- a/ryu/controller/handler.py
>> +++ b/ryu/controller/handler.py
>> @@ -14,36 +14,39 @@
>>  # See the License for the specific language governing permissions and
>>  # limitations under the License.
>>  
>> -import copy
>>  import inspect
>>  import logging
>>  
>> -from ryu.controller import dispatcher
>>  from ryu.controller import ofp_event
>>  
>>  LOG = logging.getLogger('ryu.controller.handler')
>>  
>> -QUEUE_NAME_OFP_MSG = 'ofp_msg'
>> -DISPATCHER_NAME_OFP_HANDSHAKE = 'ofp_handshake'
>> -HANDSHAKE_DISPATCHER = dispatcher.EventDispatcher(
>> -    DISPATCHER_NAME_OFP_HANDSHAKE)
>> -DISPATCHER_NAME_OFP_CONFIG = 'ofp_config'
>> -CONFIG_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_CONFIG)
>> -DISPATCHER_NAME_OFP_MAIN = 'ofp_main'
>> -MAIN_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_MAIN)
>> -DISPATCHER_NAME_OFP_DEAD = 'ofp_dead'
>> -DEAD_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_DEAD)
>> +# just represent OF datapath state. datapath specific so should be moved.
>> +HANDSHAKE_DISPATCHER = "handshake"
>> +CONFIG_DISPATCHER = "config"
>> +MAIN_DISPATCHER = "main"
>> +DEAD_DISPATCHER = "dead"
>>  
>>  
>> +# should be named something like 'observe_event'
>>  def set_ev_cls(ev_cls, dispatchers):
>>      def _set_ev_cls_dec(handler):
>>          handler.ev_cls = ev_cls
>> -        handler.dispatchers = dispatchers
>> +        handler.dispatchers = _listify(dispatchers)
>> +        handler.observer = ev_cls.__module__
>>          return handler
>>      return _set_ev_cls_dec
>>  
>>  
>> -def _is_ev_handler(meth):
>> +def set_ev_handler(ev_cls, dispatchers):
>> +    def _set_ev_cls_dec(handler):
>> +        handler.ev_cls = ev_cls
>> +        handler.dispatchers = _listify(dispatchers)
>> +        return handler
>> +    return _set_ev_cls_dec
>> +
>> +
>> +def _is_ev_cls(meth):
>>      return hasattr(meth, 'ev_cls')
>>  
>>  
>> @@ -58,12 +61,6 @@ def _listify(may_list):
>>  def register_instance(i):
>>      for _k, m in inspect.getmembers(i, inspect.ismethod):
>>          # LOG.debug('instance %s k %s m %s', i, _k, m)
>> -        if not _is_ev_handler(m):
>> -            continue
>> -
>> -        _dispatchers = _listify(getattr(m, 'dispatchers', None))
>> -        # LOG.debug("_dispatchers %s", _dispatchers)
>> -        for d in _dispatchers:
>> -            # LOG.debug('register dispatcher %s ev %s k %s m %s',
>> -            #           d.name, m.ev_cls, _k, m)
>> -            d.register_handler(m.ev_cls, m)
>> +        if _is_ev_cls(m):
>> +            _dispatchers = _listify(getattr(m, 'dispatchers', None))
> 
> _dispatchers isn't used

Fixed.


Thanks,

------------------------------------------------------------------------------
Everyone hates slow websites. So do we.
Make your web apps faster with AppDynamics
Download AppDynamics Lite for free today:
http://p.sf.net/sfu/appdyn_d2d_jan
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to