Author: bpintea
Date: 2008-08-25 17:59:24 +0200 (Mon, 25 Aug 2008)
New Revision: 1076

Added:
   trunk/core/plug-in/binrpcctrl/ConnPool.cpp
   trunk/core/plug-in/binrpcctrl/ConnPool.h
   trunk/core/plug-in/binrpcctrl/CtrlServer.cpp
   trunk/core/plug-in/binrpcctrl/CtrlServer.h
Modified:
   trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.cpp
   trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.h
Log:
- Content-Type header is now always set (see
  http://lists.iptel.org/pipermail/semsdev/2008-March/002379.html)
- implemented a connection pool, used for SEMS->SER message xchange initiated
  by SEMS; this pairs with SEMS' one-thread-per-session design and should
  boost throughput, as it only used one main socket before
- added multi-threaded receiver, for SER->SEMS xchange, pairing with SER's
  multiprocess design and moving from the single-listener 1st implementation.


Modified: trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.cpp
===================================================================
--- trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.cpp 2008-08-24 17:07:26 UTC 
(rev 1075)
+++ trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.cpp 2008-08-25 15:59:24 UTC 
(rev 1076)
@@ -17,24 +17,22 @@
 
 #define LISTEN_ADDR_PARAM   "sems_address"
 #define SER_ADDR_PARAM      "ser_address"
+#define CT_TIMEOUT_PARAM    "connect_timeout"
+#define RX_TIMEOUT_PARAM    "receive_timeout"
+#define TX_TIMEOUT_PARAM    "transmit_timeout"
+#define RX_WORKERS_PARAM    "receive_workers"
+#define TX_WORKERS_PARAM    "transmit_workers"
 
 #define LISTEN_ADDR_DEFAULT "brpcnd://127.0.0.1:3334"
 #define SER_ADDR_DEFAULT    "brpcnd://127.0.0.1:1089"
 
 #define BRPC_CB_HASH_SIZE   16
-#define ASI_VERSION         0x2
-#define SND_USOCK_TEMPLATE  "/tmp/sems_send_sock_XXXXXX" //TODO: configurable
-#define MAX_RETRY_ON_ERR    5
-//TODO: configurable
-#define CT_TIMEOUT          500000
-#define RX_TIMEOUT          500000 /* 50000 */
-#define TX_TIMEOUT          200000
+#define CT_TIMEOUT          500 // ms
+#define RX_TIMEOUT          500 // ms
+#define TX_TIMEOUT          200 // ms
+#define RX_WORKERS          8
+#define TX_WORKERS          8
 
-#ifndef UNIX_PATH_MAX
-#include <sys/un.h>
-#define UNIX_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
-#endif
-
 #define STX   0x02
 #define ETX   0x03
 #define SUB   0x21
@@ -82,13 +80,13 @@
 const BRPC_STR_STATIC_INIT(SER_DFMT_TO_TAG, "@to.tag");
 const BRPC_STR_STATIC_INIT(SER_DFMT_CSEQ_NUM, "@cseq.num");
 const BRPC_STR_STATIC_INIT(SER_DFMT_RR_ALL, "@hf_value.record_route");
-const BRPC_STR_STATIC_INIT(SER_DFMT_RR_1ST, "@hf_value.record_route[1]");
 const BRPC_STR_STATIC_INIT(SER_DFMT_BODY, "@msg.body");
 const BRPC_STR_STATIC_INIT(SER_DFMT_CMD, "$sems_cmd");
 const BRPC_STR_STATIC_INIT(SER_DFMT_HDRS, "$sems_hdrs");
 //aditionals, for replies
 const BRPC_STR_STATIC_INIT(SER_DFMT_CODE, "@code");
 const BRPC_STR_STATIC_INIT(SER_DFMT_REASON, "@reason");
+const BRPC_STR_STATIC_INIT(SER_DFMT_CONTTYPE, "@hf_value.content_type");
 
 
 static const brpc_str_t *SIP_CORE_METHODS[] = {
@@ -117,7 +115,7 @@
   &SER_DFMT_TO_TAG,
   &SER_DFMT_CSEQ_NUM,
   &SER_DFMT_RR_ALL,
-  &SER_DFMT_RR_1ST,
+  &SER_DFMT_CONTTYPE,
   &SER_DFMT_BODY,
   &SER_DFMT_CMD,
   &SER_DFMT_HDRS
@@ -127,11 +125,11 @@
   &SER_DFMT_CODE,
   &SER_DFMT_REASON,
   &SER_DFMT_CONTACT_URI,
-  &SER_DFMT_RR_1ST,
   &SER_DFMT_RR_ALL,
   &SER_DFMT_FROM_TAG,
   &SER_DFMT_TO_TAG,
   &SER_DFMT_CSEQ_NUM,
+  &SER_DFMT_CONTTYPE,
   &SER_DFMT_HDRS,
   &SER_DFMT_BODY
 };
@@ -151,10 +149,11 @@
 
 /* WARN: must remain sync'ed with SER's enum ASI_REQ_FLAGS! */
 enum SIP_REQ_FLAGS {
-  SIP_REQ_ACK_FLG = 1 << 0,
-  SIP_REQ_FIN_FLG = 1 << 1,
-  SIP_REQ_PRV_FLG = 1 << 2,
-  SIP_REQ_RUN_ORR = 1 << 3,
+  SIPREQ_GET_ACK_FLG = 1 << 0,
+  SIPREQ_GET_FIN_FLG = 1 << 1,
+  SIPREQ_GET_PRV_FLG = 1 << 2,
+  SIPREQ_RUN_ORR_FLG = 1 << 3,
+  SIPREQ_DEL_1ST_FLG = 1 << 4,
 };
 
 
@@ -183,87 +182,63 @@
 
 #define CONFIRM_RECEPTION 0
 
+
+static brpc_tv_t ct_timeout = CT_TIMEOUT * 1000;
+static brpc_tv_t rx_timeout = RX_TIMEOUT * 1000;
+static brpc_tv_t tx_timeout = TX_TIMEOUT * 1000;
+static unsigned rx_workers;
+static unsigned tx_workers;
+
 // time_t BrpcCtrlInterface::serial = -1;
 // brpc_int_t BrpcCtrlInterface::as_id = -1;
 
 BrpcCtrlInterfaceFactory::BrpcCtrlInterfaceFactory(const string &name) 
     : AmCtrlInterfaceFactory(name)
-{
-}
+{}
 
 BrpcCtrlInterfaceFactory::~BrpcCtrlInterfaceFactory()
-{
-}
+{}
 
 AmCtrlInterface* BrpcCtrlInterfaceFactory::instance()
 {
     BrpcCtrlInterface* ctrl = new BrpcCtrlInterface();
-    
+
     if(ctrl->init(semsUri,serUri) < 0){
-       delete ctrl;
-       return NULL;
+      delete ctrl;
+      return NULL;
     }
 
     return ctrl;
 }
 
 
-BrpcCtrlInterface::BrpcCtrlInterface()
-    : semsFd(-1),
-      serFd(-1),
-      serial(-1),
-      as_id(-1)
-{
-    memset(&sndAddr, 0, sizeof(sndAddr));
-}
+BrpcCtrlInterface::BrpcCtrlInterface() :
+    serial(-1),
+    as_id(-1)
+{}
+
 BrpcCtrlInterface::~BrpcCtrlInterface()
 {
-    closeSock(&serFd, &sndAddr);
-    closeSock(&semsFd, &semsAddr);
+  delete serConn;
+  delete ctrlSrv;
 }
 
 
-static int init_listener(const string &semsUri, brpc_addr_t *semsAddr)
-{
-  brpc_addr_t *addr;
-  int sockfd;
-
-  if (! (addr = brpc_parse_uri(semsUri.c_str()))) {
-    ERROR("failed to parse BINRPC URI `%s': %s [%d].\n", semsUri.c_str(),
-      brpc_strerror(), brpc_errno);
-    return -1;
-  } else if (BRPC_ADDR_TYPE(addr) != SOCK_DGRAM) {
-    //b/c we'd have to do connection management otherwise (listen for
-    //connections, poll for activity on each descriptor etc); for now, not
-    //relly needed.
-    ERROR("only datagram listeners supported.\n");
-    return -1;
-  }
-
-  if ((sockfd = brpc_socket(addr, /*blocking*/false, /*bind*/true)) < 0) {
-    ERROR("failed to get listen socket for URI `%s': %s [%d].\n", 
-      semsUri.c_str(), brpc_strerror(), brpc_errno);
-    return -1;
-  }
-  *semsAddr = *addr;
-  return sockfd;
-}
-
 int BrpcCtrlInterface::init(const string& semsUri, const string& serUri)
 {
-  brpc_addr_t *addr;
-
-  if ((semsFd = init_listener(semsUri, &semsAddr)) < 0) {
-    ERROR("failed to initialize BINRPC listening socket.\n");
+  try {
+    serConn = new ConnPool(serUri, tx_workers, ct_timeout);
+  } catch (string errmsg) {
+    ERROR("failed to initialize SER connection pool: %s.\n", errmsg.c_str());
     return -1;
   }
 
-  if (! (addr = brpc_parse_uri(serUri.c_str()))) {
-    ERROR("failed to parse BINRPC URI `%s': %s [%d].\n", serUri.c_str(),
-      brpc_strerror(), brpc_errno);
+  try {
+    ctrlSrv = new CtrlServer(semsUri, rx_workers, rx_timeout, tx_timeout);
+  } catch (string errmsg) {
+    ERROR("failed to initialize binRPC server: %s.\n", errmsg.c_str());
+    delete serConn;
     return -1;
-  } else {
-    serAddr = *addr;
   }
 
   sipDispatcher = AmSipDispatcher::instance();
@@ -274,6 +249,7 @@
 int BrpcCtrlInterfaceFactory::onLoad()
 {
   AmConfigReader cfg;
+  unsigned int ct_to, rx_to, tx_to;
 
   if (cfg.loadFile(AmConfig::ModConfigPath + string(MOD_NAME ".conf"))) {
     WARN("failed to read/parse config file `%s' - assuming defaults\n",
@@ -283,99 +259,53 @@
   } else {
     semsUri = cfg.getParameter(LISTEN_ADDR_PARAM, LISTEN_ADDR_DEFAULT);
     serUri = cfg.getParameter(SER_ADDR_PARAM, SER_ADDR_DEFAULT);
-  }
-  INFO(LISTEN_ADDR_PARAM ": %s.\n", semsUri.c_str());
-  INFO(SER_ADDR_PARAM ": %s.\n", serUri.c_str());
 
-  return 0;
-}
+    if (str2i(cfg.getParameter(CT_TIMEOUT_PARAM, int2str(CT_TIMEOUT)), 
+        ct_to)) {
+      ERROR("failed to read `%s' param from config file.\n", CT_TIMEOUT_PARAM);
+      return -1;
+    } else {
+      ct_timeout = ct_to * 1000;
+    }
+    if (str2i(cfg.getParameter(RX_TIMEOUT_PARAM, int2str(RX_TIMEOUT)), 
+        rx_to)) {
+      ERROR("failed to read `%s' param from config file.\n", RX_TIMEOUT_PARAM);
+      return -1;
+    } else {
+      rx_timeout = rx_to * 1000;
+    }
+    if (str2i(cfg.getParameter(TX_TIMEOUT_PARAM, int2str(TX_TIMEOUT)), 
+        tx_to)) {
+      ERROR("failed to read `%s' param from config file.\n", TX_TIMEOUT_PARAM);
+      return -1;
+    } else {
+      tx_timeout = tx_to * 1000;
+    }
 
 
-int BrpcCtrlInterface::getSerFd()
-{
-  if (0 <= serFd)
-    return serFd;
-
-  //if using local sockets, we need to bind to a local socket when sending
-  //out requests, so that SER knows where to send back replies for our
-  //requests (unlike INET layers PF_LOCAL+DGRAM based communication needs to
-  //bind.the sending socket in order to have this socket receive replies)
-  if (BRPC_ADDR_DOMAIN(&serAddr) == PF_LOCAL) {
-    //There's a race the loop tries to work around:
-    //1. mkstemp creates & opens a temp file. brpc_socket() (called
-    //later), removes the temporary file (unlink) and opens a unix 
-    //socket with the same name.
-    //So, between unlink() and socket(), some other mkstemp could
-    //"ocupy" the name.
-    int errcnt = -1;
-    do {
-      if (MAX_RETRY_ON_ERR < ++errcnt) {
-        ERROR("%dth consecutive failed attempt to create a local domain "
-          "socket for sending req - giving up.\n", errcnt);
-        return -1;
-      } else if (1 < errcnt) { //the previous brpc_socket() attempt failed
-        ERROR("failed to create BINRPC socket: %s [%d].\n", brpc_strerror(),
-          brpc_errno);
-      }
-      char buff[UNIX_PATH_MAX];
-      assert(sizeof(SND_USOCK_TEMPLATE) <= sizeof(buff));
-      memcpy(buff, SND_USOCK_TEMPLATE, sizeof(SND_USOCK_TEMPLATE));
-      int tmpfd = mkstemp(buff);
-      if (tmpfd < 0) {
-        ERROR("failed to create temporary file with template `%s': %s [%d].\n",
-            SND_USOCK_TEMPLATE, strerror(errno), errno);
-        continue;
-      } else {
-        //close the FD - only the modified buff is of worth
-        close(tmpfd);
-      }
-      sndAddr = serAddr; //copy domain, socket type
-      memcpy(BRPC_ADDR_UN(&sndAddr)->sun_path, buff, strlen(buff) + 
-        /*0-term*/1);
-      BRPC_ADDR_LEN(&sndAddr) = SUN_LEN(BRPC_ADDR_UN(&sndAddr));
-      DBG("creating temporary send socket bound to `%s'.\n", buff);
-    } while ((serFd = brpc_socket(&sndAddr, /*blocking*/false, 
-        /*named*/true)) < 0);
-    /* TODO: permission, UID/GID of the created socket */
-  } else {
-    if ((serFd = brpc_socket(&serAddr, /*blk*/false, /*named*/false)) < 0) {
-      ERROR("failed to create BINRPC socket: %s [%d].\n", brpc_strerror(),
-        brpc_errno);
+    if (str2i(cfg.getParameter(RX_WORKERS_PARAM, int2str(RX_WORKERS)), 
+        rx_workers)) {
+      ERROR("failed to read `%s' param from config file.\n", RX_WORKERS_PARAM);
       return -1;
     }
+    if (str2i(cfg.getParameter(TX_WORKERS_PARAM, int2str(TX_WORKERS)), 
+        tx_workers)) {
+      ERROR("failed to read `%s' param from config file.\n", TX_WORKERS_PARAM);
+      return -1;
+    }
   }
-  if (! brpc_connect(&serAddr, &serFd, CT_TIMEOUT)) {
-    ERROR("failed to connect to SER: %s [%d].\n", brpc_strerror(), brpc_errno);
-    close(serFd);
-    serFd = -1;
-  }
-  return serFd;
+  INFO(LISTEN_ADDR_PARAM ": %s.\n", semsUri.c_str());
+  INFO(SER_ADDR_PARAM ": %s.\n", serUri.c_str());
+  INFO(CT_TIMEOUT_PARAM ": %uus.\n", (unsigned)ct_timeout);
+  INFO(RX_TIMEOUT_PARAM ": %uus.\n", (unsigned)rx_timeout);
+  INFO(TX_TIMEOUT_PARAM ": %uus.\n", (unsigned)tx_timeout);
+  INFO(RX_WORKERS_PARAM ": %u.\n", rx_workers);
+  INFO(TX_WORKERS_PARAM ": %u.\n", tx_workers);
+
+  return 0;
 }
 
-void BrpcCtrlInterface::closeSock(int *sock, brpc_addr_t *addr)
-{
-  DBG("closing FD#%d for %s.\n", *sock, 
-    BRPC_ADDR_DOMAIN(addr) ? brpc_print_addr(addr) : "[INET/INET6 sender]");
 
-  if (*sock < 0) {
-    WARN("connection not oppened, so can not be closed.\n");
-    return;
-  }
-
-  if (close(*sock) < 0)
-    WARN("FD closed uncleanly: %s [%d].\n", strerror(errno), errno);
-  
-  if (BRPC_ADDR_DOMAIN(addr) == PF_LOCAL) {
-    if (unlink(BRPC_ADDR_UN(addr)->sun_path) < 0) {
-      ERROR("failed to remove unix socket file '%s': %s [%d].\n", 
-        BRPC_ADDR_UN(addr)->sun_path, strerror(errno), errno);
-    }
-  }
-  
-  DBG("socket %d closed.\n", *sock);
-  *sock = -1;
-}
-
 bool BrpcCtrlInterface::initCallbacks()
 {
   if (! brpc_cb_init(BRPC_CB_HASH_SIZE, /*no reply handling*/0)) {
@@ -577,7 +507,7 @@
     GOTOERR(CODE_RPC_INVALID);
 
   // check if not a bogus call
-  for (i = 0; i <  sizeof(SIP_CORE_METHODS)/sizeof(brpc_str_t *); i ++) {
+  for (i = 0; i < sizeof(SIP_CORE_METHODS)/sizeof(brpc_str_t *); i ++) {
     if ((mname->len == SIP_CORE_METHODS[i]->len) &&
         (strncmp(mname->val, SIP_CORE_METHODS[i]->val, mname->len) == 0)) {
       break;
@@ -643,19 +573,19 @@
   brpc_t *rpl = NULL;
   brpc_str_t *reason;
   brpc_int_t *code;
-  brpc_addr_t from = serAddr; //avoid a syscall to find out socket type
+  brpc_addr_t from = serConn->txAddr; //avoid a syscall to find socket type
   brpc_id_t req_id;
+  int serFd;
 
-  if (getSerFd() < 0) {
+  if ((serFd = serConn->get()) < 0) {
     ERROR("no connection to SER available.\n");
     goto end;
   }
-  assert(0 <= serFd);
 
-  if (! brpc_sendto(serFd, &serAddr, req, TX_TIMEOUT)) {
+  if (! brpc_sendto(serFd, &serConn->txAddr, req, tx_timeout)) {
     ERROR("failed to send msg to SER: %s [%d].\n", brpc_strerror(), 
       brpc_errno);
-    closeSock(&serFd, &sndAddr);
+    serConn->destroy(serFd);
     goto end;
   } else {
     req_id = req->id;
@@ -664,16 +594,17 @@
   }
   
   /* receive from queue until empty, if IDs do not match */
-  while ((rpl = brpc_recvfrom(serFd, &from, RX_TIMEOUT))) {
+  while ((rpl = brpc_recvfrom(serFd, &from, rx_timeout))) {
     if (req_id == rpl->id)
       break;
-    ERROR("received reply's ID (#%d) doesn't match request's - discarded (%d)",
-        brpc_id(rpl), req_id);
+    ERROR("received reply's ID (#%d) doesn't match request's - "
+               "discarded (%d).\n", brpc_id(rpl), req_id);
     brpc_finish(rpl);
   }
+  serConn->release(serFd);
   if (! rpl) {
-    ERROR("failed to get reply: %s [%d].\n", brpc_strerror(), brpc_errno);
-    closeSock(&serFd, &sndAddr);
+    ERROR("failed to get reply (waited max %uus): %s [%d].\n", 
+        (unsigned)rx_timeout, brpc_strerror(), brpc_errno);
     goto end;
   }
   if (brpc_is_fault(rpl)) {
@@ -733,7 +664,7 @@
   brpc_str_t listen, *reason;
   int *retcode;
 
-  listen.val = brpc_print_addr(&semsAddr);
+  listen.val = brpc_print_addr(&ctrlSrv->rxAddr);
   listen.len = strlen(listen.val);
 
   if (! ((req = brpc_req(METH_SER_RESYNC, random())) && 
@@ -781,29 +712,6 @@
     brpc_finish(rpl);
 }
 
-void BrpcCtrlInterface::_run()
-{
-  brpc_addr_t from;
-  brpc_t *req, *rpl;
-
-  DBG("Running BrpcCtrlInterface thread.\n");
-  while (! is_stopped()) {
-    from = semsAddr; // avoid a syscall to find out socket type
-    if (! (req = brpc_recvfrom(semsFd, &from, RX_TIMEOUT)))
-      continue;
-    //unsafe
-    DBG("received BINRPC request `%.*s'.\n", BRPC_STR_FMT(brpc_method(req)));
-    if ((rpl = brpc_cb_run(req))) {
-      if (! brpc_sendto(semsFd, &from, rpl, TX_TIMEOUT)) {
-        ERROR("failed to send reply to BINRPC request: %s [%d].\n",
-          brpc_strerror(), brpc_errno);
-      }
-      brpc_finish(rpl);
-    }
-    brpc_finish(req);
-  }
-}
-
 void BrpcCtrlInterface::run()
 {
   if (! sipDispatcher) {
@@ -819,9 +727,15 @@
   if(rpcCheck())
     serResync();
 
-  _run();
+  ctrlSrv->start();
+  ctrlSrv->join();
 }
 
+void BrpcCtrlInterface::on_stop()
+{
+  ctrlSrv->stop();
+}
+
 static inline enum RPC_ERR_CODE read_unsigned(string &u_str, 
     unsigned int &u_int)
 {
@@ -873,17 +787,15 @@
     &amReq.to_tag,
     &cseq_str,
     &amReq.route,
-    &amReq.next_hop,
+    &amReq.content_type,
     &amReq.body,
     &amReq.cmd,
     &amReq.hdrs
   };
   brpc_str_t *cstr_refs[sizeof(strRef)/sizeof(string *)];
 
-#ifndef NDEBUG
   assert(sizeof(strRef)/sizeof(string *) - /*implicit TID*/1 == 
     sizeof(REQ_FMTS)/sizeof(brpc_str_t *));
-#endif
 
   memset(fmt, 's', sizeof(fmt)/sizeof(char) - 1);
   fmt[0] = '!'; /* lay the refs in array */
@@ -926,11 +838,11 @@
     &code_str,
     &amRpl.reason,
     &amRpl.next_request_uri,
-    &amRpl.next_hop,
     &amRpl.route,
     &amRpl.local_tag,
     &amRpl.remote_tag,
     &cseq_str,
+    &amRpl.content_type,
     &amRpl.hdrs,
     &amRpl.body
   };
@@ -1002,18 +914,20 @@
   int mtype;
   AmSipRequest amReq;
   AmSipReply amRpl;
-  BrpcCtrlInterface *iface = (BrpcCtrlInterface *)_iface;
+  //BrpcCtrlInterface *iface = (BrpcCtrlInterface *)_iface;
 
   switch ((mtype = get_sipmeth_type(req))) {
     case SIP_METH_REQ:
       if ((errcode = sip_req_handler(req, amReq)) == CODE_RPC_SUCCESS)
-        iface->handleSipMsg(amReq);
+        //iface->handleSipMsg(amReq);
+        AmSipDispatcher::instance()->handleSipMsg(amReq);
       break;
 
     case SIP_METH_FIN:
     case SIP_METH_PRV:
       if ((errcode = sip_fin_handler(req, amRpl)) == CODE_RPC_SUCCESS)
-        iface->handleSipMsg(amRpl);
+        //iface->handleSipMsg(amRpl);
+        AmSipDispatcher::instance()->handleSipMsg(amRpl);
       break;
 
     case SIP_METH_NONE: 
@@ -1047,13 +961,69 @@
   return req;
 }
 
+/**
+ * Extract the bodies of multiple Route headers.
+ */
+static inline string rtset_body(const string &rthdr)
+{
+  string rtset;
+  const char *pos, *end;
+  const char *rt_start;
+  int eoh_len;
+
+  for (pos = rthdr.c_str(), end = pos + rthdr.length(); pos < end; ) {
+    // if starts with `Route:', skip it (can start with WS, if multiline body
+    if (((signed)SIP_HDR_LEN(SIP_HDR_COL(SIP_HDR_ROUTE)) < end - pos) ||
+       (memcmp(pos, SIP_HDR_COL(SIP_HDR_ROUTE), 
+        SIP_HDR_LEN(SIP_HDR_COL(SIP_HDR_ROUTE))) == 0))
+      pos += SIP_HDR_LEN(SIP_HDR_COL(SIP_HDR_ROUTE));
+
+    // skip leading WS
+    while (pos < end) {
+      switch (*pos) {
+        case ' ':
+        case '\n':
+          pos ++;
+          continue;
+      }
+      break;
+    }
+    // mark begining of route body
+    rt_start = pos;
+
+    /* find end of route body */
+    eoh_len = 0;
+    for ( ; pos < end; pos ++)
+      if (*pos == '\r') {
+        eoh_len ++;
+        break;
+      }
+    if (pos < end)
+      pos ++;
+    if (pos < end && *pos == '\n') {
+      eoh_len ++;
+      pos ++;
+    }
+
+    // roll back over the existing `,' of a multiline route set
+    if (eoh_len && pos[-eoh_len] == ',')
+      eoh_len ++;
+
+    if (rt_start < pos - eoh_len) {
+      if (! rtset.empty())
+        rtset += ", ";
+      rtset += string(rt_start, &pos[-eoh_len] - rt_start);
+    }
+  }
+
+  return rtset;
+}
+
 #define XTRA_HDRS(_xhdrs, _msg)                                                
\
   string _xhdrs;                                                       \
-  if (_msg.route.length())                                             \
-    _xhdrs += _msg.route;                                              \
-  if (_msg.contact.length())                                           \
+  if (! _msg.contact.empty())                                          \
     _xhdrs += _msg.contact;                                            \
-  if (_msg.content_type.length())                                      \
+  if (! _msg.content_type.empty())                                     \
     _xhdrs += SIP_HDR_COLSP(SIP_HDR_CONTENT_TYPE) + _msg.content_type + CRLF;\
   _xhdrs += _msg.hdrs;
 
@@ -1075,11 +1045,34 @@
   STR2BSTR(_from, amReq.from);
   STR2BSTR(_to, amReq.to);
   STR2BSTR(_callid, amReq.callid);
-  STR2BSTR(_next_hop, amReq.next_hop);
   STR2BSTR(_hdrs, xtraHdrs);
   STR2BSTR(_body, amReq.body);
   STR2BSTR(_empty, string(""));
 
+  string rtset;
+  try {
+    rtset = rtset_body(amReq.route);
+  } catch (string emsg) {
+    ERROR("failed to parse route set headers: %s.\n", rtset.c_str());
+    return NULL;
+  }
+
+  int rtflag;
+  if (! amReq.next_hop.empty()) {
+    string nhop;
+    if (amReq.next_hop.c_str()[0] != '<')
+      nhop = "<" + amReq.next_hop + ">";
+    else
+      nhop = amReq.next_hop;
+
+    rtset = nhop + rtset;
+    rtflag = SIPREQ_DEL_1ST_FLG;
+  } else {
+    rtflag = 0;
+  }
+
+  STR2BSTR(_rtset, rtset);
+
 #define STRIP_HF_NAME(_bstr_, _hf_name, _hf_name_len)  \
   do {  \
     if ((_hf_name_len < (_bstr_)->len) && \
@@ -1100,14 +1093,15 @@
 
   if (! brpc_asm(req, REQUEST_FMT_REQ,
       as_id,
-      SIP_REQ_FIN_FLG|SIP_REQ_PRV_FLG|SIP_REQ_RUN_ORR, // FIXME: parameterized
+      // FIXME: parameterized
+      SIPREQ_GET_FIN_FLG|SIPREQ_GET_PRV_FLG|SIPREQ_RUN_ORR_FLG|rtflag,
       &_method,
       &_r_uri,
       &_from, // FIXME: only HF value; MUST have tag (check)
       &_to, // FIXME: only HF value (no "To: " included) (check)
       amReq.cseq,
       &_callid,
-      &_next_hop,
+      &_rtset,
       &_hdrs,
       &_body,
       &_empty // FIXME: "use the power!"
@@ -1229,8 +1223,14 @@
     goto end;
   }
   if (300 <= *retcode) {
-    ERROR("RPC request failed with code: %d, status: '%.*s'.\n", *retcode,
-        /*misleading var. name!*/BRPC_STR_FMT(ser_opaque));
+#if 0
+    ERROR("RPC request failed (code: %d, status: '%.*s') for reply: %s\n", 
+        *retcode, /*misleading var. name!*/BRPC_STR_FMT(ser_opaque),
+        ((AmSipReply)amRpl).print().c_str());
+#else
+    ERROR("RPC request failed (code: %d, status: '%.*s') for reply.\n", 
+        *retcode, /*misleading var. name!*/BRPC_STR_FMT(ser_opaque));
+#endif
     goto end;
   }
 
@@ -1250,7 +1250,7 @@
 {
   string localUri;
 
-  if (displayName.length()) {
+  if (! displayName.empty()) {
     // quoting is safer (the check for quote need doesn't really pay off)
     if (displayName.c_str()[0] == '"') {
       assert(displayName.c_str()[displayName.length() - 1] == '"');
@@ -1265,17 +1265,17 @@
 
   // angular brackets not always needed (unless contact)
   localUri += "<";
-  if (hostName.length()) {
+  if (! hostName.empty()) {
     localUri += SIP_SCHEME_SIP; //TODO: sips|tel|tels
     localUri += ":";
-    if (userName.length()) {
+    if (! userName.empty()) {
       localUri += userName;
       localUri += "@";
     }
     localUri += hostName;
   } else {
     // SER will substituite the markers below
-    if (userName.length()) {
+    if (! userName.empty()) {
       localUri += char(STX);
       localUri += userName;
       localUri += char(ETX);
@@ -1284,14 +1284,14 @@
     }
   }
 
-  if (uriParams.length()) {
+  if (! uriParams.empty()) {
     if (uriParams.c_str()[0] != ';')
       localUri += ';';
     localUri += uriParams;
   }
   localUri += ">";
 
-  if (hdrParams.length()) {
+  if (! hdrParams.empty()) {
     if (hdrParams.c_str()[0] != ';')
       localUri += ';';
     localUri += hdrParams;

Modified: trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.h
===================================================================
--- trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.h   2008-08-24 17:07:26 UTC 
(rev 1075)
+++ trunk/core/plug-in/binrpcctrl/BrpcCtrlInterface.h   2008-08-25 15:59:24 UTC 
(rev 1076)
@@ -7,7 +7,13 @@
 
 #include "AmApi.h"
 #include "AmSipDispatcher.h"
+#include "ConnPool.h"
+#include "CtrlServer.h"
 
+
+#define ASI_VERSION         0x2
+
+
 class BrpcCtrlInterfaceFactory : public AmCtrlInterfaceFactory
 {
     string semsUri, serUri;
@@ -30,22 +36,17 @@
     // handler of requests (SIP request | reply) received from SER
     AmSipDispatcher *sipDispatcher;
 
-    //addresses for:
-    //- SEMS listening for SER requests
-    //- SER listening for SEMS requests
-    //- SEMS SeNDing requests to SER, when using PF_LOCAL sockets (might not
-    //be used) 
-    brpc_addr_t semsAddr, serAddr, sndAddr;
-    int semsFd, serFd;
+    ConnPool *serConn;
+    CtrlServer *ctrlSrv;
 
     inline void handleSipMsg(AmSipRequest &req)
     {
-       AmSipDispatcher::instance()->handleSipMsg(req);
+      AmSipDispatcher::instance()->handleSipMsg(req);
     }
 
     inline void handleSipMsg(AmSipReply &rpl)
     {
-       AmSipDispatcher::instance()->handleSipMsg(rpl);
+      AmSipDispatcher::instance()->handleSipMsg(rpl);
     }
 
     brpc_t *rpcExecute(brpc_t *req);
@@ -57,12 +58,7 @@
     static brpc_t *digests(brpc_t *req, void *iface);
     static brpc_t *req_handler(brpc_t *req, void *iface);
 
-    int getSerFd();
-    void closeSerConn();
-    static void closeSock(int *sock, brpc_addr_t *addr);
-
     bool initCallbacks();
-    void _run();
 
  public:
     BrpcCtrlInterface();
@@ -72,7 +68,7 @@
 
     // AmThread
     void run();
-    void on_stop() {}
+    void on_stop();
 
     // AmCtrlInterface
     int send(const AmSipRequest &, char *, unsigned int &);

Added: trunk/core/plug-in/binrpcctrl/ConnPool.cpp
===================================================================
--- trunk/core/plug-in/binrpcctrl/ConnPool.cpp  2008-08-24 17:07:26 UTC (rev 
1075)
+++ trunk/core/plug-in/binrpcctrl/ConnPool.cpp  2008-08-25 15:59:24 UTC (rev 
1076)
@@ -0,0 +1,232 @@
+
+#include <stdlib.h>
+#include <errno.h>
+#include <assert.h>
+#include <binrpc.h>
+
+#include "log.h"
+#include "ConnPool.h"
+
+
+#define SND_USOCK_TEMPLATE  "/tmp/sems_send_sock_XXXXXX" //TODO: configurable
+#define MAX_RETRY_ON_ERR    5
+
+#ifndef UNIX_PATH_MAX
+#include <sys/un.h>
+#define UNIX_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
+#endif
+
+
+// The locks are of type 'DEFAULT', non recursive -> should not fail.
+#define LOCK \
+  if (pthread_mutex_lock(&mutex) != 0) { \
+    ERROR("CRITICAL: failed to lock mutex: %s [%d].\n", strerror(errno), \
+        errno); \
+    abort(); \
+  }
+#define UNLOCK \
+  if (pthread_mutex_unlock(&mutex) != 0) { \
+    ERROR("CRITICAL: failed to unlock mutex: %s [%d].\n", strerror(errno), \
+        errno); \
+    abort(); \
+  }
+#define WAIT \
+  if (pthread_cond_wait(&cond, &mutex) != 0) { \
+    ERROR("CRITICAL: failed to wait on condition: %s [%d].\n", \
+        strerror(errno), errno); \
+    abort(); \
+  }
+#define WAKEUP \
+  if (pthread_cond_signal(&cond) != 0) { \
+    ERROR("CRITICAL: failed to signal on cond"); \
+    abort(); \
+  }
+#define SAFE(instr_set) \
+  do { \
+    LOCK; \
+    instr_set; \
+    UNLOCK; \
+  } while (0)
+
+ConnPool::ConnPool(const string &target, unsigned size, brpc_tv_t ct_to) :
+  cap(size),
+  size(0),
+  ct_timeout(ct_to),
+  onwait(0)
+{
+  brpc_addr_t *addr;
+  if (! (addr = brpc_parse_uri(target.c_str())))
+    throw "failed to parse BINRPC URI `" + target + "': " + 
+        string(brpc_strerror()) + ".";
+  else
+    txAddr = *addr;
+
+  if (pthread_mutex_init(&mutex, 0) != 0)
+    throw "failed to init mutex";
+  if (pthread_cond_init(&cond, 0) != 0)
+    throw "failed to init wait condition";
+}
+
+ConnPool::~ConnPool()
+{
+  int fd;
+
+  cap = 0; // prevent making new connections;
+  while (size) {
+    if (0 <= (fd = get())) {
+      destroy(fd);
+    } else {
+      ERROR("failed to destroy all connections (%s [%d]).\n", 
+          strerror(errno), errno);
+      break;
+    }
+  }
+
+  pthread_mutex_destroy(&mutex);
+  pthread_cond_destroy(&cond);
+}
+
+int ConnPool::get()
+{
+  int fd;
+
+  LOCK;
+
+  while (fdStack.empty()) {
+    if (size < cap) {
+      size ++; // inc it now, so that the cap is enforced
+      UNLOCK;
+      return new_conn();
+    } else {
+      onwait ++;
+      INFO("%dth worker asking for connectio, put on wait. "
+          "Need more capacity? (current: %d)\n", onwait, cap);
+      WAIT;
+      onwait --;
+    }
+  }
+  fd = fdStack.top();
+  fdStack.pop();
+
+  UNLOCK;
+
+  DBG("connection FD#%d aquired.\n", fd);
+  return fd;
+}
+
+void ConnPool::release(int fd)
+{
+  assert(0 <= fd);
+
+  LOCK;
+  if (onwait && fdStack.empty())
+    WAKEUP;
+  fdStack.push(fd);
+  UNLOCK;
+  DBG("connection FD#%d released.\n", fd);
+}
+
+void ConnPool::destroy(int fd)
+{
+  brpc_addr_t addr;
+
+  assert(0 <= fd);
+
+  LOCK;
+  if (BRPC_ADDR_DOMAIN(&txAddr) == PF_LOCAL) {
+    addr = locAddrMap[fd];
+    locAddrMap.erase(fd);
+  } else {
+    BRPC_ADDR_DOMAIN(&addr) = 0;
+  }
+  size --;
+  UNLOCK;
+
+  if (BRPC_ADDR_DOMAIN(&addr)) {
+    INFO("closing FD#%d for %s.\n", fd, brpc_print_addr(&addr));
+    if (unlink(BRPC_ADDR_UN(&addr)->sun_path) < 0) {
+      ERROR("failed to remove unix socket file '%s': %s [%d].\n", 
+        BRPC_ADDR_UN(&addr)->sun_path, strerror(errno), errno);
+    }
+  } else {
+    INFO("closing FD#%d for %s.\n", fd, brpc_print_addr(&txAddr));
+  }
+
+  if (close(fd) < 0)
+    ERROR("FD %d closed uncleanly: %s [%d].\n", fd, strerror(errno), errno);
+
+  DBG("connection FD#%d destroyied.\n", fd);
+}
+
+int ConnPool::new_conn()
+{
+  int fd;
+  brpc_addr_t locAddr;
+
+  //if using local sockets, we need to bind to a local socket when sending
+  //out requests, so that SER knows where to send back replies for our
+  //requests (unlike INET layers PF_LOCAL+DGRAM based communication needs to
+  //bind.the sending socket in order to have this socket receive replies)
+  if (BRPC_ADDR_DOMAIN(&txAddr) == PF_LOCAL) {
+    //There's a race the loop tries to work around:
+    //1. mkstemp creates & opens a temp file. brpc_socket() (called
+    //later), removes the temporary file (unlink) and opens a unix 
+    //socket with the same name.
+    //So, between unlink() and socket(), some other mkstemp could
+    //"ocupy" the name.
+    int errcnt = -1;
+    do {
+      if (MAX_RETRY_ON_ERR < ++errcnt) {
+        ERROR("%dth consecutive failed attempt to create a local domain "
+          "socket for sending req - giving up.\n", errcnt);
+        fd = -1;
+        goto end;
+      } else if (1 < errcnt) { //the previous brpc_socket() attempt failed
+        ERROR("failed to create BINRPC socket: %s [%d].\n", brpc_strerror(),
+          brpc_errno);
+      }
+
+      char buff[UNIX_PATH_MAX];
+      assert(sizeof(SND_USOCK_TEMPLATE) <= sizeof(buff));
+      memcpy(buff, SND_USOCK_TEMPLATE, sizeof(SND_USOCK_TEMPLATE));
+      int tmpFd = mkstemp(buff);
+      if (tmpFd < 0) {
+        ERROR("failed to create temporary file with template `%s': %s [%d].\n",
+            SND_USOCK_TEMPLATE, strerror(errno), errno);
+        continue;
+      } else {
+        close(tmpFd); // close the FD - only the modified buff is of worth
+      }
+      locAddr = txAddr; //copy domain, socket type
+      memcpy(BRPC_ADDR_UN(&locAddr)->sun_path, buff, strlen(buff)+/*0-term*/1);
+      BRPC_ADDR_LEN(&locAddr) = SUN_LEN(BRPC_ADDR_UN(&locAddr));
+      DBG("creating temporary send socket bound to `%s'.\n", buff);
+    } while ((fd = brpc_socket(&locAddr, /*blocking*/false, 
+        /*named*/true)) < 0);
+    /* TODO: permission, UID/GID of the created socket */
+  } else {
+    if ((fd = brpc_socket(&txAddr, /*blk*/false, /*named*/false)) < 0) {
+      ERROR("failed to create BINRPC socket: %s [%d].\n", brpc_strerror(),
+        brpc_errno);
+      fd = -1;
+      goto end;
+    }
+  }
+  if (! brpc_connect(&txAddr, &fd, ct_timeout)) {
+    ERROR("failed to connect to SER: %s [%d].\n", brpc_strerror(), brpc_errno);
+    close(fd);
+    fd = -1;
+  }
+
+end:
+  if (fd < 0) {
+    SAFE(size --);
+  } else {
+    if (BRPC_ADDR_DOMAIN(&txAddr) == PF_LOCAL) {
+      // connect succeeded -> do the mapping
+      SAFE(locAddrMap[fd] = locAddr);
+    }
+    DBG("connection FD#%d created.\n", fd);
+  }
+  return fd;
+}

Added: trunk/core/plug-in/binrpcctrl/ConnPool.h
===================================================================
--- trunk/core/plug-in/binrpcctrl/ConnPool.h    2008-08-24 17:07:26 UTC (rev 
1075)
+++ trunk/core/plug-in/binrpcctrl/ConnPool.h    2008-08-25 15:59:24 UTC (rev 
1076)
@@ -0,0 +1,38 @@
+#ifndef __CONNPOOL_H__
+#define __CONNPOOL_H__
+
+#include <pthread.h>
+#include <string>
+#include <map>
+#include <stack>
+#include <binrpc.h>
+
+using namespace std;
+using std::map;
+using std::stack;
+
+class ConnPool
+{
+    int cap; // ..acity: maximum size of the pool
+    stack<int> fdStack; // array with socket file descriptors
+    int size; // active connections count; =fdStack.size() + N x get()'s
+    map<int, brpc_addr_t> locAddrMap; // used for PF_LOCAL connections
+    pthread_mutex_t mutex; // protects access to stack with FDs
+    pthread_cond_t cond; // wait when pool empty and capacity reached
+    int onwait; // how many workers are waiting on cond.
+    brpc_tv_t ct_timeout;
+
+    int new_conn();
+
+  public:
+    ConnPool(const string &target, unsigned size, brpc_tv_t ct_timeout);
+    ~ConnPool();
+
+    brpc_addr_t txAddr; // binRPC address of SER;
+
+    int get();
+    void release(int fd);
+    void destroy(int fd);
+};
+
+#endif /* __CONNPOOL_H__ */

Added: trunk/core/plug-in/binrpcctrl/CtrlServer.cpp
===================================================================
--- trunk/core/plug-in/binrpcctrl/CtrlServer.cpp        2008-08-24 17:07:26 UTC 
(rev 1075)
+++ trunk/core/plug-in/binrpcctrl/CtrlServer.cpp        2008-08-25 15:59:24 UTC 
(rev 1076)
@@ -0,0 +1,117 @@
+
+#include "AmUtils.h"
+#include "log.h"
+#include "CtrlServer.h"
+
+CtrlServer::CtrlServer(const string &listen, unsigned listeners,
+    brpc_tv_t rx_timeout, brpc_tv_t tx_timeout) : 
+  wcnt(listeners)
+{
+  brpc_addr_t *addr;
+
+  if (! (addr = brpc_parse_uri(listen.c_str()))) {
+    throw "failed to parse BINRPC URI `" + listen + "': " + 
+        string(brpc_strerror()) + " [" + int2str(brpc_errno) + "]";
+  } else if (BRPC_ADDR_TYPE(addr) != SOCK_DGRAM) {
+    //b/c we'd have to do connection management otherwise (listen for
+    //connections, poll for activity on each descriptor etc); for now, not
+    //relly needed and the impl. is much easier.
+    throw "only datagram listeners supported";
+  } else {
+    rxAddr = *addr;
+  }
+
+  if ((rxFd = brpc_socket(addr, /*blocking*/false, /*bind*/true)) < 0) 
+    throw "failed to get listen socket for URI `" + listen + "': " + 
+        string(brpc_strerror()) + " [" + int2str(brpc_errno) + "].\n";
+
+  workers = new CtrlWorker[wcnt]();
+
+  for (unsigned i = 0; i < wcnt; i ++)
+    workers[i].init(rxFd, rxAddr, rx_timeout, tx_timeout);
+}
+
+CtrlServer::~CtrlServer()
+{
+  INFO("closing SEMS listener FD#%d for %s.\n", rxFd, 
+      brpc_print_addr(&rxAddr));
+
+  if (close(rxFd) < 0)
+    ERROR("CtrlServer server socket#%d closed uncleanly: %s [%d].\n", rxFd, 
+        strerror(errno), errno);
+  
+  if (BRPC_ADDR_DOMAIN(&rxAddr) == PF_LOCAL) {
+    if (unlink(BRPC_ADDR_UN(&rxAddr)->sun_path) < 0) {
+      ERROR("failed to remove unix socket file '%s': %s [%d].\n", 
+        BRPC_ADDR_UN(&rxAddr)->sun_path, strerror(errno), errno);
+    }
+  }
+  delete []workers;
+}
+
+void CtrlServer::start()
+{
+  for (unsigned i = 0; i < wcnt; i ++)
+    workers[i].start();
+  INFO("CtrlServer started.\n");
+}
+
+void CtrlServer::stop()
+{
+  INFO("CtrlServer stopping.\n");
+  for (unsigned i = 0; i < wcnt; i ++)
+    workers[i].stop();
+}
+
+void CtrlServer::join()
+{
+  for (unsigned i = 0; i < wcnt; i ++)
+    workers[i].join();
+  INFO("CtrlServer stopped.\n");
+}
+
+
+CtrlWorker::CtrlWorker() :
+  rxFd(-1)
+{}
+
+void CtrlWorker::init(int rxFd, brpc_addr_t rxAddr, 
+    brpc_tv_t rx_timeout, brpc_tv_t tx_timeout)
+{
+  this->rxFd = rxFd;
+  this->rxAddr = rxAddr;
+  this->rx_timeout = rx_timeout;
+  this->tx_timeout = tx_timeout;
+}
+
+void CtrlWorker::run()
+{
+  brpc_addr_t from;
+  brpc_t *req, *rpl;
+
+  INFO("CtrlServer worker #%lx started.\n", pthread_self());
+  running = 1;
+
+  do {
+    from = rxAddr; // avoid a syscall to find out socket type
+    if (! (req = brpc_recvfrom(rxFd, &from, rx_timeout)))
+      continue;
+    //unsafe
+    DBG("received BINRPC request `%.*s'.\n", BRPC_STR_FMT(brpc_method(req)));
+    if ((rpl = brpc_cb_run(req))) {
+      if (! brpc_sendto(rxFd, &from, rpl, tx_timeout)) {
+        ERROR("failed to send reply to BINRPC request: %s [%d].\n",
+          brpc_strerror(), brpc_errno);
+      }
+      brpc_finish(rpl);
+    }
+    brpc_finish(req);
+  } while (running);
+  
+  INFO("CtrlServer worker #%lx stopped.\n", pthread_self());
+}
+
+void CtrlWorker::on_stop()
+{
+  running = 0;
+}

Added: trunk/core/plug-in/binrpcctrl/CtrlServer.h
===================================================================
--- trunk/core/plug-in/binrpcctrl/CtrlServer.h  2008-08-24 17:07:26 UTC (rev 
1075)
+++ trunk/core/plug-in/binrpcctrl/CtrlServer.h  2008-08-25 15:59:24 UTC (rev 
1076)
@@ -0,0 +1,48 @@
+#ifndef __CTRLSERVER_H__
+#define __CTRLSERVER_H__
+
+#include <string>
+#include <binrpc.h>
+#include "AmThread.h"
+
+using namespace std;
+
+class CtrlWorker : public AmThread
+{
+    volatile int running; //don't like the shared var of AmThread (locking)
+    brpc_tv_t tx_timeout, rx_timeout;
+    int rxFd;
+    brpc_addr_t rxAddr;
+
+    void run();
+    void on_stop();
+
+  public:
+    CtrlWorker();
+
+    void init(int rxFd, brpc_addr_t rxAddr, 
+        brpc_tv_t rx_timeout, brpc_tv_t tx_timeout);
+};
+
+/**
+ * This is just a proxy class, handling multiple receiver threads
+ */
+class CtrlServer
+{
+    int rxFd;
+    CtrlWorker *workers;
+    unsigned wcnt;
+
+  public:
+    CtrlServer(const string &listen, unsigned listeners, 
+        brpc_tv_t rx_timeout, brpc_tv_t tx_timeout);
+    ~CtrlServer();
+
+    brpc_addr_t rxAddr;
+
+    void start();
+    void stop();
+    void join();
+};
+
+#endif /* __CTRLSERVER_H__ */

_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev

Reply via email to