I'd like to discuss the design before reviewing the code in detail.

Can we split the event source/sink and the event loop out of RyuApp?
Then a single RyuApp could define multiple event sources, and more
flexible connections between event sources and sinks would be possible.
In fact, the (unmerged) gre_tunnel app defines multiple event sources.
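
A rough sketch of such a split follows. All class names here (EventLoop,
EventSource, GreTunnelLikeApp and the two event classes) are hypothetical
and do not come from the patch, and the stdlib queue stands in for
gevent.queue only so that the example runs standalone:

```python
import queue


class EventLoop(object):
    """Sink side: owns the queue and dispatches events to handlers."""

    def __init__(self):
        self.events = queue.Queue()
        self.handlers = {}  # event class -> list of callables

    def register_handler(self, ev_cls, handler):
        self.handlers.setdefault(ev_cls, []).append(handler)

    def put(self, ev):
        self.events.put(ev)

    def run_once(self):
        # Drain what is queued right now; a real loop would block forever.
        while not self.events.empty():
            ev = self.events.get()
            for handler in self.handlers.get(ev.__class__, []):
                handler(ev)


class EventSource(object):
    """Source side: knows which loops observe which event classes."""

    def __init__(self, name):
        self.name = name
        self.observers = {}  # event class -> list of EventLoop

    def register_observer(self, ev_cls, loop):
        self.observers.setdefault(ev_cls, []).append(loop)

    def send_event_to_observers(self, ev):
        for loop in self.observers.get(ev.__class__, []):
            loop.put(ev)


class EventTunnelUp(object):
    pass


class EventPortAdd(object):
    pass


class GreTunnelLikeApp(object):
    """One app that exposes two independent event sources."""

    def __init__(self):
        self.tunnel_events = EventSource('tunnel')
        self.port_events = EventSource('port')


app = GreTunnelLikeApp()
loop = EventLoop()
seen = []
loop.register_handler(EventTunnelUp, seen.append)
app.tunnel_events.register_observer(EventTunnelUp, loop)
app.tunnel_events.send_event_to_observers(EventTunnelUp())
app.port_events.send_event_to_observers(EventPortAdd())  # nobody observes
loop.run_once()
```

Here the app owns no queue or greenlet of its own; whoever wants its events
attaches an event loop to the particular source it cares about.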

Adding register_handler/register_observer to RyuApp introduces
dependencies between RyuApps. This means that when RyuApps are loaded
dynamically, the loading order matters. I assume "bricks" is your answer
to app dependency, but how about something like a hub?
publisher -> hub -> subscriber
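
A minimal sketch of the hub idea, assuming a hypothetical Hub class that is
not part of the patch (EventDP below is only a stand-in for the dpset
event). Publishers and subscribers know only the hub, so neither has to be
loaded before the other:

```python
class Hub(object):
    """Hypothetical mediator between publishers and subscribers."""

    def __init__(self):
        self.subscribers = {}  # event class -> list of callables

    def subscribe(self, ev_cls, callback):
        self.subscribers.setdefault(ev_cls, []).append(callback)

    def publish(self, ev):
        for callback in self.subscribers.get(ev.__class__, []):
            callback(ev)


class EventDP(object):
    """Stand-in event carrying a datapath and an enter/leave flag."""

    def __init__(self, dp, enter):
        self.dp = dp
        self.enter = enter


hub = Hub()
received = []
# The subscriber can register before any publisher exists.
hub.subscribe(EventDP, received.append)
# A publisher loaded later simply hands its events to the hub.
hub.publish(EventDP(dp='dp-1', enter=True))
```

The only shared dependency is the hub itself, which can be created before
any app is loaded.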

State change (set_state) doesn't seem to work.
dpset.DPSet.dispatcher_change() isn't called when the dispatcher is changed
to dead. Since catching state changes in this way is error-prone, it would
be safer to introduce a state change event.
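
One possible shape for such an event, as a sketch only: the event carries
both the old and the new state and is delivered unconditionally, so the
transition to dead is observed like any other. EventStateChange,
FakeDatapath and DPSetLike are hypothetical names, not the classes in the
patch:

```python
HANDSHAKE, CONFIG, MAIN, DEAD = 'handshake', 'config', 'main', 'dead'


class EventStateChange(object):
    """Hypothetical event describing one state transition."""

    def __init__(self, datapath, old_state, new_state):
        self.datapath = datapath
        self.old_state = old_state
        self.new_state = new_state


class FakeDatapath(object):
    """Stand-in datapath that reports every transition via notify()."""

    def __init__(self, dp_id, notify):
        self.id = dp_id
        self.state = None
        self._notify = notify

    def set_state(self, new_state):
        old_state, self.state = self.state, new_state
        # Delivered regardless of the current state, including DEAD.
        self._notify(EventStateChange(self, old_state, new_state))


class DPSetLike(object):
    """Tracks datapaths that are in the main state."""

    def __init__(self):
        self.dps = {}

    def state_change_handler(self, ev):
        if ev.new_state == MAIN:
            self.dps[ev.datapath.id] = ev.datapath
        elif ev.new_state == DEAD:
            self.dps.pop(ev.datapath.id, None)


dpset = DPSetLike()
dp = FakeDatapath(1, dpset.state_change_handler)
dp.set_state(HANDSHAKE)
dp.set_state(MAIN)
registered_after_main = 1 in dpset.dps
dp.set_state(DEAD)
registered_after_dead = 1 in dpset.dps
```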

event_dumper will be broken. It would need to enumerate all event sources
and subscribe to all of their events.
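
For illustration, assuming each event source could declare the event
classes it emits (the patch has no such facility; EventSource,
event_classes and subscribe_to_everything are hypothetical), a dumper
could subscribe along these lines:

```python
class EventSource(object):
    """Hypothetical event source that declares the events it can emit."""

    def __init__(self, name, event_classes):
        self.name = name
        self.event_classes = list(event_classes)
        self.observers = {}  # event class -> list of callables

    def register_observer(self, ev_cls, callback):
        self.observers.setdefault(ev_cls, []).append(callback)

    def send_event_to_observers(self, ev):
        for callback in self.observers.get(ev.__class__, []):
            callback(ev)


def subscribe_to_everything(sources, callback):
    """Register callback for every event class of every known source."""
    for source in sources:
        for ev_cls in source.event_classes:
            source.register_observer(ev_cls, callback)


class EventA(object):
    pass


class EventB(object):
    pass


ofp = EventSource('ofp_event', [EventA])
dpset = EventSource('dpset', [EventB])
dumped = []
subscribe_to_everything([ofp, dpset],
                        lambda ev: dumped.append(type(ev).__name__))
ofp.send_event_to_observers(EventA())
dpset.send_event_to_observers(EventB())
```

This only works if the full list of sources is known when the dumper
starts, which is the enumeration problem mentioned above.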

thanks,

On Mon, Jan 21, 2013 at 12:31:28PM +0900, FUJITA Tomonori wrote:
> This is a purely internal change; no API for applications is
> changed. At least, I confirmed that the folsom OpenStack plugin works.
> 
> With the current dispatcher mechanism, multiple greenlets call
> applications' handlers and might be blocked anywhere so we need
> various locks to handle that concurrency. This makes things difficult
> for application developers.
> 
> With this patch, software components are connected by events.
> Each component has its own greenlet(s) to handle events and may send
> events to other components.
> 
> For compatibility, RyuApp class instances are currently converted
> into the above components. If an application registers handlers for
> some OF events, it subscribes to the OF component and registers the OF
> events that it's interested in. The OF component delivers those OF events
> to the application, and the application's greenlet executes the handlers.
> 
> With this, we can completely remove dispatcher.py and its friends.
> 
> Signed-off-by: FUJITA Tomonori <[email protected]>
> ---
>  ryu/base/app_manager.py       |   75 +++++++++++++++++++++++++++++++++-------
>  ryu/controller/controller.py  |   36 ++++++++++----------
>  ryu/controller/dpset.py       |   31 ++++++-----------
>  ryu/controller/handler.py     |   43 +++++++++++------------
>  ryu/controller/ofp_event.py   |    6 +++
>  ryu/controller/ofp_handler.py |   21 ++++++-----
>  6 files changed, 129 insertions(+), 83 deletions(-)
> 
> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
> index 4a29c95..eedae11 100644
> --- a/ryu/base/app_manager.py
> +++ b/ryu/base/app_manager.py
> @@ -17,6 +17,9 @@
>  import inspect
>  import itertools
>  import logging
> +import gevent
> +
> +from gevent.queue import Queue
>  
>  from ryu import utils
>  from ryu.controller.handler import register_instance
> @@ -24,20 +27,13 @@ from ryu.controller.controller import Datapath
>  
>  LOG = logging.getLogger('ryu.base.app_manager')
>  
> +SERVICE_BRICKS = {}
>  
> -class RyuAppContext(object):
> -    """
> -    Base class for Ryu application context
> -    """
> -    def __init__(self):
> -        super(RyuAppContext, self).__init__()
>  
> -    def close(self):
> -        """
> -        teardown method
> -        The method name, close, is chosen for python context manager
> -        """
> -        pass
> +def lookup_service_brick(name):
> +    if name in SERVICE_BRICKS:
> +        return SERVICE_BRICKS[name]
> +    return None
>  
>  
>  class RyuApp(object):
> @@ -55,6 +51,45 @@ class RyuApp(object):
>  
>      def __init__(self, *_args, **_kwargs):
>          super(RyuApp, self).__init__()
> +        self.name = self.__class__.__name__
> +        self.event_handlers = {}
> +        self.observers = {}
> +        self.threads = []
> +        self.events = Queue()
> +        self.threads.append(gevent.spawn(self._event_loop))
> +
> +    def register_handler(self, ev_cls, handler):
> +        assert callable(handler)
> +        self.event_handlers.setdefault(ev_cls, [])
> +        self.event_handlers[ev_cls].append(handler)
> +
> +    def register_observer(self, ev_cls, name):
> +        self.observers.setdefault(ev_cls, [])
> +        self.observers[ev_cls].append(name)
> +
> +    def get_handlers(self, ev):
> +        return self.event_handlers.get(ev.__class__, [])
> +
> +    def get_observers(self, ev):
> +        return self.observers.get(ev.__class__, [])
> +
> +    def _event_loop(self):
> +        while True:
> +            ev = self.events.get()
> +            handlers = self.get_handlers(ev)
> +            for handler in handlers:
> +                handler(ev)
> +
> +    def _send_event(self, ev):
> +        self.events.put(ev)
> +
> +    def send_event(self, name, ev):
> +        if name in SERVICE_BRICKS:
> +            SERVICE_BRICKS[name]._send_event(ev)
> +
> +    def send_event_to_observers(self, ev):
> +        for observer in self.get_observers(ev):
> +            self.send_event(observer, ev)
>  
>      def close(self):
>          """
> @@ -102,7 +137,12 @@ class AppManager(object):
>  
>      def create_contexts(self):
>          for key, cls in self.contexts_cls.items():
> -            self.contexts[key] = cls()
> +            context = cls()
> +            self.contexts[key] = context
> +            # hack for dpset
> +            if context.__class__.__base__ == RyuApp:
> +                SERVICE_BRICKS[context.name] = context
> +                register_instance(context)
>          return self.contexts
>  
>      def instantiate_apps(self, *args, **kwargs):
> @@ -124,6 +164,15 @@ class AppManager(object):
>              app = cls(*args, **kwargs)
>              register_instance(app)
>              self.applications[app_name] = app
> +            SERVICE_BRICKS[app.name] = app
> +
> +        for key, i in SERVICE_BRICKS.items():
> +            for _k, m in inspect.getmembers(i, inspect.ismethod):
> +                if hasattr(m, 'observer'):
> +                    name = m.observer.split('.')[-1]
> +                    if name in SERVICE_BRICKS:
> +                        brick = SERVICE_BRICKS[name]
> +                        brick.register_observer(m.ev_cls, i.name)
>  
>      def close(self):
>          def close_all(close_dict):
> diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
> index 1811d8c..7c33e4a 100644
> --- a/ryu/controller/controller.py
> +++ b/ryu/controller/controller.py
> @@ -25,6 +25,8 @@ import ssl
>  from gevent.server import StreamServer
>  from gevent.queue import Queue
>  
> +import ryu.base.app_manager
> +
>  from ryu.ofproto import ofproto_common
>  from ryu.ofproto import ofproto_parser
>  from ryu.ofproto import ofproto_v1_0
> @@ -35,7 +37,6 @@ from ryu.ofproto import ofproto_v1_3
>  from ryu.ofproto import ofproto_v1_3_parser
>  from ryu.ofproto import nx_match
>  
> -from ryu.controller import dispatcher
>  from ryu.controller import handler
>  from ryu.controller import ofp_event
>  
> @@ -123,25 +124,22 @@ class Datapath(object):
>          # prevent it from eating memory up
>          self.send_q = Queue(16)
>  
> -        # circular reference self.ev_q.aux == self
> -        self.ev_q = dispatcher.EventQueue(handler.QUEUE_NAME_OFP_MSG,
> -                                          handler.HANDSHAKE_DISPATCHER,
> -                                          self)
> -
>          self.set_version(max(self.supported_ofp_version))
>          self.xid = random.randint(0, self.ofproto.MAX_XID)
>          self.id = None  # datapath_id is unknown yet
>          self.ports = None
>          self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
> +        self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
> +        self.set_state(handler.HANDSHAKE_DISPATCHER)
>  
>      def close(self):
> -        """
> -        Call this before discarding this datapath object
> -        The circular refernce as self.ev_q.aux == self must be broken.
> -        """
> -        # tell this datapath is dead
> -        self.ev_q.set_dispatcher(handler.DEAD_DISPATCHER)
> -        self.ev_q.close()
> +        self.set_state(handler.DEAD_DISPATCHER)
> +
> +    def set_state(self, state):
> +        self.state = state
> +        ev = ofp_event.EventOFPStateChange(self)
> +        ev.state = state
> +        self.ofp_brick.send_event_to_observers(ev)
>  
>      def set_version(self, version):
>          assert version in self.supported_ofp_version
> @@ -169,7 +167,13 @@ class Datapath(object):
>                  msg = ofproto_parser.msg(self,
>                                           version, msg_type, msg_len, xid, buf)
>                  #LOG.debug('queue msg %s cls %s', msg, msg.__class__)
> -                self.ev_q.queue(ofp_event.ofp_msg_to_ev(msg))
> +                ev = ofp_event.ofp_msg_to_ev(msg)
> +                handlers = self.ofp_brick.get_handlers(ev)
> +                for handler in handlers:
> +                    if self.state in handler.dispatchers:
> +                        handler(ev)
> +
> +                self.ofp_brick.send_event_to_observers(ev)
>  
>                  buf = buf[required_len:]
>                  required_len = ofproto_common.OFP_HEADER_SIZE
> @@ -219,10 +223,6 @@ class Datapath(object):
>              gevent.kill(send_thr)
>              gevent.joinall([send_thr])
>  
> -    def send_ev(self, ev):
> -        #LOG.debug('send_ev %s', ev)
> -        self.ev_q.queue(ev)
> -
>      #
>      # Utility methods for convenience
>      #
> diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py
> index 80ffb6f..dccca27 100644
> --- a/ryu/controller/dpset.py
> +++ b/ryu/controller/dpset.py
> @@ -16,8 +16,9 @@
>  
>  import logging
>  
> +from ryu.base import app_manager
>  from ryu.controller import event
> -from ryu.controller import dispatcher
> +from ryu.controller import ofp_event
>  from ryu.controller import dp_type
>  from ryu.controller import handler
>  from ryu.controller.handler import set_ev_cls
> @@ -25,9 +26,7 @@ from ryu.controller.handler import set_ev_cls
>  LOG = logging.getLogger('ryu.controller.dpset')
>  
>  
> -QUEUE_NAME_DPSET = 'dpset'
> -DISPATCHER_NAME_DPSET = 'dpset'
> -DPSET_EV_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_DPSET)
> +DPSET_EV_DISPATCHER = 'dpset'
>  
>  
>  class EventDPBase(event.EventBase):
> @@ -46,18 +45,16 @@ class EventDP(EventDPBase):
>  
>  
>  # this depends on controller::Datapath and dispatchers in handler
> -class DPSet(object):
> +class DPSet(app_manager.RyuApp):
>      def __init__(self):
>          super(DPSet, self).__init__()
> +        self.name = 'dpset'
>  
>          # dp registration and type setting can be occur in any order
>          # Sometimes the sw_type is set before dp connection
>          self.dp_types = {}
>  
>          self.dps = {}   # datapath_id => class Datapath
> -        self.ev_q = dispatcher.EventQueue(QUEUE_NAME_DPSET,
> -                                          DPSET_EV_DISPATCHER)
> -        handler.register_instance(self)
>  
>      def register(self, dp):
>          assert dp.id is not None
> @@ -67,7 +64,7 @@ class DPSet(object):
>          if dp_type_ is not None:
>              dp.dp_type = dp_type_
>  
> -        self.ev_q.queue(EventDP(dp, True))
> +        self.send_event_to_observers(EventDP(dp, True))
>          self.dps[dp.id] = dp
>  
>      def unregister(self, dp):
> @@ -76,7 +73,7 @@ class DPSet(object):
>              assert dp.id not in self.dp_types
>              self.dp_types[dp.id] = getattr(dp, 'dp_type', dp_type.UNKNOWN)
>  
> -            self.ev_q.queue(EventDP(dp, False))
> +            self.send_event_to_observers(EventDP(dp, False))
>  
>      def set_type(self, dp_id, dp_type_=dp_type.UNKNOWN):
>          if dp_id in self.dps:
> @@ -92,19 +89,13 @@ class DPSet(object):
>      def get_all(self):
>          return self.dps.items()
>  
> -    @set_ev_cls(dispatcher.EventDispatcherChange,
> -                dispatcher.QUEUE_EV_DISPATCHER)
> +    @set_ev_cls(ofp_event.EventOFPStateChange, handler.MAIN_DISPATCHER)
>      def dispacher_change(self, ev):
> -        LOG.debug('dispatcher change q %s dispatcher %s',
> -                  ev.ev_q.name, ev.new_dispatcher.name)
> -        if ev.ev_q.name != handler.QUEUE_NAME_OFP_MSG:
> -            return
> -
> -        datapath = ev.ev_q.aux
> +        datapath = ev.datapath
>          assert datapath is not None
> -        if ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_MAIN:
> +        if ev.state == handler.MAIN_DISPATCHER:
>              LOG.debug('DPSET: register datapath %s', datapath)
>              self.register(datapath)
> -        elif ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_DEAD:
> +        elif ev.state == handler.DEAD_DISPATCHER:
>              LOG.debug('DPSET: unregister datapath %s', datapath)
>              self.unregister(datapath)
> diff --git a/ryu/controller/handler.py b/ryu/controller/handler.py
> index ede27df..91c448d 100644
> --- a/ryu/controller/handler.py
> +++ b/ryu/controller/handler.py
> @@ -14,36 +14,39 @@
>  # See the License for the specific language governing permissions and
>  # limitations under the License.
>  
> -import copy
>  import inspect
>  import logging
>  
> -from ryu.controller import dispatcher
>  from ryu.controller import ofp_event
>  
>  LOG = logging.getLogger('ryu.controller.handler')
>  
> -QUEUE_NAME_OFP_MSG = 'ofp_msg'
> -DISPATCHER_NAME_OFP_HANDSHAKE = 'ofp_handshake'
> -HANDSHAKE_DISPATCHER = dispatcher.EventDispatcher(
> -    DISPATCHER_NAME_OFP_HANDSHAKE)
> -DISPATCHER_NAME_OFP_CONFIG = 'ofp_config'
> -CONFIG_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_CONFIG)
> -DISPATCHER_NAME_OFP_MAIN = 'ofp_main'
> -MAIN_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_MAIN)
> -DISPATCHER_NAME_OFP_DEAD = 'ofp_dead'
> -DEAD_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_DEAD)
> +# just represent OF datapath state. datapath specific so should be moved.
> +HANDSHAKE_DISPATCHER = "handshake"
> +CONFIG_DISPATCHER = "config"
> +MAIN_DISPATCHER = "main"
> +DEAD_DISPATCHER = "dead"
>  
>  
> +# should be named something like 'observe_event'
>  def set_ev_cls(ev_cls, dispatchers):
>      def _set_ev_cls_dec(handler):
>          handler.ev_cls = ev_cls
> -        handler.dispatchers = dispatchers
> +        handler.dispatchers = _listify(dispatchers)
> +        handler.observer = ev_cls.__module__
>          return handler
>      return _set_ev_cls_dec
>  
>  
> -def _is_ev_handler(meth):
> +def set_ev_handler(ev_cls, dispatchers):
> +    def _set_ev_cls_dec(handler):
> +        handler.ev_cls = ev_cls
> +        handler.dispatchers = _listify(dispatchers)
> +        return handler
> +    return _set_ev_cls_dec
> +
> +
> +def _is_ev_cls(meth):
>      return hasattr(meth, 'ev_cls')
>  
>  
> @@ -58,12 +61,6 @@ def _listify(may_list):
>  def register_instance(i):
>      for _k, m in inspect.getmembers(i, inspect.ismethod):
>          # LOG.debug('instance %s k %s m %s', i, _k, m)
> -        if not _is_ev_handler(m):
> -            continue
> -
> -        _dispatchers = _listify(getattr(m, 'dispatchers', None))
> -        # LOG.debug("_dispatchers %s", _dispatchers)
> -        for d in _dispatchers:
> -            # LOG.debug('register dispatcher %s ev %s k %s m %s',
> -            #           d.name, m.ev_cls, _k, m)
> -            d.register_handler(m.ev_cls, m)
> +        if _is_ev_cls(m):
> +            _dispatchers = _listify(getattr(m, 'dispatchers', None))
> +            i.register_handler(m.ev_cls, m)
> diff --git a/ryu/controller/ofp_event.py b/ryu/controller/ofp_event.py
> index e5becac..9114604 100644
> --- a/ryu/controller/ofp_event.py
> +++ b/ryu/controller/ofp_event.py
> @@ -73,3 +73,9 @@ _PARSER_MODULE_LIST = ['ryu.ofproto.ofproto_v1_0_parser',
>  for m in _PARSER_MODULE_LIST:
>      # print 'loading module %s' % m
>      _create_ofp_msg_ev_from_module(m)
> +
> +
> +class EventOFPStateChange(event.EventBase):
> +    def __init__(self, dp):
> +        super(EventOFPStateChange, self).__init__()
> +        self.datapath = dp
> diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
> index add0301..b01f5e1 100644
> --- a/ryu/controller/ofp_handler.py
> +++ b/ryu/controller/ofp_handler.py
> @@ -16,10 +16,12 @@
>  
>  import logging
>  
> +import ryu.base.app_manager
> +
>  from ryu import utils
> -from ryu.base import app_manager
> +from ryu.controller import handler
>  from ryu.controller import ofp_event
> -from ryu.controller.handler import set_ev_cls
> +from ryu.controller.handler import set_ev_cls, set_ev_handler
>  from ryu.controller.handler import HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER,\
>      MAIN_DISPATCHER
>  
> @@ -39,9 +41,10 @@ LOG = logging.getLogger('ryu.controller.ofp_handler')
>  # back Echo Reply message.
>  
>  
> -class OFPHandler(app_manager.RyuApp):
> +class OFPHandler(ryu.base.app_manager.RyuApp):
>      def __init__(self, *args, **kwargs):
>          super(OFPHandler, self).__init__(*args, **kwargs)
> +        self.name = 'ofp_event'
>  
>      @staticmethod
>      def hello_failed(datapath, error_desc):
> @@ -52,7 +55,7 @@ class OFPHandler(app_manager.RyuApp):
>          error_msg.data = error_desc
>          datapath.send_msg(error_msg)
>  
> -    @set_ev_cls(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER)
> +    @set_ev_handler(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER)
>      def hello_handler(self, ev):
>          LOG.debug('hello ev %s', ev)
>          msg = ev.msg
> @@ -121,9 +124,9 @@ class OFPHandler(app_manager.RyuApp):
>  
>          # now move on to config state
>          LOG.debug('move onto config mode')
> -        datapath.ev_q.set_dispatcher(CONFIG_DISPATCHER)
> +        datapath.set_state(CONFIG_DISPATCHER)
>  
> -    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
> +    @set_ev_handler(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
>      def switch_features_handler(self, ev):
>          msg = ev.msg
>          datapath = msg.datapath
> @@ -147,9 +150,9 @@ class OFPHandler(app_manager.RyuApp):
>          datapath.send_msg(set_config)
>  
>          LOG.debug('move onto main mode')
> -        ev.msg.datapath.ev_q.set_dispatcher(MAIN_DISPATCHER)
> +        ev.msg.datapath.set_state(MAIN_DISPATCHER)
>  
> -    @set_ev_cls(ofp_event.EventOFPEchoRequest,
> +    @set_ev_handler(ofp_event.EventOFPEchoRequest,
>                  [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
>      def echo_request_handler(self, ev):
>          msg = ev.msg
> @@ -159,7 +162,7 @@ class OFPHandler(app_manager.RyuApp):
>          echo_reply.data = msg.data
>          datapath.send_msg(echo_reply)
>  
> -    @set_ev_cls(ofp_event.EventOFPErrorMsg,
> +    @set_ev_handler(ofp_event.EventOFPErrorMsg,
>                  [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
>      def error_msg_handler(self, ev):
>          msg = ev.msg
> -- 
> 1.7.4.4
> 
> 
> _______________________________________________
> Ryu-devel mailing list
> [email protected]
> https://lists.sourceforge.net/lists/listinfo/ryu-devel
> 

-- 
yamahata
