On Tue, 2012-07-24 at 22:34 +0000, Hefty, Sean wrote:
> Provide limited support for applications that call fork().  To
> handle fork(), we establish connections using normal sockets.
> The socket is later converted to an rsocket when the user
> makes the first call to a data transfer function (e.g. send,
> recv, read, write, etc.).
> 
> Fork support is indicated by setting the environment variable
> RDMAV_FORK_SAFE = 1.  When set, the preload library will delay
> converting to an rsocket until the user attempts to send or receive
> data on the socket.  To convert from a normal socket to an
> rsocket, the preload library must inject a message on the
> normal socket to synchronize between the client and server.  As
> a result, if the rsocket connection fails, the ability to
> silently fall back to the normal socket may be compromised.  Fork
> support is disabled by default.
> 
> The current implementation works for simple test apps under
> ideal conditions.  Although it supports nonblocking sockets, it
> uses blocking rsockets when migrating connections.
> 

Sean,

Have you tried this with netperf?
Can we now run a single netserver and have multiple netperf clients
talking to the forked netserver child processes in parallel?

Thanks
Sridhar

> Signed-off-by: Sean Hefty <[email protected]>
> ---
>  src/preload.c |  183 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
>  1 files changed, 166 insertions(+), 17 deletions(-)
> 
> diff --git a/src/preload.c b/src/preload.c
> index d2058e2..e88c958 100644
> --- a/src/preload.c
> +++ b/src/preload.c
> @@ -46,6 +46,8 @@
>  #include <string.h>
>  #include <netinet/in.h>
>  #include <netinet/tcp.h>
> +#include <unistd.h>
> +#include <semaphore.h>
> 
>  #include <rdma/rdma_cma.h>
>  #include <rdma/rdma_verbs.h>
> @@ -92,10 +94,12 @@ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
>  static int sq_size;
>  static int rq_size;
>  static int sq_inline;
> +static int fork_support;
> 
>  enum fd_type {
>       fd_normal,
> -     fd_rsocket
> +     fd_rsocket,
> +     fd_fork
>  };
> 
>  struct fd_info {
> @@ -207,6 +211,10 @@ void getenv_options(void)
>       var = getenv("RS_INLINE");
>       if (var)
>               sq_inline = atoi(var);
> +
> +     var = getenv("RDMAV_FORK_SAFE");
> +     if (var)
> +             fork_support = atoi(var);
>  }
> 
>  static void init_preload(void)
> @@ -378,8 +386,16 @@ int socket(int domain, int type, int protocol)
>       ret = rsocket(domain, type, protocol);
>       recursive = 0;
>       if (ret >= 0) {
> -             fd_store(index, ret, fd_rsocket);
> -             set_rsocket_options(ret);
> +             if (fork_support) {
> +                     rclose(ret);
> +                     ret = real.socket(domain, type, protocol);
> +                     if (ret < 0)
> +                             return ret;
> +                     fd_store(index, ret, fd_fork);
> +             } else {
> +                     fd_store(index, ret, fd_rsocket);
> +                     set_rsocket_options(ret);
> +             }
>               return index;
>       }
>       fd_close(index, &ret);
> @@ -418,31 +434,161 @@ int listen(int socket, int backlog)
>  int accept(int socket, struct sockaddr *addr, socklen_t *addrlen)
>  {
>       int fd, index, ret;
> +     enum fd_type type;
> 
> -     if (fd_get(socket, &fd) == fd_rsocket) {
> +     type = fd_get(socket, &fd);
> +     if (type == fd_rsocket || type == fd_fork) {
>               index = fd_open();
>               if (index < 0)
>                       return index;
> 
> -             ret = raccept(fd, addr, addrlen);
> +             ret = (type == fd_rsocket) ? raccept(fd, addr, addrlen) :
> +                                          real.accept(fd, addr, addrlen);
>               if (ret < 0) {
>                       fd_close(index, &fd);
>                       return ret;
>               }
> 
> -             fd_store(index, ret, fd_rsocket);
> +             fd_store(index, ret, type);
>               return index;
>       } else {
>               return real.accept(fd, addr, addrlen);
>       }
>  }
> 
> +/*
> + * We can't fork RDMA connections and pass them from the parent to the child
> + * process.  Instead, we need to establish the RDMA connection after calling
> + * fork.  To do this, we delay establishing the RDMA connection until we try
> + * to send/receive on the server side.  On the client side, we don't expect
> + * to fork, so we switch from a TCP connection to an rsocket when connecting.
> + */
> +static int fork_active(int socket, const struct sockaddr *addr, socklen_t addrlen)
> +{
> +     int fd, ret;
> +     uint32_t msg;
> +     long flags;
> +
> +     fd = fd_getd(socket);
> +     flags = real.fcntl(fd, F_GETFL);
> +     real.fcntl(fd, F_SETFL, 0);
> +     ret = real.connect(fd, addr, addrlen);
> +     if (ret)
> +             return ret;
> +
> +     ret = real.recv(fd, &msg, sizeof msg, MSG_PEEK);
> +     if ((ret != sizeof msg) || msg) {
> +             fd_store(socket, fd, fd_normal);
> +             return 0;
> +     }
> +
> +     real.fcntl(fd, F_SETFL, flags);
> +     ret = transpose_socket(socket, fd_rsocket);
> +     if (ret < 0)
> +             return ret;
> +
> +     real.close(fd);
> +     return rconnect(ret, addr, addrlen);
> +}
> +
> +static void fork_passive(int socket)
> +{
> +     struct sockaddr_in6 sin6;
> +     sem_t *sem;
> +     int lfd, sfd, dfd, ret, param;
> +     socklen_t len;
> +     uint32_t msg;
> +
> +     fd_get(socket, &sfd);
> +
> +     len = sizeof sin6;
> +     ret = real.getsockname(sfd, (struct sockaddr *) &sin6, &len);
> +     if (ret)
> +             goto out;
> +     sin6.sin6_flowinfo = sin6.sin6_scope_id = 0;
> +     memset(&sin6.sin6_addr, 0, sizeof sin6.sin6_addr);
> +
> +     sem = sem_open("/rsocket_fork", O_CREAT | O_RDWR,
> +                    S_IRWXU | S_IRWXG, 1);
> +     if (sem == SEM_FAILED) {
> +             ret = -1;
> +             goto out;
> +     }
> +
> +     lfd = rsocket(sin6.sin6_family, SOCK_STREAM, 0);
> +     if (lfd < 0) {
> +             ret  = lfd;
> +             goto sclose;
> +     }
> +
> +     param = 1;
> +     rsetsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &param, sizeof param);
> +
> +     sem_wait(sem);
> +     ret = rbind(lfd, (struct sockaddr *) &sin6, sizeof sin6);
> +     if (ret)
> +             goto lclose;
> +
> +     ret = rlisten(lfd, 1);
> +     if (ret)
> +             goto lclose;
> +
> +     msg = 0;
> +     len = real.write(sfd, &msg, sizeof msg);
> +     if (len != sizeof msg)
> +             goto lclose;
> +
> +     dfd = raccept(lfd, NULL, NULL);
> +     if (dfd < 0) {
> +             ret  = dfd;
> +             goto lclose;
> +     }
> +
> +     param = 1;
> +     rsetsockopt(dfd, IPPROTO_TCP, TCP_NODELAY, &param, sizeof param);
> +     set_rsocket_options(dfd);
> +
> +     copysockopts(dfd, sfd, &rs, &real);
> +     real.shutdown(sfd, SHUT_RDWR);
> +     real.close(sfd);
> +     fd_store(socket, dfd, fd_rsocket);
> +
> +lclose:
> +     rclose(lfd);
> +     sem_post(sem);
> +sclose:
> +     sem_close(sem);
> +out:
> +     if (ret)
> +             fd_store(socket, sfd, fd_normal);
> +}
> +
> +static inline enum fd_type fd_fork_get(int index, int *fd)
> +{
> +     struct fd_info *fdi;
> +
> +     fdi = idm_lookup(&idm, index);
> +     if (fdi) {
> +             if (fdi->type == fd_fork)
> +                     fork_passive(index);
> +             *fd = fdi->fd;
> +             return fdi->type;
> +
> +     } else {
> +             *fd = index;
> +             return fd_normal;
> +     }
> +}
> +
>  int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
>  {
>       struct sockaddr_in *sin;
>       int fd, ret;
> 
> -     if (fd_get(socket, &fd) == fd_rsocket) {
> +     switch (fd_get(socket, &fd)) {
> +     case fd_fork:
> +             return fork_active(socket, addr, addrlen);
> +     case fd_rsocket:
>               sin = (struct sockaddr_in *) addr;
>               if (ntohs(sin->sin_port) > 1024) {
>                       ret = rconnect(fd, addr, addrlen);
> @@ -456,6 +602,9 @@ int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
> 
>               rclose(fd);
>               fd = ret;
> +             break;
> +     default:
> +             break;
>       }
> 
>       return real.connect(fd, addr, addrlen);
> @@ -464,7 +613,7 @@ int connect(int socket, const struct sockaddr *addr, socklen_t addrlen)
>  ssize_t recv(int socket, void *buf, size_t len, int flags)
>  {
>       int fd;
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rrecv(fd, buf, len, flags) : real.recv(fd, buf, len, flags);
>  }
> 
> @@ -472,7 +621,7 @@ ssize_t recvfrom(int socket, void *buf, size_t len, int flags,
>                struct sockaddr *src_addr, socklen_t *addrlen)
>  {
>       int fd;
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rrecvfrom(fd, buf, len, flags, src_addr, addrlen) :
>               real.recvfrom(fd, buf, len, flags, src_addr, addrlen);
>  }
> @@ -480,7 +629,7 @@ ssize_t recvfrom(int socket, void *buf, size_t len, int flags,
>  ssize_t recvmsg(int socket, struct msghdr *msg, int flags)
>  {
>       int fd;
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rrecvmsg(fd, msg, flags) : real.recvmsg(fd, msg, flags);
>  }
> 
> @@ -488,7 +637,7 @@ ssize_t read(int socket, void *buf, size_t count)
>  {
>       int fd;
>       init_preload();
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rread(fd, buf, count) : real.read(fd, buf, count);
>  }
> 
> @@ -496,14 +645,14 @@ ssize_t readv(int socket, const struct iovec *iov, int iovcnt)
>  {
>       int fd;
>       init_preload();
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rreadv(fd, iov, iovcnt) : real.readv(fd, iov, iovcnt);
>  }
> 
>  ssize_t send(int socket, const void *buf, size_t len, int flags)
>  {
>       int fd;
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rsend(fd, buf, len, flags) : real.send(fd, buf, len, flags);
>  }
> 
> @@ -511,7 +660,7 @@ ssize_t sendto(int socket, const void *buf, size_t len, int flags,
>               const struct sockaddr *dest_addr, socklen_t addrlen)
>  {
>       int fd;
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rsendto(fd, buf, len, flags, dest_addr, addrlen) :
>               real.sendto(fd, buf, len, flags, dest_addr, addrlen);
>  }
> @@ -519,7 +668,7 @@ ssize_t sendto(int socket, const void *buf, size_t len, int flags,
>  ssize_t sendmsg(int socket, const struct msghdr *msg, int flags)
>  {
>       int fd;
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rsendmsg(fd, msg, flags) : real.sendmsg(fd, msg, flags);
>  }
> 
> @@ -527,7 +676,7 @@ ssize_t write(int socket, const void *buf, size_t count)
>  {
>       int fd;
>       init_preload();
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rwrite(fd, buf, count) : real.write(fd, buf, count);
>  }
> 
> @@ -535,7 +684,7 @@ ssize_t writev(int socket, const struct iovec *iov, int iovcnt)
>  {
>       int fd;
>       init_preload();
> -     return (fd_get(socket, &fd) == fd_rsocket) ?
> +     return (fd_fork_get(socket, &fd) == fd_rsocket) ?
>               rwritev(fd, iov, iovcnt) : real.writev(fd, iov, iovcnt);
>  }
> 
> 
> 
> --
> To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
> the body of a message to [email protected]
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
> 


--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to