On Mon, Jan 21, 2013 at 12:31:28PM +0900, FUJITA Tomonori wrote:
> This is a purely internal change and no API for applications is
> changed. At least, I confirmed that the folsom OpenStack plugin works.
>
> With the current dispatcher mechanism, multiple greenlets call
> applications' handlers and might be blocked anywhere so we need
> various locks to handle that concurrency. This makes things difficult
> for application developers.
>
> With this patch, software components are connected with events.
> Each component has its own greenlet(s) to handle events and might send
> events to other components.
>
> For the compatibility, currently RyuApp class instances are converted
> into the above components. If an application registers handlers for
> some OF events, it subscribes to OF component and registers the OF
> events that it's interested in. OF component delivers such OF events to
> the application and the application's greenlet executes the handlers.
>
> With this, we can completely remove dispatcher.py and its friends.
>
> Signed-off-by: FUJITA Tomonori <[email protected]>
> ---
> ryu/base/app_manager.py | 75 +++++++++++++++++++++++++++++++++-------
> ryu/controller/controller.py | 36 ++++++++++----------
> ryu/controller/dpset.py | 31 ++++++-----------
> ryu/controller/handler.py | 43 +++++++++++------------
> ryu/controller/ofp_event.py | 6 +++
> ryu/controller/ofp_handler.py | 21 ++++++-----
> 6 files changed, 129 insertions(+), 83 deletions(-)
>
> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
> index 4a29c95..eedae11 100644
> --- a/ryu/base/app_manager.py
> +++ b/ryu/base/app_manager.py
> @@ -17,6 +17,9 @@
> import inspect
> import itertools
> import logging
> +import gevent
> +
> +from gevent.queue import Queue
>
> from ryu import utils
> from ryu.controller.handler import register_instance
> @@ -24,20 +27,13 @@ from ryu.controller.controller import Datapath
>
> LOG = logging.getLogger('ryu.base.app_manager')
>
> +SERVICE_BRICKS = {}
>
> -class RyuAppContext(object):
> - """
> - Base class for Ryu application context
> - """
> - def __init__(self):
> - super(RyuAppContext, self).__init__()
>
> - def close(self):
> - """
> - teardown method
> - The method name, close, is chosen for python context manager
> - """
> - pass
> +def lookup_service_brick(name):
> + if name in SERVICE_BRICKS:
> + return SERVICE_BRICKS[name]
> + return None
This can be simplified to:
return SERVICE_BRICKS.get(name)
>
>
> class RyuApp(object):
> @@ -55,6 +51,45 @@ class RyuApp(object):
>
> def __init__(self, *_args, **_kwargs):
> super(RyuApp, self).__init__()
> + self.name = self.__class__.__name__
> + self.event_handlers = {}
> + self.observers = {}
> + self.threads = []
> + self.events = Queue()
> + self.threads.append(gevent.spawn(self._event_loop))
> +
> + def register_handler(self, ev_cls, handler):
> + assert callable(handler)
> + self.event_handlers.setdefault(ev_cls, [])
> + self.event_handlers[ev_cls].append(handler)
> +
> + def register_observer(self, ev_cls, name):
> + self.observers.setdefault(ev_cls, [])
> + self.observers[ev_cls].append(name)
> +
> + def get_handlers(self, ev):
> + return self.event_handlers.get(ev.__class__, [])
> +
> + def get_observers(self, ev):
> + return self.observers.get(ev.__class__, [])
> +
> + def _event_loop(self):
> + while True:
> + ev = self.events.get()
> + handlers = self.get_handlers(ev)
> + for handler in handlers:
> + handler(ev)
> +
> + def _send_event(self, ev):
> + self.events.put(ev)
> +
> + def send_event(self, name, ev):
> + if name in SERVICE_BRICKS:
> + SERVICE_BRICKS[name]._send_event(ev)
> +
> + def send_event_to_observers(self, ev):
> + for observer in self.get_observers(ev):
> + self.send_event(observer, ev)
>
> def close(self):
> """
> @@ -102,7 +137,12 @@ class AppManager(object):
>
> def create_contexts(self):
> for key, cls in self.contexts_cls.items():
> - self.contexts[key] = cls()
> + context = cls()
> + self.contexts[key] = context
> + # hack for dpset
> + if context.__class__.__base__ == RyuApp:
> + SERVICE_BRICKS[context.name] = context
> + register_instance(context)
> return self.contexts
>
> def instantiate_apps(self, *args, **kwargs):
> @@ -124,6 +164,15 @@ class AppManager(object):
> app = cls(*args, **kwargs)
> register_instance(app)
> self.applications[app_name] = app
> + SERVICE_BRICKS[app.name] = app
> +
> + for key, i in SERVICE_BRICKS.items():
> + for _k, m in inspect.getmembers(i, inspect.ismethod):
> + if hasattr(m, 'observer'):
> + name = m.observer.split('.')[-1]
> + if name in SERVICE_BRICKS:
> + brick = SERVICE_BRICKS[name]
> + brick.register_observer(m.ev_cls, i.name)
>
> def close(self):
> def close_all(close_dict):
> diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
> index 1811d8c..7c33e4a 100644
> --- a/ryu/controller/controller.py
> +++ b/ryu/controller/controller.py
> @@ -25,6 +25,8 @@ import ssl
> from gevent.server import StreamServer
> from gevent.queue import Queue
>
> +import ryu.base.app_manager
> +
> from ryu.ofproto import ofproto_common
> from ryu.ofproto import ofproto_parser
> from ryu.ofproto import ofproto_v1_0
> @@ -35,7 +37,6 @@ from ryu.ofproto import ofproto_v1_3
> from ryu.ofproto import ofproto_v1_3_parser
> from ryu.ofproto import nx_match
>
> -from ryu.controller import dispatcher
> from ryu.controller import handler
> from ryu.controller import ofp_event
>
> @@ -123,25 +124,22 @@ class Datapath(object):
> # prevent it from eating memory up
> self.send_q = Queue(16)
>
> - # circular reference self.ev_q.aux == self
> - self.ev_q = dispatcher.EventQueue(handler.QUEUE_NAME_OFP_MSG,
> - handler.HANDSHAKE_DISPATCHER,
> - self)
> -
> self.set_version(max(self.supported_ofp_version))
> self.xid = random.randint(0, self.ofproto.MAX_XID)
> self.id = None # datapath_id is unknown yet
> self.ports = None
> self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
> + self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
> + self.set_state(handler.HANDSHAKE_DISPATCHER)
>
> def close(self):
> - """
> - Call this before discarding this datapath object
> - The circular refernce as self.ev_q.aux == self must be broken.
> - """
> - # tell this datapath is dead
> - self.ev_q.set_dispatcher(handler.DEAD_DISPATCHER)
> - self.ev_q.close()
> + self.set_state(handler.DEAD_DISPATCHER)
> +
> + def set_state(self, state):
> + self.state = state
> + ev = ofp_event.EventOFPStateChange(self)
> + ev.state = state
> + self.ofp_brick.send_event_to_observers(ev)
>
> def set_version(self, version):
> assert version in self.supported_ofp_version
> @@ -169,7 +167,13 @@ class Datapath(object):
> msg = ofproto_parser.msg(self,
> version, msg_type, msg_len, xid,
> buf)
> #LOG.debug('queue msg %s cls %s', msg, msg.__class__)
> - self.ev_q.queue(ofp_event.ofp_msg_to_ev(msg))
> + ev = ofp_event.ofp_msg_to_ev(msg)
> + handlers = self.ofp_brick.get_handlers(ev)
> + for handler in handlers:
> + if self.state in handler.dispatchers:
> + handler(ev)
This filtering runs every time an event is sent, so its cost could
instead be paid once at handler registration. event_handlers could be
keyed by state as well:
self.event_handlers[ev.__class__][self.state] = list of handlers
> +
> + self.ofp_brick.send_event_to_observers(ev)
>
> buf = buf[required_len:]
> required_len = ofproto_common.OFP_HEADER_SIZE
> @@ -219,10 +223,6 @@ class Datapath(object):
> gevent.kill(send_thr)
> gevent.joinall([send_thr])
>
> - def send_ev(self, ev):
> - #LOG.debug('send_ev %s', ev)
> - self.ev_q.queue(ev)
> -
> #
> # Utility methods for convenience
> #
> diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py
> index 80ffb6f..dccca27 100644
> --- a/ryu/controller/dpset.py
> +++ b/ryu/controller/dpset.py
> @@ -16,8 +16,9 @@
>
> import logging
>
> +from ryu.base import app_manager
> from ryu.controller import event
> -from ryu.controller import dispatcher
> +from ryu.controller import ofp_event
> from ryu.controller import dp_type
> from ryu.controller import handler
> from ryu.controller.handler import set_ev_cls
> @@ -25,9 +26,7 @@ from ryu.controller.handler import set_ev_cls
> LOG = logging.getLogger('ryu.controller.dpset')
>
>
> -QUEUE_NAME_DPSET = 'dpset'
> -DISPATCHER_NAME_DPSET = 'dpset'
> -DPSET_EV_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_DPSET)
> +DPSET_EV_DISPATCHER = 'dpset'
>
>
> class EventDPBase(event.EventBase):
> @@ -46,18 +45,16 @@ class EventDP(EventDPBase):
>
>
> # this depends on controller::Datapath and dispatchers in handler
> -class DPSet(object):
> +class DPSet(app_manager.RyuApp):
> def __init__(self):
> super(DPSet, self).__init__()
> + self.name = 'dpset'
>
> # dp registration and type setting can be occur in any order
> # Sometimes the sw_type is set before dp connection
> self.dp_types = {}
>
> self.dps = {} # datapath_id => class Datapath
> - self.ev_q = dispatcher.EventQueue(QUEUE_NAME_DPSET,
> - DPSET_EV_DISPATCHER)
> - handler.register_instance(self)
>
> def register(self, dp):
> assert dp.id is not None
> @@ -67,7 +64,7 @@ class DPSet(object):
> if dp_type_ is not None:
> dp.dp_type = dp_type_
>
> - self.ev_q.queue(EventDP(dp, True))
> + self.send_event_to_observers(EventDP(dp, True))
> self.dps[dp.id] = dp
>
> def unregister(self, dp):
> @@ -76,7 +73,7 @@ class DPSet(object):
> assert dp.id not in self.dp_types
> self.dp_types[dp.id] = getattr(dp, 'dp_type', dp_type.UNKNOWN)
>
> - self.ev_q.queue(EventDP(dp, False))
> + self.send_event_to_observers(EventDP(dp, False))
>
> def set_type(self, dp_id, dp_type_=dp_type.UNKNOWN):
> if dp_id in self.dps:
> @@ -92,19 +89,13 @@ class DPSet(object):
> def get_all(self):
> return self.dps.items()
>
> - @set_ev_cls(dispatcher.EventDispatcherChange,
> - dispatcher.QUEUE_EV_DISPATCHER)
> + @set_ev_cls(ofp_event.EventOFPStateChange, handler.MAIN_DISPATCHER)
Probably this needs to contain both MAIN_DISPATCHER and DEAD_DISPATCHER,
since the handler below also handles the DEAD_DISPATCHER state.
> def dispacher_change(self, ev):
> - LOG.debug('dispatcher change q %s dispatcher %s',
> - ev.ev_q.name, ev.new_dispatcher.name)
> - if ev.ev_q.name != handler.QUEUE_NAME_OFP_MSG:
> - return
> -
> - datapath = ev.ev_q.aux
> + datapath = ev.datapath
> assert datapath is not None
> - if ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_MAIN:
> + if ev.state == handler.MAIN_DISPATCHER:
> LOG.debug('DPSET: register datapath %s', datapath)
> self.register(datapath)
> - elif ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_DEAD:
> + elif ev.state == handler.DEAD_DISPATCHER:
> LOG.debug('DPSET: unregister datapath %s', datapath)
> self.unregister(datapath)
> diff --git a/ryu/controller/handler.py b/ryu/controller/handler.py
> index ede27df..91c448d 100644
> --- a/ryu/controller/handler.py
> +++ b/ryu/controller/handler.py
> @@ -14,36 +14,39 @@
> # See the License for the specific language governing permissions and
> # limitations under the License.
>
> -import copy
> import inspect
> import logging
>
> -from ryu.controller import dispatcher
> from ryu.controller import ofp_event
>
> LOG = logging.getLogger('ryu.controller.handler')
>
> -QUEUE_NAME_OFP_MSG = 'ofp_msg'
> -DISPATCHER_NAME_OFP_HANDSHAKE = 'ofp_handshake'
> -HANDSHAKE_DISPATCHER = dispatcher.EventDispatcher(
> - DISPATCHER_NAME_OFP_HANDSHAKE)
> -DISPATCHER_NAME_OFP_CONFIG = 'ofp_config'
> -CONFIG_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_CONFIG)
> -DISPATCHER_NAME_OFP_MAIN = 'ofp_main'
> -MAIN_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_MAIN)
> -DISPATCHER_NAME_OFP_DEAD = 'ofp_dead'
> -DEAD_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_DEAD)
> +# just represent OF datapath state. datapath specific so should be moved.
> +HANDSHAKE_DISPATCHER = "handshake"
> +CONFIG_DISPATCHER = "config"
> +MAIN_DISPATCHER = "main"
> +DEAD_DISPATCHER = "dead"
>
>
> +# should be named something like 'observe_event'
> def set_ev_cls(ev_cls, dispatchers):
> def _set_ev_cls_dec(handler):
> handler.ev_cls = ev_cls
> - handler.dispatchers = dispatchers
> + handler.dispatchers = _listify(dispatchers)
> + handler.observer = ev_cls.__module__
> return handler
> return _set_ev_cls_dec
>
>
> -def _is_ev_handler(meth):
> +def set_ev_handler(ev_cls, dispatchers):
> + def _set_ev_cls_dec(handler):
> + handler.ev_cls = ev_cls
> + handler.dispatchers = _listify(dispatchers)
> + return handler
> + return _set_ev_cls_dec
> +
> +
> +def _is_ev_cls(meth):
> return hasattr(meth, 'ev_cls')
>
>
> @@ -58,12 +61,6 @@ def _listify(may_list):
> def register_instance(i):
> for _k, m in inspect.getmembers(i, inspect.ismethod):
> # LOG.debug('instance %s k %s m %s', i, _k, m)
> - if not _is_ev_handler(m):
> - continue
> -
> - _dispatchers = _listify(getattr(m, 'dispatchers', None))
> - # LOG.debug("_dispatchers %s", _dispatchers)
> - for d in _dispatchers:
> - # LOG.debug('register dispatcher %s ev %s k %s m %s',
> - # d.name, m.ev_cls, _k, m)
> - d.register_handler(m.ev_cls, m)
> + if _is_ev_cls(m):
> + _dispatchers = _listify(getattr(m, 'dispatchers', None))
_dispatchers is assigned here but never used.
> + i.register_handler(m.ev_cls, m)
> diff --git a/ryu/controller/ofp_event.py b/ryu/controller/ofp_event.py
> index e5becac..9114604 100644
> --- a/ryu/controller/ofp_event.py
> +++ b/ryu/controller/ofp_event.py
> @@ -73,3 +73,9 @@ _PARSER_MODULE_LIST = ['ryu.ofproto.ofproto_v1_0_parser',
> for m in _PARSER_MODULE_LIST:
> # print 'loading module %s' % m
> _create_ofp_msg_ev_from_module(m)
> +
> +
> +class EventOFPStateChange(event.EventBase):
> + def __init__(self, dp):
> + super(EventOFPStateChange, self).__init__()
> + self.datapath = dp
> diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
> index add0301..b01f5e1 100644
> --- a/ryu/controller/ofp_handler.py
> +++ b/ryu/controller/ofp_handler.py
> @@ -16,10 +16,12 @@
>
> import logging
>
> +import ryu.base.app_manager
> +
> from ryu import utils
> -from ryu.base import app_manager
> +from ryu.controller import handler
> from ryu.controller import ofp_event
> -from ryu.controller.handler import set_ev_cls
> +from ryu.controller.handler import set_ev_cls, set_ev_handler
> from ryu.controller.handler import HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER,\
> MAIN_DISPATCHER
>
> @@ -39,9 +41,10 @@ LOG = logging.getLogger('ryu.controller.ofp_handler')
> # back Echo Reply message.
>
>
> -class OFPHandler(app_manager.RyuApp):
> +class OFPHandler(ryu.base.app_manager.RyuApp):
> def __init__(self, *args, **kwargs):
> super(OFPHandler, self).__init__(*args, **kwargs)
> + self.name = 'ofp_event'
>
> @staticmethod
> def hello_failed(datapath, error_desc):
> @@ -52,7 +55,7 @@ class OFPHandler(app_manager.RyuApp):
> error_msg.data = error_desc
> datapath.send_msg(error_msg)
>
> - @set_ev_cls(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER)
> + @set_ev_handler(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER)
> def hello_handler(self, ev):
> LOG.debug('hello ev %s', ev)
> msg = ev.msg
> @@ -121,9 +124,9 @@ class OFPHandler(app_manager.RyuApp):
>
> # now move on to config state
> LOG.debug('move onto config mode')
> - datapath.ev_q.set_dispatcher(CONFIG_DISPATCHER)
> + datapath.set_state(CONFIG_DISPATCHER)
>
> - @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
> + @set_ev_handler(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
> def switch_features_handler(self, ev):
> msg = ev.msg
> datapath = msg.datapath
> @@ -147,9 +150,9 @@ class OFPHandler(app_manager.RyuApp):
> datapath.send_msg(set_config)
>
> LOG.debug('move onto main mode')
> - ev.msg.datapath.ev_q.set_dispatcher(MAIN_DISPATCHER)
> + ev.msg.datapath.set_state(MAIN_DISPATCHER)
>
> - @set_ev_cls(ofp_event.EventOFPEchoRequest,
> + @set_ev_handler(ofp_event.EventOFPEchoRequest,
> [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
> def echo_request_handler(self, ev):
> msg = ev.msg
> @@ -159,7 +162,7 @@ class OFPHandler(app_manager.RyuApp):
> echo_reply.data = msg.data
> datapath.send_msg(echo_reply)
>
> - @set_ev_cls(ofp_event.EventOFPErrorMsg,
> + @set_ev_handler(ofp_event.EventOFPErrorMsg,
> [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
> def error_msg_handler(self, ev):
> msg = ev.msg
> --
> 1.7.4.4
>
>
> ------------------------------------------------------------------------------
> Master Visual Studio, SharePoint, SQL, ASP.NET, C# 2012, HTML5, CSS,
> MVC, Windows 8 Apps, JavaScript and much more. Keep your skills current
> with LearnDevNow - 3,200 step-by-step video tutorials by Microsoft
> MVPs and experts. SALE $99.99 this month only -- learn more at:
> http://p.sf.net/sfu/learnmore_122412
> _______________________________________________
> Ryu-devel mailing list
> [email protected]
> https://lists.sourceforge.net/lists/listinfo/ryu-devel
>
--
yamahata
------------------------------------------------------------------------------
Master Visual Studio, SharePoint, SQL, ASP.NET, C# 2012, HTML5, CSS,
MVC, Windows 8 Apps, JavaScript and much more. Keep your skills current
with LearnDevNow - 3,200 step-by-step video tutorials by Microsoft
MVPs and experts. ON SALE this month only -- learn more at:
http://p.sf.net/sfu/learnnow-d2d
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel