I'd like to discuss the design before reviewing the code in detail.

Can we split the event source/sink and the event loop out of RyuApp?
Then a single RyuApp could define multiple event sources, and event
sources and sinks could be connected more flexibly. In fact, the
(unmerged) gre_tunnel app defines multiple event sources.

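To make the split concrete, here is a rough sketch of what I have in
mind. All the names in it (EventSource, EventSink, TunnelApp,
EventTunnelUp, EventTunnelDown, run_once) are made up for illustration
and are not existing Ryu code. The real loop would presumably run in a
greenlet; here the sink is drained by hand to keep the example small.

```python
from collections import deque


class EventSink(object):
    """Hypothetical sink: queues events and runs the handlers for them."""

    def __init__(self):
        self.events = deque()
        self.handlers = {}  # event class -> list of callables

    def register_handler(self, ev_cls, handler):
        self.handlers.setdefault(ev_cls, []).append(handler)

    def put(self, ev):
        self.events.append(ev)

    def run_once(self):
        # Handle everything queued so far. An event loop would call
        # this repeatedly from its own greenlet.
        while self.events:
            ev = self.events.popleft()
            for handler in self.handlers.get(ev.__class__, []):
                handler(ev)


class EventSource(object):
    """Hypothetical source: delivers events to the sinks subscribed to it."""

    def __init__(self, name):
        self.name = name
        self.sinks = {}  # event class -> list of EventSink

    def subscribe(self, ev_cls, sink):
        self.sinks.setdefault(ev_cls, []).append(sink)

    def publish(self, ev):
        for sink in self.sinks.get(ev.__class__, []):
            sink.put(ev)


class EventTunnelUp(object):
    pass


class EventTunnelDown(object):
    pass


class TunnelApp(object):
    """One app that owns two independent event sources."""

    def __init__(self):
        self.up_source = EventSource('tunnel_up')
        self.down_source = EventSource('tunnel_down')


app = TunnelApp()
sink = EventSink()
seen = []
sink.register_handler(EventTunnelUp, lambda ev: seen.append('up'))
sink.register_handler(EventTunnelDown, lambda ev: seen.append('down'))
app.up_source.subscribe(EventTunnelUp, sink)
app.down_source.subscribe(EventTunnelDown, sink)

app.up_source.publish(EventTunnelUp())
app.down_source.publish(EventTunnelDown())
sink.run_once()
print(seen)
```

The point is only that sources, sinks and the loop are separate
objects, so an app can own as many sources as it needs and the wiring
between a source and a sink is done outside the app class.
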
Adding register_handler/register_observer to RyuApp introduces
dependencies between RyuApps, which means that the loading order
matters when RyuApps are loaded dynamically. I assume "bricks" is your
answer to the app-dependency problem, but how about something like a
hub?

    publisher -> hub -> subscriber

The state change (set_state) doesn't seem to work:
dpset.DPSet.dispatcher_change() isn't called when the dispatcher is
changed to dead. Since catching state changes in this way is
error-prone, it would be safer to introduce a state change event.

event_dumper will be broken. It would need to list all event sources
and subscribe to all of their events.

thanks,

On Mon, Jan 21, 2013 at 12:31:28PM +0900, FUJITA Tomonori wrote:
> This is purely internal change and no API for applications is
> changed. At least, I confirmed that folsom OpenStack plugin works.
>
> With the current dispatcher mechanism, multiple greenlets call
> applications' handlers and might be blocked anywhere so we need
> various locks to handle that concurrency. This makes things difficult
> for application developers.
>
> With this patch, each software components are connected with events.
> Each component has the own greenlet(s) to handle events and might send
> events to other components.
>
> For the compatibility, currently RyuApp class instances are converted
> into the above components. If an application registers handlers for
> some OF events, it subscribes to OF component and registers the OF
> events that it's interested. OF component delivers such OF events to
> the application and the application's greenlet executes the handlers.
>
> With this, we can completely remove dispatcher.py and its friends.
>
> Signed-off-by: FUJITA Tomonori <[email protected]>
> ---
>  ryu/base/app_manager.py       |   75 +++++++++++++++++++++++++++++++++-------
>  ryu/controller/controller.py  |   36 ++++++++++----------
>  ryu/controller/dpset.py       |   31 ++++++-----------
>  ryu/controller/handler.py     |   43 +++++++++++------------
>  ryu/controller/ofp_event.py   |    6 +++
>  ryu/controller/ofp_handler.py |   21 ++++++-----
>  6 files changed, 129 insertions(+), 83 deletions(-)
>
> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
> index 4a29c95..eedae11 100644
> --- a/ryu/base/app_manager.py
> +++ b/ryu/base/app_manager.py
> @@ -17,6 +17,9 @@
>  import inspect
>  import itertools
>  import logging
> +import gevent
> +
> +from gevent.queue import Queue
>
>  from ryu import utils
>  from ryu.controller.handler import register_instance
> @@ -24,20 +27,13 @@ from ryu.controller.controller import Datapath
>
>  LOG = logging.getLogger('ryu.base.app_manager')
>
> +SERVICE_BRICKS = {}
>
> -class RyuAppContext(object):
> -    """
> -    Base class for Ryu application context
> -    """
> -    def __init__(self):
> -        super(RyuAppContext, self).__init__()
>
> -    def close(self):
> -        """
> -        teardown method
> -        The method name, close, is chosen for python context manager
> -        """
> -        pass
> +def lookup_service_brick(name):
> +    if name in SERVICE_BRICKS:
> +        return SERVICE_BRICKS[name]
> +    return None
>
>
>  class RyuApp(object):
> @@ -55,6 +51,45 @@ class RyuApp(object):
>
>      def __init__(self, *_args, **_kwargs):
>          super(RyuApp, self).__init__()
> +        self.name = self.__class__.__name__
> +        self.event_handlers = {}
> +        self.observers = {}
> +        self.threads = []
> +        self.events = Queue()
> +        self.threads.append(gevent.spawn(self._event_loop))
> +
> +    def register_handler(self, ev_cls, handler):
> +        assert callable(handler)
> +        self.event_handlers.setdefault(ev_cls, [])
> +        self.event_handlers[ev_cls].append(handler)
> +
> +    def register_observer(self, ev_cls, name):
> +        self.observers.setdefault(ev_cls, [])
> +        self.observers[ev_cls].append(name)
> +
> +    def get_handlers(self, ev):
> +        return self.event_handlers.get(ev.__class__, [])
> +
> +    def get_observers(self, ev):
> +        return self.observers.get(ev.__class__, [])
> +
> +    def _event_loop(self):
> +        while True:
> +            ev = self.events.get()
> +            handlers = self.get_handlers(ev)
> +            for handler in handlers:
> +                handler(ev)
> +
> +    def _send_event(self, ev):
> +        self.events.put(ev)
> +
> +    def send_event(self, name, ev):
> +        if name in SERVICE_BRICKS:
> +            SERVICE_BRICKS[name]._send_event(ev)
> +
> +    def send_event_to_observers(self, ev):
> +        for observer in self.get_observers(ev):
> +            self.send_event(observer, ev)
>
>      def close(self):
>          """
> @@ -102,7 +137,12 @@ class AppManager(object):
>
>      def create_contexts(self):
>          for key, cls in self.contexts_cls.items():
> -            self.contexts[key] = cls()
> +            context = cls()
> +            self.contexts[key] = context
> +            # hack for dpset
> +            if context.__class__.__base__ == RyuApp:
> +                SERVICE_BRICKS[context.name] = context
> +                register_instance(context)
>          return self.contexts
>
>      def instantiate_apps(self, *args, **kwargs):
> @@ -124,6 +164,15 @@ class AppManager(object):
>              app = cls(*args, **kwargs)
>              register_instance(app)
>              self.applications[app_name] = app
> +            SERVICE_BRICKS[app.name] = app
> +
> +        for key, i in SERVICE_BRICKS.items():
> +            for _k, m in inspect.getmembers(i, inspect.ismethod):
> +                if hasattr(m, 'observer'):
> +                    name = m.observer.split('.')[-1]
> +                    if name in SERVICE_BRICKS:
> +                        brick = SERVICE_BRICKS[name]
> +                        brick.register_observer(m.ev_cls, i.name)
>
>      def close(self):
>          def close_all(close_dict):
> diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
> index 1811d8c..7c33e4a 100644
> --- a/ryu/controller/controller.py
> +++ b/ryu/controller/controller.py
> @@ -25,6 +25,8 @@ import ssl
>  from gevent.server import StreamServer
>  from gevent.queue import Queue
>
> +import ryu.base.app_manager
> +
>  from ryu.ofproto import ofproto_common
>  from ryu.ofproto import ofproto_parser
>  from ryu.ofproto import ofproto_v1_0
> @@ -35,7 +37,6 @@ from ryu.ofproto import ofproto_v1_3
>  from ryu.ofproto import ofproto_v1_3_parser
>  from ryu.ofproto import nx_match
>
> -from ryu.controller import dispatcher
>  from ryu.controller import handler
>  from ryu.controller import ofp_event
>
> @@ -123,25 +124,22 @@ class Datapath(object):
>          # prevent it from eating memory up
>          self.send_q = Queue(16)
>
> -        # circular reference self.ev_q.aux == self
> -        self.ev_q = dispatcher.EventQueue(handler.QUEUE_NAME_OFP_MSG,
> -                                          handler.HANDSHAKE_DISPATCHER,
> -                                          self)
> -
>          self.set_version(max(self.supported_ofp_version))
>          self.xid = random.randint(0, self.ofproto.MAX_XID)
>          self.id = None  # datapath_id is unknown yet
>          self.ports = None
>          self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
> +        self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
> +        self.set_state(handler.HANDSHAKE_DISPATCHER)
>
>      def close(self):
> -        """
> -        Call this before discarding this datapath object
> -        The circular refernce as self.ev_q.aux == self must be broken.
> -        """
> -        # tell this datapath is dead
> -        self.ev_q.set_dispatcher(handler.DEAD_DISPATCHER)
> -        self.ev_q.close()
> +        self.set_state(handler.DEAD_DISPATCHER)
> +
> +    def set_state(self, state):
> +        self.state = state
> +        ev = ofp_event.EventOFPStateChange(self)
> +        ev.state = state
> +        self.ofp_brick.send_event_to_observers(ev)
>
>      def set_version(self, version):
>          assert version in self.supported_ofp_version
> @@ -169,7 +167,13 @@ class Datapath(object):
>                  msg = ofproto_parser.msg(self,
>                                           version, msg_type, msg_len, xid, buf)
>                  #LOG.debug('queue msg %s cls %s', msg, msg.__class__)
> -                self.ev_q.queue(ofp_event.ofp_msg_to_ev(msg))
> +                ev = ofp_event.ofp_msg_to_ev(msg)
> +                handlers = self.ofp_brick.get_handlers(ev)
> +                for handler in handlers:
> +                    if self.state in handler.dispatchers:
> +                        handler(ev)
> +
> +                self.ofp_brick.send_event_to_observers(ev)
>
>                  buf = buf[required_len:]
>                  required_len = ofproto_common.OFP_HEADER_SIZE
> @@ -219,10 +223,6 @@ class Datapath(object):
>              gevent.kill(send_thr)
>              gevent.joinall([send_thr])
>
> -    def send_ev(self, ev):
> -        #LOG.debug('send_ev %s', ev)
> -        self.ev_q.queue(ev)
> -
>      #
>      # Utility methods for convenience
>      #
> diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py
> index 80ffb6f..dccca27 100644
> --- a/ryu/controller/dpset.py
> +++ b/ryu/controller/dpset.py
> @@ -16,8 +16,9 @@
>
>  import logging
>
> +from ryu.base import app_manager
>  from ryu.controller import event
> -from ryu.controller import dispatcher
> +from ryu.controller import ofp_event
>  from ryu.controller import dp_type
>  from ryu.controller import handler
>  from ryu.controller.handler import set_ev_cls
> @@ -25,9 +26,7 @@ from ryu.controller.handler import set_ev_cls
>  LOG = logging.getLogger('ryu.controller.dpset')
>
>
> -QUEUE_NAME_DPSET = 'dpset'
> -DISPATCHER_NAME_DPSET = 'dpset'
> -DPSET_EV_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_DPSET)
> +DPSET_EV_DISPATCHER = 'dpset'
>
>
>  class EventDPBase(event.EventBase):
> @@ -46,18 +45,16 @@ class EventDP(EventDPBase):
>
>
>  # this depends on controller::Datapath and dispatchers in handler
> -class DPSet(object):
> +class DPSet(app_manager.RyuApp):
>      def __init__(self):
>          super(DPSet, self).__init__()
> +        self.name = 'dpset'
>
>          # dp registration and type setting can be occur in any order
>          # Sometimes the sw_type is set before dp connection
>          self.dp_types = {}
>
>          self.dps = {}  # datapath_id => class Datapath
> -        self.ev_q = dispatcher.EventQueue(QUEUE_NAME_DPSET,
> -                                          DPSET_EV_DISPATCHER)
> -        handler.register_instance(self)
>
>      def register(self, dp):
>          assert dp.id is not None
> @@ -67,7 +64,7 @@ class DPSet(object):
>          if dp_type_ is not None:
>              dp.dp_type = dp_type_
>
> -        self.ev_q.queue(EventDP(dp, True))
> +        self.send_event_to_observers(EventDP(dp, True))
>          self.dps[dp.id] = dp
>
>      def unregister(self, dp):
> @@ -76,7 +73,7 @@ class DPSet(object):
>              assert dp.id not in self.dp_types
>              self.dp_types[dp.id] = getattr(dp, 'dp_type', dp_type.UNKNOWN)
>
> -            self.ev_q.queue(EventDP(dp, False))
> +            self.send_event_to_observers(EventDP(dp, False))
>
>      def set_type(self, dp_id, dp_type_=dp_type.UNKNOWN):
>          if dp_id in self.dps:
> @@ -92,19 +89,13 @@ class DPSet(object):
>      def get_all(self):
>          return self.dps.items()
>
> -    @set_ev_cls(dispatcher.EventDispatcherChange,
> -                dispatcher.QUEUE_EV_DISPATCHER)
> +    @set_ev_cls(ofp_event.EventOFPStateChange, handler.MAIN_DISPATCHER)
>      def dispacher_change(self, ev):
> -        LOG.debug('dispatcher change q %s dispatcher %s',
> -                  ev.ev_q.name, ev.new_dispatcher.name)
> -        if ev.ev_q.name != handler.QUEUE_NAME_OFP_MSG:
> -            return
> -
> -        datapath = ev.ev_q.aux
> +        datapath = ev.datapath
>          assert datapath is not None
> -        if ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_MAIN:
> +        if ev.state == handler.MAIN_DISPATCHER:
>              LOG.debug('DPSET: register datapath %s', datapath)
>              self.register(datapath)
> -        elif ev.new_dispatcher.name == handler.DISPATCHER_NAME_OFP_DEAD:
> +        elif ev.state == handler.DEAD_DISPATCHER:
>              LOG.debug('DPSET: unregister datapath %s', datapath)
>              self.unregister(datapath)
> diff --git a/ryu/controller/handler.py b/ryu/controller/handler.py
> index ede27df..91c448d 100644
> --- a/ryu/controller/handler.py
> +++ b/ryu/controller/handler.py
> @@ -14,36 +14,39 @@
>  # See the License for the specific language governing permissions and
>  # limitations under the License.
>
> -import copy
>  import inspect
>  import logging
>
> -from ryu.controller import dispatcher
>  from ryu.controller import ofp_event
>
>  LOG = logging.getLogger('ryu.controller.handler')
>
> -QUEUE_NAME_OFP_MSG = 'ofp_msg'
> -DISPATCHER_NAME_OFP_HANDSHAKE = 'ofp_handshake'
> -HANDSHAKE_DISPATCHER = dispatcher.EventDispatcher(
> -    DISPATCHER_NAME_OFP_HANDSHAKE)
> -DISPATCHER_NAME_OFP_CONFIG = 'ofp_config'
> -CONFIG_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_CONFIG)
> -DISPATCHER_NAME_OFP_MAIN = 'ofp_main'
> -MAIN_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_MAIN)
> -DISPATCHER_NAME_OFP_DEAD = 'ofp_dead'
> -DEAD_DISPATCHER = dispatcher.EventDispatcher(DISPATCHER_NAME_OFP_DEAD)
> +# just represent OF datapath state. datapath specific so should be moved.
> +HANDSHAKE_DISPATCHER = "handshake"
> +CONFIG_DISPATCHER = "config"
> +MAIN_DISPATCHER = "main"
> +DEAD_DISPATCHER = "dead"
>
>
> +# should be named something like 'observe_event'
>  def set_ev_cls(ev_cls, dispatchers):
>      def _set_ev_cls_dec(handler):
>          handler.ev_cls = ev_cls
> -        handler.dispatchers = dispatchers
> +        handler.dispatchers = _listify(dispatchers)
> +        handler.observer = ev_cls.__module__
>          return handler
>      return _set_ev_cls_dec
>
>
> -def _is_ev_handler(meth):
> +def set_ev_handler(ev_cls, dispatchers):
> +    def _set_ev_cls_dec(handler):
> +        handler.ev_cls = ev_cls
> +        handler.dispatchers = _listify(dispatchers)
> +        return handler
> +    return _set_ev_cls_dec
> +
> +
> +def _is_ev_cls(meth):
>      return hasattr(meth, 'ev_cls')
>
>
> @@ -58,12 +61,6 @@ def _listify(may_list):
>  def register_instance(i):
>      for _k, m in inspect.getmembers(i, inspect.ismethod):
>          # LOG.debug('instance %s k %s m %s', i, _k, m)
> -        if not _is_ev_handler(m):
> -            continue
> -
> -        _dispatchers = _listify(getattr(m, 'dispatchers', None))
> -        # LOG.debug("_dispatchers %s", _dispatchers)
> -        for d in _dispatchers:
> -            # LOG.debug('register dispatcher %s ev %s k %s m %s',
> -            #           d.name, m.ev_cls, _k, m)
> -            d.register_handler(m.ev_cls, m)
> +        if _is_ev_cls(m):
> +            _dispatchers = _listify(getattr(m, 'dispatchers', None))
> +            i.register_handler(m.ev_cls, m)
> diff --git a/ryu/controller/ofp_event.py b/ryu/controller/ofp_event.py
> index e5becac..9114604 100644
> --- a/ryu/controller/ofp_event.py
> +++ b/ryu/controller/ofp_event.py
> @@ -73,3 +73,9 @@ _PARSER_MODULE_LIST = ['ryu.ofproto.ofproto_v1_0_parser',
>  for m in _PARSER_MODULE_LIST:
>      # print 'loading module %s' % m
>      _create_ofp_msg_ev_from_module(m)
> +
> +
> +class EventOFPStateChange(event.EventBase):
> +    def __init__(self, dp):
> +        super(EventOFPStateChange, self).__init__()
> +        self.datapath = dp
> diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
> index add0301..b01f5e1 100644
> --- a/ryu/controller/ofp_handler.py
> +++ b/ryu/controller/ofp_handler.py
> @@ -16,10 +16,12 @@
>
>  import logging
>
> +import ryu.base.app_manager
> +
>  from ryu import utils
> -from ryu.base import app_manager
> +from ryu.controller import handler
>  from ryu.controller import ofp_event
> -from ryu.controller.handler import set_ev_cls
> +from ryu.controller.handler import set_ev_cls, set_ev_handler
>  from ryu.controller.handler import HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER,\
>      MAIN_DISPATCHER
>
> @@ -39,9 +41,10 @@ LOG = logging.getLogger('ryu.controller.ofp_handler')
>  # back Echo Reply message.
>
>
> -class OFPHandler(app_manager.RyuApp):
> +class OFPHandler(ryu.base.app_manager.RyuApp):
>      def __init__(self, *args, **kwargs):
>          super(OFPHandler, self).__init__(*args, **kwargs)
> +        self.name = 'ofp_event'
>
>      @staticmethod
>      def hello_failed(datapath, error_desc):
> @@ -52,7 +55,7 @@ class OFPHandler(app_manager.RyuApp):
>          error_msg.data = error_desc
>          datapath.send_msg(error_msg)
>
> -    @set_ev_cls(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER)
> +    @set_ev_handler(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER)
>      def hello_handler(self, ev):
>          LOG.debug('hello ev %s', ev)
>          msg = ev.msg
> @@ -121,9 +124,9 @@ class OFPHandler(app_manager.RyuApp):
>
>          # now move on to config state
>          LOG.debug('move onto config mode')
> -        datapath.ev_q.set_dispatcher(CONFIG_DISPATCHER)
> +        datapath.set_state(CONFIG_DISPATCHER)
>
> -    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
> +    @set_ev_handler(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
>      def switch_features_handler(self, ev):
>          msg = ev.msg
>          datapath = msg.datapath
> @@ -147,9 +150,9 @@ class OFPHandler(app_manager.RyuApp):
>          datapath.send_msg(set_config)
>
>          LOG.debug('move onto main mode')
> -        ev.msg.datapath.ev_q.set_dispatcher(MAIN_DISPATCHER)
> +        ev.msg.datapath.set_state(MAIN_DISPATCHER)
>
> -    @set_ev_cls(ofp_event.EventOFPEchoRequest,
> +    @set_ev_handler(ofp_event.EventOFPEchoRequest,
>                  [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
>      def echo_request_handler(self, ev):
>          msg = ev.msg
> @@ -159,7 +162,7 @@ class OFPHandler(app_manager.RyuApp):
>          echo_reply.data = msg.data
>          datapath.send_msg(echo_reply)
>
> -    @set_ev_cls(ofp_event.EventOFPErrorMsg,
> +    @set_ev_handler(ofp_event.EventOFPErrorMsg,
>                  [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
>      def error_msg_handler(self, ev):
>          msg = ev.msg
> --
> 1.7.4.4

-- 
yamahata

_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel
