Ryu BGP can now send its internal information through the BGP Monitoring Protocol (BMP)

Signed-off-by: ISHIDA Wataru <[email protected]>
---
 ryu/services/protocols/bgp/api/rtconf.py   |   16 +++
 ryu/services/protocols/bgp/bgpspeaker.py   |   14 ++
 ryu/services/protocols/bgp/bmp.py          |  204 ++++++++++++++++++++++++++++
 ryu/services/protocols/bgp/core.py         |   24 ++++
 ryu/services/protocols/bgp/peer.py         |    2 +
 ryu/services/protocols/bgp/signals/emit.py |    8 ++
 6 files changed, 268 insertions(+)
 create mode 100644 ryu/services/protocols/bgp/bmp.py

diff --git a/ryu/services/protocols/bgp/api/rtconf.py b/ryu/services/protocols/bgp/api/rtconf.py
index ccc4dea..c11918c 100644
--- a/ryu/services/protocols/bgp/api/rtconf.py
+++ b/ryu/services/protocols/bgp/api/rtconf.py
@@ -223,3 +223,19 @@ def del_network(prefix):
     tm = CORE_MANAGER.get_core_service().table_manager
     tm.add_to_global_table(prefix, is_withdraw=True)
     return True
+
+# =============================================================================
+# BMP configuration related APIs
+# =============================================================================
+
+
+@register(name='bmp.start')
+def bmp_start(host, port):
+    core = CORE_MANAGER.get_core_service()
+    return core.start_bmp(host, port)
+
+
+@register(name='bmp.stop')
+def bmp_stop(host, port):
+    core = CORE_MANAGER.get_core_service()
+    return core.stop_bmp(host, port)
diff --git a/ryu/services/protocols/bgp/bgpspeaker.py b/ryu/services/protocols/bgp/bgpspeaker.py
index 8b6e1a0..e310435 100644
--- a/ryu/services/protocols/bgp/bgpspeaker.py
+++ b/ryu/services/protocols/bgp/bgpspeaker.py
@@ -456,3 +456,17 @@ class BGPSpeaker(object):
         param[neighbors.IP_ADDRESS] = address
         in_filter = call(func_name, **param)
         return in_filter
+
+    def bmp_start(self, host, port):
+        func_name = 'bmp.start'
+        param = {}
+        param['host'] = host
+        param['port'] = port
+        call(func_name, **param)
+
+    def bmp_stop(self, host, port):
+        func_name = 'bmp.stop'
+        param = {}
+        param['host'] = host
+        param['port'] = port
+        call(func_name, **param)
diff --git a/ryu/services/protocols/bgp/bmp.py b/ryu/services/protocols/bgp/bmp.py
new file mode 100644
index 0000000..92849ee
--- /dev/null
+++ b/ryu/services/protocols/bgp/bmp.py
@@ -0,0 +1,204 @@
+# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ryu.services.protocols.bgp.base import Activity
+from ryu.lib import hub
+from ryu.lib.packet import bmp
+from ryu.lib.packet import bgp
+from ryu.services.protocols.bgp import constants as const
+import socket
+import logging
+from calendar import timegm
+from ryu.services.protocols.bgp.signals.emit import BgpSignalBus
+
+LOG = logging.getLogger('bgpspeaker.bmp')
+
+
+class BMPClient(Activity):
+    """A BMP client.
+
+    Try to establish BMP session between a configured BMP server.
+    If BMP session is established, transfer information about peers
+    (e.g. received and sent open msgs, contents of adj-rib-in, other stats)
+
+    """
+
+    def __init__(self, core_service, host, port):
+        super(BMPClient, self).__init__(name='BMPClient(%s:%s)' % (host, port))
+        self._core_service = core_service
+        self._core_service.signal_bus.register_listener(
+            BgpSignalBus.BGP_ADJ_RIB_IN_CHANGED,
+            lambda _, data: self.on_adj_rib_in_changed(data)
+        )
+        self._core_service.signal_bus.register_listener(
+            BgpSignalBus.BGP_ADJ_UP,
+            lambda _, data: self.on_adj_up(data)
+        )
+        self._core_service.signal_bus.register_listener(
+            BgpSignalBus.BGP_ADJ_DOWN,
+            lambda _, data: self.on_adj_down(data)
+        )
+        self._socket = None
+        self.server_address = (host, port)
+        self._connect_retry_event = hub.Event()
+        self._connect_retry_time = 5
+
+    def _run(self):
+        self._connect_retry_event.set()
+
+        while True:
+            self._connect_retry_event.wait()
+
+            try:
+                self._connect_retry_event.clear()
+                self._connect_tcp(self.server_address,
+                                  self._handle_bmp_session)
+            except socket.error:
+                    self._connect_retry_event.set()
+                    LOG.info('Will try to reconnect to %s after %s secs: %s' %
+                             (self.server_address, self._connect_retry_time,
+                              self._connect_retry_event.is_set()))
+
+            self.pause(self._connect_retry_time)
+
+    def _send(self, msg):
+        if not self._socket:
+            return
+        assert isinstance(msg, bmp.BMPMessage)
+        serialized_msg = msg.serialize()
+
+        ret = self._socket.send(msg.serialize())
+
+    def on_adj_rib_in_changed(self, data):
+        peer = data['peer']
+        path = data['received_route']
+        update_msg = peer._construct_update(path)
+        msg = self._construct_route_monitoring(peer, path)
+        self._send(msg)
+
+    def on_adj_up(self, data):
+        peer = data['peer']
+        msg = self._construct_peer_up_notification(peer)
+        self._send(msg)
+
+        for path in peer._adj_rib_in.itervalues():
+            update_msg = peer._construct_update(path)
+            msg = self._construct_route_monitoring(peer, path)
+            self._send(msg)
+
+    def on_adj_down(self, data):
+        peer = data['peer']
+        msg = self._construct_peer_down_notification(peer)
+        self._send(msg)
+
+    def _construct_peer_up_notification(self, peer):
+        if peer.is_mpbgp_cap_valid(bgp.RF_IPv4_VPN) or \
+                peer.is_mpbgp_cap_valid(bgp.RF_IPv6_VPN):
+            peer_type = bmp.BMP_PEER_TYPE_L3VPN
+        else:
+            peer_type = bmp.BMP_PEER_TYPE_GLOBAL
+
+        peer_distinguisher = 0
+        peer_as = peer._neigh_conf.remote_as
+        peer_bgp_id = self._core_service.router_id
+        timestamp = peer.state._established_time
+
+        local_address = peer.host_bind_ip
+        local_port = int(peer.host_bind_port)
+        peer_address, remote_port = peer.protocol._remotename
+        remote_port = int(remote_port)
+
+        sent_open_msg = peer.protocol.sent_open_msg
+        recv_open_msg = peer.protocol.recv_open_msg
+
+        msg = bmp.BMPPeerUpNotification(local_address=local_address,
+                                        local_port=local_port,
+                                        remote_port=remote_port,
+                                        sent_open_message=sent_open_msg,
+                                        received_open_message=recv_open_msg,
+                                        peer_type=peer_type,
+                                        is_post_policy=False,
+                                        peer_distinguisher=peer_distinguisher,
+                                        peer_address=peer_address,
+                                        peer_as=peer_as,
+                                        peer_bgp_id=peer_bgp_id,
+                                        timestamp=timestamp)
+
+        return msg
+
+    def _construct_peer_down_notification(self, peer):
+        return bmp.BMPPeerDownNotification(bmp.BMP_PEER_DOWN_REASON_UNKNOWN,
+                                           data=None)
+
+    def _construct_route_monitoring(self, peer, path):
+        if peer.is_mpbgp_cap_valid(bgp.RF_IPv4_VPN) or \
+                peer.is_mpbgp_cap_valid(bgp.RF_IPv6_VPN):
+            peer_type = bmp.BMP_PEER_TYPE_L3VPN
+        else:
+            peer_type = bmp.BMP_PEER_TYPE_GLOBAL
+
+        peer_distinguisher = 0
+        peer_as = peer._neigh_conf.remote_as
+        peer_bgp_id = self._core_service.router_id
+        peer_address, _ = peer.protocol._remotename
+
+        bgp_update = peer._construct_update(path)
+        is_post_policy = not path.filtered
+        timestamp = timegm(path.timestamp)
+
+        msg = bmp.BMPRouteMonitoring(bgp_update=bgp_update,
+                                     peer_type=peer_type,
+                                     is_post_policy=is_post_policy,
+                                     peer_distinguisher=peer_distinguisher,
+                                     peer_address=peer_address,
+                                     peer_as=peer_as, peer_bgp_id=peer_bgp_id,
+                                     timestamp=timestamp)
+
+        return msg
+
+    def _handle_bmp_session(self, socket):
+
+        self._socket = socket
+        # send init message
+        init_info = {'type': bmp.BMP_INIT_TYPE_STRING,
+                     'value': u'This is Ryu BGP BMP message'}
+        init_msg = bmp.BMPInitiation([init_info])
+        self._send(init_msg)
+
+        # send peer-up message for each peers
+        peer_manager = self._core_service.peer_manager
+
+        for peer in (p for p in peer_manager.iterpeers if p.in_established()):
+            msg = self._construct_peer_up_notification(peer)
+            self._send(msg)
+
+            for path in peer._adj_rib_in.itervalues():
+                update_msg = peer._construct_update(path)
+                msg = self._construct_route_monitoring(peer, path)
+                self._send(msg)
+
+        # TODO periodically send stats to bmpstation
+
+        while True:
+            # bmpstation shouldn't send any packet to bmpclient.
+            # this recv() is only meant to detect socket closed
+            ret = self._socket.recv(1)
+            if len(ret) == 0:
+                LOG.debug('BMP socket is closed. retry connecting..')
+                self._socket = None
+                self._connect_retry_event.set()
+                break
+
+            # silently ignore packets from the bmpstation
diff --git a/ryu/services/protocols/bgp/core.py b/ryu/services/protocols/bgp/core.py
index f16f78e..8552a80 100644
--- a/ryu/services/protocols/bgp/core.py
+++ b/ryu/services/protocols/bgp/core.py
@@ -41,6 +41,7 @@ from ryu.services.protocols.bgp.signals.emit import BgpSignalBus
 from ryu.services.protocols.bgp.speaker import BgpProtocol
 from ryu.services.protocols.bgp.utils.rtfilter import RouteTargetManager
 from ryu.services.protocols.bgp.utils import stats
+from ryu.services.protocols.bgp.bmp import BMPClient
 from ryu.lib import sockopt
 
 
@@ -125,6 +126,9 @@ class CoreService(Factory, Activity):
         # BgpProcessor instance (initialized during start)
         self._bgp_processor = None
 
+        # BMP clients key: (host, port) value: BMPClient instance
+        self.bmpclients = {}
+
     def _init_signal_listeners(self):
         self._signal_bus.register_listener(
             BgpSignalBus.BGP_DEST_CHANGED,
@@ -451,3 +455,23 @@ class CoreService(Factory, Activity):
             peer._host_bind_ip = bind_ip
             peer._host_bind_port = bind_port
             self._spawn_activity(bgp_proto, peer)
+
+    def start_bmp(self, host, port):
+        if (host, port) in self.bmpclients:
+            bmpclient = self.bmpclients[(host, port)]
+            if bmpclient.started:
+                LOG.warn("bmpclient is already running for %s:%s" % (host,
+                                                                     port))
+                return False
+        bmpclient = BMPClient(self, host, port)
+        self.bmpclients[(host, port)] = bmpclient
+        self._spawn_activity(bmpclient)
+        return True
+
+    def stop_bmp(self, host, port):
+        if (host, port) not in self.bmpclients:
+            LOG.warn("no bmpclient is running for %s:%s" % (host, port))
+            return False
+
+        bmpclient = self.bmpclients[(host, port)]
+        bmpclient.stop()
diff --git a/ryu/services/protocols/bgp/peer.py b/ryu/services/protocols/bgp/peer.py
index b2b4217..a0a2314 100644
--- a/ryu/services/protocols/bgp/peer.py
+++ b/ryu/services/protocols/bgp/peer.py
@@ -195,12 +195,14 @@ class PeerState(object):
         if new_state == const.BGP_FSM_ESTABLISHED:
             self.incr(PeerCounterNames.FSM_ESTB_TRANSITIONS)
             self._established_time = time.time()
+            self._signal_bus.adj_up(self.peer)
             NET_CONTROLLER.send_rpc_notification(
                 'neighbor.up', {'ip_address': self.peer.ip_address}
             )
         # transition from Established to another state
         elif old_state == const.BGP_FSM_ESTABLISHED:
             self._established_time = 0
+            self._signal_bus.adj_down(self.peer)
             NET_CONTROLLER.send_rpc_notification(
                 'neighbor.down', {'ip_address': self.peer.ip_address}
             )
diff --git a/ryu/services/protocols/bgp/signals/emit.py b/ryu/services/protocols/bgp/signals/emit.py
index f015fdb..d97ce1c 100644
--- a/ryu/services/protocols/bgp/signals/emit.py
+++ b/ryu/services/protocols/bgp/signals/emit.py
@@ -14,6 +14,8 @@ class BgpSignalBus(SignalBus):
     BGP_BEST_PATH_CHANGED = ('core', 'best', 'changed')
     BGP_ADJ_RIB_IN_CHANGED = ('core', 'adj', 'rib', 'in', 'changed')
     BGP_ADJ_RIB_OUT_CHANGED = ('core', 'adj', 'rib', 'out', 'changed')
+    BGP_ADJ_UP = ('core', 'adj', 'up')
+    BGP_ADJ_DOWN = ('core', 'adj', 'down')
 
     def bgp_error(self, peer, code, subcode, reason):
         return self.emit_signal(
@@ -71,3 +73,9 @@ class BgpSignalBus(SignalBus):
         return self.emit_signal(
             self.BGP_ADJ_RIB_OUT_CHANGED,
             {'peer': peer, 'sent_route': sent_route})
+
+    def adj_up(self, peer):
+        return self.emit_signal(self.BGP_ADJ_UP, {'peer': peer})
+
+    def adj_down(self, peer):
+        return self.emit_signal(self.BGP_ADJ_DOWN, {'peer': peer})
-- 
1.7.10.4


------------------------------------------------------------------------------
Infragistics Professional
Build stunning WinForms apps today!
Reboot your WinForms applications with our WinForms controls. 
Build a bridge from your legacy apps to the future.
http://pubads.g.doubleclick.net/gampad/clk?id=153845071&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to