Hi,
Can you please look at the attached file. Any comments would be greatly appreciated. How can I run the unit tests on this code?
Thanks
Gary

On 04/24/2012 09:02 AM, Isaku Yamahata wrote:
Cc to ryu-devel.

On Tue, Apr 24, 2012 at 05:47:43AM -0000, Gary Kotton wrote:
Hi,
I found a number of problems with the Quantum agents. These happen when
the Quantum server is down (for example the host running the server is
rebooted).
I would like to update the RYU agent code to deal with these problems.
Are there unit tests that I can run to check this?
Thanks
Thank you for testing! I'm afraid the Ryu plugin has poor unit test coverage for now.
It's an area that we have to address. (Patches are very welcome, of course.)

Does the attached patch help?
Although it might not apply cleanly because it's for the gre demo tree,
you get the idea.
If you're seeing different issue, can you please elaborate on it?

thanks,

commit a043d85c63d93d6812937c034640ebe892670c05
Author: Isaku Yamahata<[email protected]>
Date:   Tue Apr 10 11:56:07 2012 +0900

     plugins/ryu/agent: fix exception

     When starting up, the network is expected to be created by quantum server.
     However there is a race between quantum server startup and quantum agent
     startup. So have agent also make the network.

     Traceback (most recent call last):
       File "/opt/stack/quantum/quantum/plugins/ryu/agent/ryu_quantum_agent.py", line 
540, in<module>
         main()
       File 
"/opt/stack/quantum/quantum/plugins/ryu/agent/ryu_quantum_agent.py", line 534, 
in main
         plugin.daemon_loop()
       File 
"/opt/stack/quantum/quantum/plugins/ryu/agent/ryu_quantum_agent.py", line 490, 
in daemon_loop
         self.gre_ports.update()
       File 
"/opt/stack/quantum/quantum/plugins/ryu/agent/ryu_quantum_agent.py", line 367, 
in update
         self._add_port(node)
       File 
"/opt/stack/quantum/quantum/plugins/ryu/agent/ryu_quantum_agent.py", line 344, 
in _add_port
         self.int_br.datapath_id, ofport)
       File "/opt/stack/ryu/ryu/app/client.py", line 91, in create_port
         self._do_request('POST', self.path_port % (network_id, dpid, port))
       File "/opt/stack/ryu/ryu/app/client.py", line 55, in _do_request
         res.getheaders(), res.read())
     httplib.HTTPException: (<httplib.HTTPResponse instance at 0x25e7a28>, 'code 404 reason Resource not found', 
[('date', 'Tue, 10 Apr 2012 02:26:03 GMT'), ('content-length', '466'), ('content-type', 'application/json')], 
'{"displayError": "The server does not have data for the request.", "error": "You 
submitted the following request.\\n\\n    POST /v1.0/networks/__NW_ID_VPORT_GRE__/00004e7f48e36243_\\n\\nThis request is not 
valid. The following paths were evaluated and failed\\nfor the indicated reason.\\n\\n    - 
/networks/{network-id}/{dpid}_{port-id}\\n      Invalid format: 00004e7f48e36243_\\n\\nYou can get a list of all valid 
requests with the following request.\\n\\n    GET /v1.0/doc"}')

     Signed-off-by: Isaku Yamahata<[email protected]>

diff --git a/quantum/plugins/ryu/agent/ryu_quantum_agent.py 
b/quantum/plugins/ryu/agent/ryu_quantum_agent.py
index a46ff2b..78b2d9e 100755
--- a/quantum/plugins/ryu/agent/ryu_quantum_agent.py
+++ b/quantum/plugins/ryu/agent/ryu_quantum_agent.py
@@ -321,6 +321,7 @@ class GREPortSet(object):
      def setup(self):
          _ovs_node_update(self.db, self.int_br.datapath_id, self.tunnel_ip)

+        self.api.update_network(rest_nw_id.NW_ID_VPORT_GRE)
          for port in self.int_br.get_gre_ports():
              try:
                  node = self.db.ovs_node.filter(
@@ -385,6 +386,7 @@ class VifPortSet(object):
          self.api.update_port(network_id, port.switch.datapath_id, port.ofport)
          if port.vif_mac is not None:
              # external port doesn't have mac address
+            self.api.update_network(network_id)
              self.api.update_mac(network_id, port.switch.datapath_id,
                                  port.ofport, port.vif_mac)
          else:



#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012 Isaku Yamahata <yamahata at private email ne jp>
# Based on openvswitch agent.
#
# Copyright 2011 Nicira Networks, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
# @author: Isaku Yamahata
import ConfigParser
import logging as LOG
import shlex
import signal
import sys
import time
from optparse import OptionParser
from sqlalchemy.ext.sqlsoup import SqlSoup
from subprocess import PIPE, Popen

from ryu.app import rest_nw_id
from ryu.app.client import OFPClient


# Operational status values stored in a port binding's op_status attribute
OP_STATUS_UP = "UP"
OP_STATUS_DOWN = "DOWN"
# Database connectivity status: DB_RECONNECT is set when a query fails and
# tells the agent's daemon loop to re-open the database connection.
DB_OK = 1
DB_RECONNECT = 2


class VifPort:
    """
    Record describing a single port found on an OVS bridge.

    For a VIF port both the 'iface-id' (vif_id) and the 'vif-mac' (vif_mac)
    are set.  `switch` is the bridge object the port lives on; only its
    `br_name` attribute is read here.
    """
    def __init__(self, port_name, ofport, vif_id, vif_mac, switch):
        self.switch = switch
        self.vif_mac = vif_mac
        self.vif_id = vif_id
        self.ofport = ofport
        self.port_name = port_name

    def __str__(self):
        # Human readable one-line summary used in log messages.
        template = ("iface-id=%s, vif_mac=%s, port_name=%s, ofport=%s, "
                    "bridge name = %s")
        values = (self.vif_id, self.vif_mac, self.port_name, self.ofport,
                  self.switch.br_name)
        return template % values


class OVSBridge:
    """
    Helper that queries and configures one Open vSwitch bridge by running
    the ``ovs-vsctl`` (and, for XenServer, ``xe``) command line tools.

    Every command is prefixed with the configured `root_helper` command.
    """
    def __init__(self, br_name, root_helper):
        self.br_name = br_name
        self.root_helper = root_helper
        # Populated by find_datapath_id().
        self.datapath_id = None

    def find_datapath_id(self):
        """Read the bridge's datapath id and store it in self.datapath_id."""
        # ovs-vsctl get Bridge br-int datapath_id
        res = self.run_vsctl(["get", "Bridge", self.br_name, "datapath_id"])

        # remove surrounding whitespace and the enclosing double quotes
        self.datapath_id = res.strip().strip('"')

    def run_cmd(self, args):
        """
        Run the command `args` (a list of strings) through the root helper
        and return whatever it wrote to stdout.
        """
        cmd = shlex.split(self.root_helper) + args
        pipe = Popen(cmd, stdout=PIPE)
        retval = pipe.communicate()[0]
        # A child terminated by SIGALRM is reported as a timeout.
        if pipe.returncode == -(signal.SIGALRM):
            LOG.debug("## timeout running command: " + " ".join(cmd))
        return retval

    def run_vsctl(self, args):
        """Run ``ovs-vsctl --timeout=2 <args>`` and return its stdout."""
        full_args = ["ovs-vsctl", "--timeout=2"] + args
        return self.run_cmd(full_args)

    def set_controller(self, target):
        """
        Point the bridge at the OpenFlow controller `target`.

        A target without a known connection method prefix is treated as a
        plain "host:port" and gets "tcp:" prepended.
        """
        methods = ("ssl", "tcp", "unix", "pssl", "ptcp", "punix")
        args = target.split(":")
        if args[0] not in methods:
            target = "tcp:" + target
        self.run_vsctl(["set-controller", self.br_name, target])

    def db_get_map(self, table, record, column):
        """Return the map-typed `column` of `record` in `table` as a dict."""
        str_ = self.run_vsctl(["get", table, record, column]).rstrip("\n\r")
        return self.db_str_to_map(str_)

    def db_get_val(self, table, record, column):
        """Return `column` of `record` in `table` as a raw string."""
        return self.run_vsctl(["get", table, record, column]).rstrip("\n\r")

    @staticmethod
    def db_str_to_map(full_str):
        """
        Convert a map string such as ``{k1="v1", k2=v2}`` into a dict.

        Entries without "=" are ignored.  Only the first "=" of an entry
        separates key from value, so values that themselves contain "="
        are kept intact.  Surrounding double quotes are removed from values.
        """
        ret = {}
        for elem in full_str.strip("{}").split(", "):
            key, sep, value = elem.partition("=")
            if not sep:
                continue
            ret[key] = value.strip("\"")
        return ret

    def get_port_name_list(self):
        """Return the names of all ports on the bridge."""
        res = self.run_vsctl(["list-ports", self.br_name])
        # The output ends with a newline, so the last split element is
        # an empty string that must be dropped.
        return res.split("\n")[:-1]

    def get_xapi_iface_id(self, xs_vif_uuid):
        """Ask XAPI (``xe``) for the nicira-iface-id of the given VIF uuid."""
        return self.run_cmd(
                        ["xe",
                        "vif-param-get",
                        "param-name=other-config",
                        "param-key=nicira-iface-id",
                        "uuid=%s" % xs_vif_uuid]).strip()

    def _vifport(self, name, external_ids):
        """Build a VifPort for `name` from its external_ids map."""
        ofport = self.db_get_val("Interface", name, "ofport")
        return VifPort(name, ofport, external_ids["iface-id"],
                       external_ids["attached-mac"], self)

    def _get_ports(self, get_port):
        """
        Apply `get_port` to every port name on the bridge and return the
        truthy results as a list.
        """
        ports = []
        for name in self.get_port_name_list():
            port = get_port(name)
            if port:
                ports.append(port)

        return ports

    def _get_vif_port(self, name):
        """Return a VifPort if interface `name` is a VIF, otherwise None."""
        external_ids = self.db_get_map("Interface", name, "external_ids")
        if "iface-id" in external_ids and "attached-mac" in external_ids:
            return self._vifport(name, external_ids)
        elif ("xs-vif-uuid" in external_ids and
              "attached-mac" in external_ids):
            # if this is a xenserver and iface-id is not automatically
            # synced to OVS from XAPI, we grab it from XAPI directly
            ofport = self.db_get_val("Interface", name, "ofport")
            iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"])
            return VifPort(name, ofport, iface_id,
                           external_ids["attached-mac"], self)

    def get_vif_ports(self):
        "returns a VIF object for each VIF port"
        return self._get_ports(self._get_vif_port)

    def _get_external_port(self, name):
        """
        Return a VifPort (without vif id / mac) if interface `name` has no
        external_ids at all, otherwise None.
        """
        external_ids = self.db_get_map("Interface", name, "external_ids")
        if external_ids:
            return

        ofport = self.db_get_val("Interface", name, "ofport")
        return VifPort(name, ofport, None, None, self)

    def get_external_ports(self):
        """Return a VifPort for each port that carries no external_ids."""
        return self._get_ports(self._get_external_port)


def check_ofp_mode(db):
    """
    Read the OpenFlow controller and Ryu REST API addresses from the
    ofp_server table of `db`.

    Returns the tuple (ofp_controller_addr, ofp_rest_api_addr).  If the
    table cannot be read, (None, None) is returned so that the caller can
    treat the database as unreachable and retry later.

    Raises RuntimeError when the table was read successfully but does not
    contain a "controller" or a "REST_API" entry.
    """
    LOG.debug("checking db")

    try:
        servers = db.ofp_server.all()
    except Exception as e:
        # This is a module-level function: there is no agent instance whose
        # db_state could be flagged here, so the failure is reported through
        # the return value instead.
        LOG.warning("unable to read ofp_server table: %s", e)
        return (None, None)

    ofp_controller_addr = None
    ofp_rest_api_addr = None
    for serv in servers:
        if serv.host_type == "REST_API":
            ofp_rest_api_addr = serv.address
        elif serv.host_type == "controller":
            ofp_controller_addr = serv.address
        else:
            LOG.warning("ignoring unknown server type %s", serv)

    LOG.debug("controller %s", ofp_controller_addr)
    LOG.debug("api %s", ofp_rest_api_addr)
    if not ofp_controller_addr:
        raise RuntimeError("OF controller isn't specified")
    if not ofp_rest_api_addr:
        raise RuntimeError("Ryu rest API port isn't specified")

    LOG.debug("going to ofp controller mode %s %s",
              ofp_controller_addr, ofp_rest_api_addr)
    return (ofp_controller_addr, ofp_rest_api_addr)


class OVSQuantumOFPRyuAgent:
    """
    Agent that watches the VIF ports of the local integration bridge,
    registers them with the Ryu REST API and records their operational
    status in the Quantum database.

    The database connection is opened by daemon_loop(); when a query fails
    `db_state` is set to DB_RECONNECT and the loop re-opens the connection
    instead of terminating.
    """
    def __init__(self, integ_br, root_helper):
        self.root_helper = root_helper
        # Name of the integration bridge.  The bridge is configured in
        # daemon_loop(), once the controller address is known from the DB.
        self.integ_br = integ_br
        self.db_state = DB_OK
        self.nw_id_external = rest_nw_id.NW_ID_EXTERNAL
        # Both are created in daemon_loop().
        self.api = None
        self.int_br = None

    def _setup_integration_br(self, integ_br, ofp_controller_addr):
        """
        Attach to bridge `integ_br`, point it at the OpenFlow controller and
        register its external ports under the external network id.
        """
        self.int_br = OVSBridge(integ_br, self.root_helper)
        self.int_br.find_datapath_id()
        self.int_br.set_controller(ofp_controller_addr)
        for port in self.int_br.get_external_ports():
            self._port_update(self.nw_id_external, port)

    def _port_update(self, network_id, port):
        """Tell the Ryu REST API that `port` belongs to `network_id`."""
        self.api.update_port(network_id, port.switch.datapath_id, port.ofport)

    def _all_bindings(self, db):
        """return interface id -> port which include network id bindings"""
        try:
            return dict((port.interface_id, port) for port in db.ports.all())
        except Exception as e:
            LOG.warning("unable to read port bindings: %s", e)
            self.db_state = DB_RECONNECT
            return {}

    def _lookup_ofp_servers(self, db):
        """
        Call check_ofp_mode() and flag DB_RECONNECT when it reports that
        the database could not be read, i.e. it returned (None, None).
        """
        addrs = check_ofp_mode(db)
        if addrs == (None, None):
            self.db_state = DB_RECONNECT
        return addrs

    @staticmethod
    def _commit(db):
        """Commit pending op_status changes, rolling back on failure."""
        try:
            db.commit()
        except Exception:
            LOG.exception("database commit failed, rolling back")
            db.rollback()

    def daemon_loop(self, options):
        """
        Main loop of the agent; does not return.

        `options` is a dict whose "sql_connection" entry is the database
        URL handed to SqlSoup.  Every two seconds the VIF ports on the
        integration bridge are compared with the port bindings in the
        database and the op_status of changed bindings is updated.
        """
        count = 0
        threshold = 1

        db = SqlSoup(options["sql_connection"])
        LOG.info("Connecting to database \"%s\" on %s",
                 db.engine.url.database, db.engine.url.host)

        (ofp_controller_addr,
         ofp_rest_api_addr) = self._lookup_ofp_servers(db)
        while self.db_state == DB_RECONNECT:
            # Without the controller address the bridge cannot be set up,
            # so wait here until the database becomes readable.
            time.sleep(2)
            db = SqlSoup(options["sql_connection"])
            self.db_state = DB_OK
            (ofp_controller_addr,
             ofp_rest_api_addr) = self._lookup_ofp_servers(db)

        self.api = OFPClient(ofp_rest_api_addr)
        self._setup_integration_br(self.integ_br, ofp_controller_addr)

        # on startup, register all existing ports
        all_bindings = self._all_bindings(db)

        local_bindings = {}
        vif_ports = {}
        for port in self.int_br.get_vif_ports():
            vif_ports[port.vif_id] = port
            if port.vif_id in all_bindings:
                net_id = all_bindings[port.vif_id].network_id
                local_bindings[port.vif_id] = net_id
                self._port_update(net_id, port)
                all_bindings[port.vif_id].op_status = OP_STATUS_UP
                LOG.info("Updating binding to net-id = %s for %s",
                         net_id, str(port))
        self._commit(db)

        old_vif_ports = vif_ports
        old_local_bindings = local_bindings

        while True:
            if self.db_state == DB_RECONNECT:
                time.sleep(2)
                count = count + 1
                # The threshold grows tenfold each time the message is
                # emitted, so a long outage does not flood the log.
                if count >= threshold:
                    LOG.info("Problem connecting to database \"%s\" on %s",
                             db.engine.url.database, db.engine.url.host)
                    count = 0
                    threshold = threshold * 10
                db = SqlSoup(options["sql_connection"])
                self.db_state = DB_OK
                LOG.info("Re-connecting to database \"%s\" on %s",
                         db.engine.url.database, db.engine.url.host)
                (ofp_controller_addr,
                 ofp_rest_api_addr) = self._lookup_ofp_servers(db)
                if self.db_state == DB_RECONNECT:
                    continue
                self.api = OFPClient(ofp_rest_api_addr)

            all_bindings = self._all_bindings(db)
            if self.db_state == DB_RECONNECT:
                continue

            new_vif_ports = {}
            new_local_bindings = {}
            for port in self.int_br.get_vif_ports():
                new_vif_ports[port.vif_id] = port
                if port.vif_id in all_bindings:
                    net_id = all_bindings[port.vif_id].network_id
                    new_local_bindings[port.vif_id] = net_id

                old_b = old_local_bindings.get(port.vif_id)
                new_b = new_local_bindings.get(port.vif_id)
                if old_b == new_b:
                    continue

                # A previous binding that changed or vanished is removed.
                if old_b:
                    LOG.info("Removing binding to net-id = %s for %s",
                             old_b, str(port))
                    if port.vif_id in all_bindings:
                        all_bindings[port.vif_id].op_status = OP_STATUS_DOWN
                # A binding that is new (or changed) is added.
                # NOTE(review): the startup path above also calls
                # _port_update() for bound ports; confirm whether ports
                # bound after startup need the same REST registration.
                if new_b:
                    if port.vif_id in all_bindings:
                        all_bindings[port.vif_id].op_status = OP_STATUS_UP
                    LOG.info("Adding binding to net-id = %s for %s",
                             new_b, str(port))

            for vif_id in old_vif_ports:
                if vif_id not in new_vif_ports:
                    LOG.info("Port Disappeared: %s", vif_id)
                    if vif_id in all_bindings:
                        all_bindings[vif_id].op_status = OP_STATUS_DOWN

            old_vif_ports = new_vif_ports
            old_local_bindings = new_local_bindings
            self._commit(db)

            time.sleep(2)


def main():
    """
    Command line entry point.

    Parses the options, reads the config file given as the only positional
    argument and runs the agent's daemon loop.  Exits with status 1 when
    the argument is missing or the config file cannot be read or parsed.
    """
    usagestr = "%prog [OPTIONS] <config file>"
    parser = OptionParser(usage=usagestr)
    parser.add_option("-v", "--verbose", dest="verbose",
                      action="store_true", default=False,
                      help="turn on verbose logging")

    options, args = parser.parse_args()

    if options.verbose:
        LOG.basicConfig(level=LOG.DEBUG)
    else:
        LOG.basicConfig(level=LOG.WARN)

    if len(args) != 1:
        parser.print_help()
        sys.exit(1)

    config_file = args[0]
    config = ConfigParser.ConfigParser()
    try:
        parsed_files = config.read(config_file)
    except Exception as e:
        LOG.error("Unable to parse config file \"%s\": %s",
                  config_file, str(e))
        sys.exit(1)
    # read() skips files it cannot open and returns only the names it
    # actually parsed, so an empty result means the file was not readable.
    if not parsed_files:
        LOG.error("Unable to read config file \"%s\"", config_file)
        sys.exit(1)

    integ_br = config.get("OVS", "integration-bridge")

    root_helper = config.get("AGENT", "root_helper")

    # daemon_loop() opens (and logs) the database connection itself, so no
    # connection is created here.
    agent_options = {"sql_connection": config.get("DATABASE",
                                                  "sql_connection")}

    plugin = OVSQuantumOFPRyuAgent(integ_br, root_helper)
    plugin.daemon_loop(agent_options)

    sys.exit(0)


# Start the agent only when this file is executed as a script.
if __name__ == "__main__":
    main()
------------------------------------------------------------------------------
Live Security Virtual Conference
Exclusive live event will cover all the ways today's security and 
threat landscape has changed and how IT managers can respond. Discussions 
will include endpoint security, mobile security and the latest in malware 
threats. http://www.accelacomm.com/jaw/sfrnl04242012/114/50122263/
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to