From: Liu Yuan <[email protected]>

For sheep itself, "sd" in a file name means nothing. Rename sdnet.c to
request.c so that the file is named consistently after what it handles.

Signed-off-by: Liu Yuan <[email protected]>
---
 sheep/Makefile.am |    2 +-
 sheep/request.c   |  878 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/sdnet.c     |  878 -----------------------------------------------------
 3 files changed, 879 insertions(+), 879 deletions(-)
 create mode 100644 sheep/request.c
 delete mode 100644 sheep/sdnet.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index f8017b4..16c79f0 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -24,7 +24,7 @@ INCLUDES              = -I$(top_builddir)/include -I$(top_srcdir)/include \
 
 sbin_PROGRAMS          = sheep
 
-sheep_SOURCES          = sheep.c group.c sdnet.c gateway.c store.c vdi.c work.c \
+sheep_SOURCES          = sheep.c group.c request.c gateway.c store.c vdi.c work.c \
                          journal.c ops.c recovery.c cluster/local.c \
                          object_cache.c object_list_cache.c sockfd_cache.c
 
diff --git a/sheep/request.c b/sheep/request.c
new file mode 100644
index 0000000..35ac488
--- /dev/null
+++ b/sheep/request.c
@@ -0,0 +1,878 @@
+/*
+ * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <sys/eventfd.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <sys/epoll.h>
+#include <fcntl.h>
+
+#include "sheep_priv.h"
+
+static void requeue_request(struct request *req);
+
+static int is_access_local(struct request *req, uint64_t oid)
+{
+       struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
+       int nr_copies;
+       int i;
+
+       nr_copies = get_nr_copies(req->vnodes);
+       oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
+
+       for (i = 0; i < nr_copies; i++) {
+               if (vnode_is_local(obj_vnodes[i]))
+                       return 1;
+       }
+
+       return 0;
+}
+
+static void io_op_done(struct work *work)
+{
+       struct request *req = container_of(work, struct request, work);
+
+       if (req->rp.result == SD_RES_EIO) {
+               req->rp.result = SD_RES_NETWORK_ERROR;
+
+               eprintf("leaving sheepdog cluster\n");
+               leave_cluster();
+       }
+
+       put_request(req);
+       return;
+}
+
+static void gateway_op_done(struct work *work)
+{
+       struct request *req = container_of(work, struct request, work);
+       struct sd_req *hdr = &req->rq;
+
+       switch (req->rp.result) {
+       case SD_RES_OLD_NODE_VER:
+               if (req->rp.epoch > sys->epoch) {
+                       list_add_tail(&req->request_list,
+                                     &sys->wait_rw_queue);
+                       /*
+                        * Gateway of this node is expected to process this
+                        * request later when epoch is lifted.
+                        */
+                       return;
+               }
+               /*FALLTHRU*/
+       case SD_RES_NEW_NODE_VER:
+       case SD_RES_NETWORK_ERROR:
+       case SD_RES_WAIT_FOR_JOIN:
+       case SD_RES_WAIT_FOR_FORMAT:
+               dprintf("retrying failed I/O request "
+                       "op %s result %d epoch %d, sys epoch %d\n",
+                       op_name(req->op),
+                       req->rp.result,
+                       req->rq.epoch,
+                       sys->epoch);
+               goto retry;
+       case SD_RES_EIO:
+               if (is_access_local(req, hdr->obj.oid)) {
+                       eprintf("leaving sheepdog cluster\n");
+                       leave_cluster();
+                       goto retry;
+               }
+               break;
+       case SD_RES_SUCCESS:
+               break;
+       }
+
+       put_request(req);
+       return;
+retry:
+       requeue_request(req);
+}
+
+static void local_op_done(struct work *work)
+{
+       struct request *req = container_of(work, struct request, work);
+
+       if (has_process_main(req->op)) {
+               req->rp.result = do_process_main(req->op, &req->rq,
+                                                &req->rp, req->data);
+       }
+
+       put_request(req);
+}
+
+static int check_request_epoch(struct request *req)
+{
+       if (before(req->rq.epoch, sys->epoch)) {
+               eprintf("old node version %u, %u (%s)\n",
+                       sys->epoch, req->rq.epoch, op_name(req->op));
+               /* ask gateway to retry. */
+               req->rp.result = SD_RES_OLD_NODE_VER;
+               req->rp.epoch = sys->epoch;
+               put_request(req);
+               return -1;
+       } else if (after(req->rq.epoch, sys->epoch)) {
+               eprintf("new node version %u, %u (%s)\n",
+                       sys->epoch, req->rq.epoch, op_name(req->op));
+
+               /* put on local wait queue, waiting for local epoch
+                  to be lifted */
+               req->rp.result = SD_RES_NEW_NODE_VER;
+               list_add_tail(&req->request_list, &sys->wait_rw_queue);
+               return -1;
+       }
+
+       return 0;
+}
+
+static bool request_in_recovery(struct request *req)
+{
+       /*
+        * Request from recovery should go down the Farm even if
+        * oid_in_recovery() returns true because we should also try snap
+        * cache of the Farm and return the error code back if not found.
+        */
+       if (oid_in_recovery(req->local_oid) &&
+           !(req->rq.flags & SD_FLAG_CMD_RECOVERY)) {
+               /*
+                * Put request on wait queues of local node
+                */
+               if (is_recovery_init()) {
+                       req->rp.result = SD_RES_OBJ_RECOVERING;
+                       list_add_tail(&req->request_list,
+                                     &sys->wait_rw_queue);
+               } else {
+                       list_add_tail(&req->request_list,
+                                     &sys->wait_obj_queue);
+               }
+               return true;
+       }
+       return false;
+}
+
+void resume_wait_epoch_requests(void)
+{
+       struct request *req, *t;
+       LIST_HEAD(pending_list);
+
+       list_splice_init(&sys->wait_rw_queue, &pending_list);
+
+       list_for_each_entry_safe(req, t, &pending_list, request_list) {
+               switch (req->rp.result) {
+               case SD_RES_OLD_NODE_VER:
+                       /*
+                        * Gateway retries to send the request when
+                        * its epoch changes.
+                        */
+                       assert(is_gateway_op(req->op));
+                       req->rq.epoch = sys->epoch;
+                       list_del(&req->request_list);
+                       requeue_request(req);
+                       break;
+               case SD_RES_NEW_NODE_VER:
+                       /* Peer retries the request locally when its epoch changes. */
+                       assert(!is_gateway_op(req->op));
+                       list_del(&req->request_list);
+                       requeue_request(req);
+                       break;
+               default:
+                       break;
+               }
+       }
+
+       list_splice_init(&pending_list, &sys->wait_rw_queue);
+}
+
+void resume_wait_recovery_requests(void)
+{
+       struct request *req, *t;
+       LIST_HEAD(pending_list);
+
+       list_splice_init(&sys->wait_rw_queue, &pending_list);
+
+       list_for_each_entry_safe(req, t, &pending_list, request_list) {
+               if (req->rp.result != SD_RES_OBJ_RECOVERING)
+                       continue;
+
+               dprintf("resume wait oid %" PRIx64 "\n", req->local_oid);
+               list_del(&req->request_list);
+               requeue_request(req);
+       }
+
+       list_splice_init(&pending_list, &sys->wait_rw_queue);
+}
+
+void resume_wait_obj_requests(uint64_t oid)
+{
+       struct request *req, *t;
+       LIST_HEAD(pending_list);
+
+       list_splice_init(&sys->wait_obj_queue, &pending_list);
+
+       list_for_each_entry_safe(req, t, &pending_list, request_list) {
+               if (req->local_oid != oid)
+                       continue;
+
+               /* the object requested by a pending request has been
+                * recovered, notify the pending request. */
+               dprintf("retry %" PRIx64 "\n", req->local_oid);
+               list_del(&req->request_list);
+               requeue_request(req);
+       }
+       list_splice_init(&pending_list, &sys->wait_obj_queue);
+}
+
+void flush_wait_obj_requests(void)
+{
+       struct request *req, *n;
+       LIST_HEAD(pending_list);
+
+       list_splice_init(&sys->wait_obj_queue, &pending_list);
+
+       list_for_each_entry_safe(req, n, &pending_list, request_list) {
+               list_del(&req->request_list);
+               requeue_request(req);
+       }
+}
+
+static void queue_peer_request(struct request *req)
+{
+       req->local_oid = req->rq.obj.oid;
+       if (req->local_oid) {
+               if (check_request_epoch(req) < 0)
+                       return;
+               if (request_in_recovery(req))
+                       return;
+       }
+
+       if (req->rq.flags & SD_FLAG_CMD_RECOVERY)
+               req->rq.epoch = req->rq.obj.tgt_epoch;
+
+       req->work.fn = do_process_work;
+       req->work.done = io_op_done;
+       queue_work(sys->io_wqueue, &req->work);
+}
+
+static void queue_gateway_request(struct request *req)
+{
+       struct sd_req *hdr = &req->rq;
+
+       if (is_access_local(req, hdr->obj.oid))
+               req->local_oid = hdr->obj.oid;
+
+       /*
+        * If we go for a cached object, we don't care if it is being recovered
+        */
+       if (sys->enable_write_cache &&
+           req->rq.flags & SD_FLAG_CMD_CACHE &&
+           object_is_cached(req->rq.obj.oid))
+               goto queue_work;
+
+       if (req->local_oid)
+               if (request_in_recovery(req))
+                       return;
+
+queue_work:
+       req->work.fn = do_process_work;
+       req->work.done = gateway_op_done;
+       queue_work(sys->gateway_wqueue, &req->work);
+}
+
+static void queue_local_request(struct request *req)
+{
+       req->work.fn = do_process_work;
+       req->work.done = local_op_done;
+       queue_work(sys->io_wqueue, &req->work);
+}
+
+static void queue_request(struct request *req)
+{
+       struct sd_req *hdr = &req->rq;
+       struct sd_rsp *rsp = &req->rp;
+
+       /*
+        * Check the protocol version for all internal commands, and public
+        * commands that have it set.  We can't enforce it on all public
+        * ones as it isn't a mandatory part of the public protocol.
+        */
+       if (hdr->opcode >= 0x80) {
+               if (hdr->proto_ver != SD_SHEEP_PROTO_VER) {
+                       rsp->result = SD_RES_VER_MISMATCH;
+                       goto done;
+               }
+       } else if (hdr->proto_ver) {
+               if (hdr->proto_ver != SD_PROTO_VER) {
+                       rsp->result = SD_RES_VER_MISMATCH;
+                       goto done;
+               }
+       }
+
+       req->op = get_sd_op(hdr->opcode);
+       if (!req->op) {
+               eprintf("invalid opcode %d\n", hdr->opcode);
+               rsp->result = SD_RES_INVALID_PARMS;
+               goto done;
+       }
+
+       dprintf("%s\n", op_name(req->op));
+
+       switch (sys->status) {
+       case SD_STATUS_SHUTDOWN:
+               rsp->result = SD_RES_SHUTDOWN;
+               goto done;
+       case SD_STATUS_WAIT_FOR_FORMAT:
+               if (!is_force_op(req->op)) {
+                       rsp->result = SD_RES_WAIT_FOR_FORMAT;
+                       goto done;
+               }
+               break;
+       case SD_STATUS_WAIT_FOR_JOIN:
+               if (!is_force_op(req->op)) {
+                       rsp->result = SD_RES_WAIT_FOR_JOIN;
+                       goto done;
+               }
+               break;
+       case SD_STATUS_HALT:
+               if (!is_force_op(req->op)) {
+                       rsp->result = SD_RES_HALT;
+                       goto done;
+               }
+               break;
+       default:
+               break;
+       }
+
+       /*
+        * force operations shouldn't access req->vnodes in their
+        * process_work() and process_main() because they can be
+        * called before we set up current_vnode_info
+        */
+       if (!is_force_op(req->op))
+               req->vnodes = get_vnode_info();
+
+       if (is_peer_op(req->op)) {
+               queue_peer_request(req);
+       } else if (is_gateway_op(req->op)) {
+               hdr->epoch = sys->epoch;
+               queue_gateway_request(req);
+       } else if (is_local_op(req->op)) {
+               hdr->epoch = sys->epoch;
+               queue_local_request(req);
+       } else if (is_cluster_op(req->op)) {
+               hdr->epoch = sys->epoch;
+               queue_cluster_request(req);
+       } else {
+               eprintf("unknown operation %d\n", hdr->opcode);
+               rsp->result = SD_RES_SYSTEM_ERROR;
+               goto done;
+       }
+
+       return;
+done:
+       put_request(req);
+}
+
+static void requeue_request(struct request *req)
+{
+       if (req->vnodes)
+               put_vnode_info(req->vnodes);
+       queue_request(req);
+}
+
+static void clear_client_info(struct client_info *ci);
+
+static struct request *alloc_local_request(void *data, int data_length)
+{
+       struct request *req;
+
+       req = xzalloc(sizeof(struct request));
+       if (data_length) {
+               req->data_length = data_length;
+               req->data = data;
+       }
+
+       req->local = 1;
+
+       INIT_LIST_HEAD(&req->request_list);
+
+       return req;
+}
+
+/*
+ * Exec the request locally and synchronously.
+ *
+ * This function takes advantage of gateway's retry mechanism.
+ */
+int exec_local_req(struct sd_req *rq, void *data)
+{
+       struct request *req;
+       eventfd_t value = 1;
+       int ret;
+
+       req = alloc_local_request(data, rq->data_length);
+       req->rq = *rq;
+       req->wait_efd = eventfd(0, 0);
+
+       pthread_mutex_lock(&sys->wait_req_lock);
+       list_add_tail(&req->request_list, &sys->wait_req_queue);
+       pthread_mutex_unlock(&sys->wait_req_lock);
+
+       eventfd_write(sys->req_efd, value);
+
+       ret = eventfd_read(req->wait_efd, &value);
+       if (ret < 0)
+               eprintf("event fd read error %m");
+
+       close(req->wait_efd);
+       ret = req->rp.result;
+       free(req);
+
+       return ret;
+}
+
+static struct request *alloc_request(struct client_info *ci, int data_length)
+{
+       struct request *req;
+
+       req = zalloc(sizeof(struct request));
+       if (!req)
+               return NULL;
+
+       req->ci = ci;
+       ci->refcnt++;
+       if (data_length) {
+               req->data_length = data_length;
+               req->data = valloc(data_length);
+               if (!req->data) {
+                       free(req);
+                       return NULL;
+               }
+       }
+
+       INIT_LIST_HEAD(&req->request_list);
+       uatomic_set(&req->refcnt, 1);
+
+       uatomic_inc(&sys->nr_outstanding_reqs);
+
+       return req;
+}
+
+static void free_request(struct request *req)
+{
+       uatomic_dec(&sys->nr_outstanding_reqs);
+
+       req->ci->refcnt--;
+       put_vnode_info(req->vnodes);
+       free(req->data);
+       free(req);
+}
+
+void put_request(struct request *req)
+{
+       struct client_info *ci = req->ci;
+       eventfd_t value = 1;
+
+       if (uatomic_sub_return(&req->refcnt, 1) > 0)
+               return;
+
+       if (req->local) {
+               req->done = 1;
+               eventfd_write(req->wait_efd, value);
+       } else {
+               if (conn_tx_on(&ci->conn)) {
+                       clear_client_info(ci);
+                       free_request(req);
+               } else {
+                       list_add(&req->request_list, &ci->done_reqs);
+               }
+       }
+}
+
+static void init_rx_hdr(struct client_info *ci)
+{
+       ci->conn.c_rx_state = C_IO_HEADER;
+       ci->rx_req = NULL;
+       ci->conn.rx_length = sizeof(struct sd_req);
+       ci->conn.rx_buf = &ci->conn.rx_hdr;
+}
+
+static inline int begin_rx(struct client_info *ci)
+{
+       int ret;
+       uint64_t data_len;
+       struct connection *conn = &ci->conn;
+       struct sd_req *hdr = &conn->rx_hdr;
+       struct request *req;
+
+       switch (conn->c_rx_state) {
+       case C_IO_HEADER:
+               ret = rx(conn, C_IO_DATA_INIT);
+               if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
+                       break;
+       case C_IO_DATA_INIT:
+               data_len = hdr->data_length;
+
+               req = alloc_request(ci, data_len);
+               if (!req) {
+                       conn->c_rx_state = C_IO_CLOSED;
+                       break;
+               }
+               ci->rx_req = req;
+
+               /* use le_to_cpu */
+               memcpy(&req->rq, hdr, sizeof(req->rq));
+
+               if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
+                       conn->c_rx_state = C_IO_DATA;
+                       conn->rx_length = data_len;
+                       conn->rx_buf = req->data;
+               } else {
+                       conn->c_rx_state = C_IO_END;
+                       break;
+               }
+       case C_IO_DATA:
+               ret = rx(conn, C_IO_END);
+               break;
+       default:
+               eprintf("bug: unknown state %d\n", conn->c_rx_state);
+       }
+
+       if (is_conn_dead(conn)) {
+               clear_client_info(ci);
+               return -1;
+       }
+
+       /* Short read happens */
+       if (conn->c_rx_state != C_IO_END)
+               return -1;
+
+       return 0;
+}
+
+static inline void finish_rx(struct client_info *ci)
+{
+       struct request *req;
+       struct sd_req *hdr = &ci->conn.rx_hdr;
+
+       req = ci->rx_req;
+       init_rx_hdr(ci);
+       if (hdr->flags & SD_FLAG_CMD_WRITE)
+               req->rp.data_length = 0;
+       else
+               req->rp.data_length = hdr->data_length;
+
+       dprintf("%d, %s:%d\n", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
+       queue_request(req);
+}
+
+static void do_client_rx(struct client_info *ci)
+{
+       if (begin_rx(ci) < 0)
+               return;
+
+       finish_rx(ci);
+}
+
+static void init_tx_hdr(struct client_info *ci)
+{
+       struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+       struct request *req;
+
+       assert(!list_empty(&ci->done_reqs));
+
+       memset(rsp, 0, sizeof(*rsp));
+
+       req = list_first_entry(&ci->done_reqs, struct request, request_list);
+       list_del(&req->request_list);
+
+       ci->tx_req = req;
+       ci->conn.tx_length = sizeof(*rsp);
+       ci->conn.c_tx_state = C_IO_HEADER;
+       ci->conn.tx_buf = rsp;
+
+       /* use cpu_to_le */
+       memcpy(rsp, &req->rp, sizeof(*rsp));
+
+       rsp->epoch = sys->epoch;
+       rsp->opcode = req->rq.opcode;
+       rsp->id = req->rq.id;
+}
+
+static inline int begin_tx(struct client_info *ci)
+{
+       int ret, opt;
+       struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
+
+       /* If short send happens, we don't need init hdr */
+       if (!ci->tx_req)
+               init_tx_hdr(ci);
+
+       opt = 1;
+       setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+       switch (ci->conn.c_tx_state) {
+       case C_IO_HEADER:
+               ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
+               if (!ret)
+                       break;
+
+               if (rsp->data_length) {
+                       ci->conn.tx_length = rsp->data_length;
+                       ci->conn.tx_buf = ci->tx_req->data;
+                       ci->conn.c_tx_state = C_IO_DATA;
+               } else {
+                       ci->conn.c_tx_state = C_IO_END;
+                       break;
+               }
+       case C_IO_DATA:
+               ret = tx(&ci->conn, C_IO_END, 0);
+               if (!ret)
+                       break;
+       default:
+               break;
+       }
+
+       opt = 0;
+       setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+       if (is_conn_dead(&ci->conn)) {
+               clear_client_info(ci);
+               return -1;
+       }
+       return 0;
+}
+
+/* Return 1 if short send happens or we have more data to send */
+static inline int finish_tx(struct client_info *ci)
+{
+       /* Finish sending one response */
+       if (ci->conn.c_tx_state == C_IO_END) {
+               dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
+                       ci->conn.ipstr, ci->conn.port);
+               free_request(ci->tx_req);
+               ci->tx_req = NULL;
+       }
+       if (ci->tx_req || !list_empty(&ci->done_reqs))
+               return 1;
+       return 0;
+}
+
+static void do_client_tx(struct client_info *ci)
+{
+       if (list_empty(&ci->done_reqs)) {
+               if (conn_tx_off(&ci->conn))
+                       clear_client_info(ci);
+               return;
+       }
+again:
+       if (begin_tx(ci) < 0)
+               return;
+
+       if (finish_tx(ci))
+               goto again;
+
+       /* Let's go sleep, and put_request() will wake me up */
+       if (conn_tx_off(&ci->conn))
+               clear_client_info(ci);
+}
+
+static void destroy_client(struct client_info *ci)
+{
+       dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
+       close(ci->conn.fd);
+       free(ci);
+}
+
+static void clear_client_info(struct client_info *ci)
+{
+       struct request *req, *t;
+
+       dprintf("connection seems to be dead\n");
+
+       if (ci->rx_req) {
+               free_request(ci->rx_req);
+               ci->rx_req = NULL;
+       }
+
+       if (ci->tx_req) {
+               free_request(ci->tx_req);
+               ci->tx_req = NULL;
+       }
+
+       list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) {
+               list_del(&req->request_list);
+               free_request(req);
+       }
+
+       unregister_event(ci->conn.fd);
+
+       dprintf("refcnt:%d, fd:%d, %s:%d\n",
+               ci->refcnt, ci->conn.fd,
+               ci->conn.ipstr, ci->conn.port);
+
+       if (ci->refcnt)
+               return;
+
+       destroy_client(ci);
+}
+
+static struct client_info *create_client(int fd, struct cluster_info *cluster)
+{
+       struct client_info *ci;
+       struct sockaddr_storage from;
+       socklen_t namesize = sizeof(from);
+
+       ci = zalloc(sizeof(*ci));
+       if (!ci)
+               return NULL;
+
+       if (getpeername(fd, (struct sockaddr *)&from, &namesize))
+               return NULL;
+
+       switch (from.ss_family) {
+       case AF_INET:
+               ci->conn.port = ntohs(((struct sockaddr_in *)&from)->sin_port);
+               inet_ntop(AF_INET, &((struct sockaddr_in *)&from)->sin_addr,
+                               ci->conn.ipstr, sizeof(ci->conn.ipstr));
+               break;
+       case AF_INET6:
+               ci->conn.port = ntohs(((struct sockaddr_in6 *)&from)->sin6_port);
+               inet_ntop(AF_INET6, &((struct sockaddr_in6 *)&from)->sin6_addr,
+                               ci->conn.ipstr, sizeof(ci->conn.ipstr));
+               break;
+       }
+
+       ci->conn.fd = fd;
+       ci->conn.events = EPOLLIN;
+       ci->refcnt = 0;
+
+       INIT_LIST_HEAD(&ci->done_reqs);
+
+       init_rx_hdr(ci);
+
+       return ci;
+}
+
+static void client_handler(int fd, int events, void *data)
+{
+       struct client_info *ci = (struct client_info *)data;
+
+       dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
+               ci->conn.c_tx_state);
+
+       if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
+               return clear_client_info(ci);
+
+       if (events & EPOLLIN)
+               do_client_rx(ci);
+
+       if (events & EPOLLOUT)
+               do_client_tx(ci);
+}
+
+static void listen_handler(int listen_fd, int events, void *data)
+{
+       struct sockaddr_storage from;
+       socklen_t namesize;
+       int fd, ret;
+       struct client_info *ci;
+
+       if (sys_stat_shutdown()) {
+               dprintf("unregistering connection %d\n", listen_fd);
+               unregister_event(listen_fd);
+               return;
+       }
+
+       namesize = sizeof(from);
+       fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
+       if (fd < 0) {
+               eprintf("failed to accept a new connection: %m\n");
+               return;
+       }
+
+       ret = set_keepalive(fd);
+       if (ret) {
+               close(fd);
+               return;
+       }
+
+       ret = set_nodelay(fd);
+       if (ret) {
+               close(fd);
+               return;
+       }
+
+       ret = set_nonblocking(fd);
+       if (ret) {
+               close(fd);
+               return;
+       }
+
+       ci = create_client(fd, data);
+       if (!ci) {
+               close(fd);
+               return;
+       }
+
+       ret = register_event(fd, client_handler, ci);
+       if (ret) {
+               destroy_client(ci);
+               return;
+       }
+
+       dprintf("accepted a new connection: %d\n", fd);
+}
+
+static int create_listen_port_fn(int fd, void *data)
+{
+       return register_event(fd, listen_handler, data);
+}
+
+int create_listen_port(int port, void *data)
+{
+       return create_listen_ports(port, create_listen_port_fn, data);
+}
+
+
+static void req_handler(int listen_fd, int events, void *data)
+{
+       eventfd_t value;
+       struct request *req, *t;
+       LIST_HEAD(pending_list);
+       int ret;
+
+       if (events & EPOLLERR)
+               eprintf("request handler error\n");
+
+       ret = eventfd_read(listen_fd, &value);
+       if (ret < 0)
+               return;
+
+       pthread_mutex_lock(&sys->wait_req_lock);
+       list_splice_init(&sys->wait_req_queue, &pending_list);
+       pthread_mutex_unlock(&sys->wait_req_lock);
+
+       list_for_each_entry_safe(req, t, &pending_list, request_list) {
+               list_del(&req->request_list);
+               queue_request(req);
+       }
+}
+
+void local_req_init(void)
+{
+       pthread_mutex_init(&sys->wait_req_lock, NULL);
+       sys->req_efd = eventfd(0, EFD_NONBLOCK);
+       register_event(sys->req_efd, req_handler, NULL);
+}
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
deleted file mode 100644
index 7b53b04..0000000
--- a/sheep/sdnet.c
+++ /dev/null
@@ -1,878 +0,0 @@
-/*
- * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version
- * 2 as published by the Free Software Foundation.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-#include <assert.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <pthread.h>
-#include <sys/eventfd.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <sys/epoll.h>
-#include <fcntl.h>
-
-#include "sheep_priv.h"
-
-static void requeue_request(struct request *req);
-
-static int is_access_local(struct request *req, uint64_t oid)
-{
-       struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
-       int nr_copies;
-       int i;
-
-       nr_copies = get_nr_copies(req->vnodes);
-       oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
-
-       for (i = 0; i < nr_copies; i++) {
-               if (vnode_is_local(obj_vnodes[i]))
-                       return 1;
-       }
-
-       return 0;
-}
-
-static void io_op_done(struct work *work)
-{
-       struct request *req = container_of(work, struct request, work);
-
-       if (req->rp.result == SD_RES_EIO) {
-               req->rp.result = SD_RES_NETWORK_ERROR;
-
-               eprintf("leaving sheepdog cluster\n");
-               leave_cluster();
-       }
-
-       put_request(req);
-       return;
-}
-
-static void gateway_op_done(struct work *work)
-{
-       struct request *req = container_of(work, struct request, work);
-       struct sd_req *hdr = &req->rq;
-
-       switch (req->rp.result) {
-       case SD_RES_OLD_NODE_VER:
-               if (req->rp.epoch > sys->epoch) {
-                       list_add_tail(&req->request_list,
-                                     &sys->wait_rw_queue);
-                       /*
-                        * Gateway of this node is expected to process this
-                        * request later when epoch is lifted.
-                        */
-                       return;
-               }
-               /*FALLTHRU*/
-       case SD_RES_NEW_NODE_VER:
-       case SD_RES_NETWORK_ERROR:
-       case SD_RES_WAIT_FOR_JOIN:
-       case SD_RES_WAIT_FOR_FORMAT:
-               dprintf("retrying failed I/O request "
-                       "op %s result %d epoch %d, sys epoch %d\n",
-                       op_name(req->op),
-                       req->rp.result,
-                       req->rq.epoch,
-                       sys->epoch);
-               goto retry;
-       case SD_RES_EIO:
-               if (is_access_local(req, hdr->obj.oid)) {
-                       eprintf("leaving sheepdog cluster\n");
-                       leave_cluster();
-                       goto retry;
-               }
-               break;
-       case SD_RES_SUCCESS:
-               break;
-       }
-
-       put_request(req);
-       return;
-retry:
-       requeue_request(req);
-}
-
-static void local_op_done(struct work *work)
-{
-       struct request *req = container_of(work, struct request, work);
-
-       if (has_process_main(req->op)) {
-               req->rp.result = do_process_main(req->op, &req->rq,
-                                                &req->rp, req->data);
-       }
-
-       put_request(req);
-}
-
-static int check_request_epoch(struct request *req)
-{
-       if (before(req->rq.epoch, sys->epoch)) {
-               eprintf("old node version %u, %u (%s)\n",
-                       sys->epoch, req->rq.epoch, op_name(req->op));
-               /* ask gateway to retry. */
-               req->rp.result = SD_RES_OLD_NODE_VER;
-               req->rp.epoch = sys->epoch;
-               put_request(req);
-               return -1;
-       } else if (after(req->rq.epoch, sys->epoch)) {
-               eprintf("new node version %u, %u (%s)\n",
-                       sys->epoch, req->rq.epoch, op_name(req->op));
-
-               /* put on local wait queue, waiting for local epoch
-                  to be lifted */
-               req->rp.result = SD_RES_NEW_NODE_VER;
-               list_add_tail(&req->request_list, &sys->wait_rw_queue);
-               return -1;
-       }
-
-       return 0;
-}
-
-static bool request_in_recovery(struct request *req)
-{
-       /*
-        * Request from recovery should go down the Farm even if
-        * oid_in_recovery() returns true because we should also try snap
-        * cache of the Farm and return the error code back if not found.
-        */
-       if (oid_in_recovery(req->local_oid) &&
-           !(req->rq.flags & SD_FLAG_CMD_RECOVERY)) {
-               /*
-                * Put request on wait queues of local node
-                */
-               if (is_recovery_init()) {
-                       req->rp.result = SD_RES_OBJ_RECOVERING;
-                       list_add_tail(&req->request_list,
-                                     &sys->wait_rw_queue);
-               } else {
-                       list_add_tail(&req->request_list,
-                                     &sys->wait_obj_queue);
-               }
-               return true;
-       }
-       return false;
-}
-
-void resume_wait_epoch_requests(void)
-{
-       struct request *req, *t;
-       LIST_HEAD(pending_list);
-
-       list_splice_init(&sys->wait_rw_queue, &pending_list);
-
-       list_for_each_entry_safe(req, t, &pending_list, request_list) {
-               switch (req->rp.result) {
-               case SD_RES_OLD_NODE_VER:
-                       /*
-                        * Gateway retries to send the request when
-                        * its epoch changes.
-                        */
-                       assert(is_gateway_op(req->op));
-                       req->rq.epoch = sys->epoch;
-                       list_del(&req->request_list);
-                       requeue_request(req);
-                       break;
-               case SD_RES_NEW_NODE_VER:
-                       /* Peer retries the request locally when its epoch changes. */
-                       assert(!is_gateway_op(req->op));
-                       list_del(&req->request_list);
-                       requeue_request(req);
-                       break;
-               default:
-                       break;
-               }
-       }
-
-       list_splice_init(&pending_list, &sys->wait_rw_queue);
-}
-
-void resume_wait_recovery_requests(void)
-{
-       struct request *req, *t;
-       LIST_HEAD(pending_list);
-
-       list_splice_init(&sys->wait_rw_queue, &pending_list);
-
-       list_for_each_entry_safe(req, t, &pending_list, request_list) {
-               if (req->rp.result != SD_RES_OBJ_RECOVERING)
-                       continue;
-
-               dprintf("resume wait oid %" PRIx64 "\n", req->local_oid);
-               list_del(&req->request_list);
-               requeue_request(req);
-       }
-
-       list_splice_init(&pending_list, &sys->wait_rw_queue);
-}
-
-void resume_wait_obj_requests(uint64_t oid)
-{
-       struct request *req, *t;
-       LIST_HEAD(pending_list);
-
-       list_splice_init(&sys->wait_obj_queue, &pending_list);
-
-       list_for_each_entry_safe(req, t, &pending_list, request_list) {
-               if (req->local_oid != oid)
-                       continue;
-
-               /* the object requested by a pending request has been
-                * recovered, notify the pending request. */
-               dprintf("retry %" PRIx64 "\n", req->local_oid);
-               list_del(&req->request_list);
-               requeue_request(req);
-       }
-       list_splice_init(&pending_list, &sys->wait_obj_queue);
-}
-
-void flush_wait_obj_requests(void)
-{
-       struct request *req, *n;
-       LIST_HEAD(pending_list);
-
-       list_splice_init(&sys->wait_obj_queue, &pending_list);
-
-       list_for_each_entry_safe(req, n, &pending_list, request_list) {
-               list_del(&req->request_list);
-               requeue_request(req);
-       }
-}
-
-static void queue_peer_request(struct request *req)
-{
-       req->local_oid = req->rq.obj.oid;
-       if (req->local_oid) {
-               if (check_request_epoch(req) < 0)
-                       return;
-               if (request_in_recovery(req))
-                       return;
-       }
-
-       if (req->rq.flags & SD_FLAG_CMD_RECOVERY)
-               req->rq.epoch = req->rq.obj.tgt_epoch;
-
-       req->work.fn = do_process_work;
-       req->work.done = io_op_done;
-       queue_work(sys->io_wqueue, &req->work);
-}
-
-static void queue_gateway_request(struct request *req)
-{
-       struct sd_req *hdr = &req->rq;
-
-       if (is_access_local(req, hdr->obj.oid))
-               req->local_oid = hdr->obj.oid;
-
-       /*
-        * If we go for a cached object, we don't care if it is being recovered
-        */
-       if (sys->enable_write_cache &&
-           req->rq.flags & SD_FLAG_CMD_CACHE &&
-           object_is_cached(req->rq.obj.oid))
-               goto queue_work;
-
-       if (req->local_oid)
-               if (request_in_recovery(req))
-                       return;
-
-queue_work:
-       req->work.fn = do_process_work;
-       req->work.done = gateway_op_done;
-       queue_work(sys->gateway_wqueue, &req->work);
-}
-
-static void queue_local_request(struct request *req)
-{
-       req->work.fn = do_process_work;
-       req->work.done = local_op_done;
-       queue_work(sys->io_wqueue, &req->work);
-}
-
-static void queue_request(struct request *req)
-{
-       struct sd_req *hdr = &req->rq;
-       struct sd_rsp *rsp = &req->rp;
-
-       /*
-        * Check the protocol version for all internal commands, and public
-        * commands that have it set.  We can't enforce it on all public
-        * ones as it isn't a mandatory part of the public protocol.
-        */
-       if (hdr->opcode >= 0x80) {
-               if (hdr->proto_ver != SD_SHEEP_PROTO_VER) {
-                       rsp->result = SD_RES_VER_MISMATCH;
-                       goto done;
-               }
-       } else if (hdr->proto_ver) {
-               if (hdr->proto_ver != SD_PROTO_VER) {
-                       rsp->result = SD_RES_VER_MISMATCH;
-                       goto done;
-               }
-       }
-
-       req->op = get_sd_op(hdr->opcode);
-       if (!req->op) {
-               eprintf("invalid opcode %d\n", hdr->opcode);
-               rsp->result = SD_RES_INVALID_PARMS;
-               goto done;
-       }
-
-       dprintf("%s\n", op_name(req->op));
-
-       switch (sys->status) {
-       case SD_STATUS_SHUTDOWN:
-               rsp->result = SD_RES_SHUTDOWN;
-               goto done;
-       case SD_STATUS_WAIT_FOR_FORMAT:
-               if (!is_force_op(req->op)) {
-                       rsp->result = SD_RES_WAIT_FOR_FORMAT;
-                       goto done;
-               }
-               break;
-       case SD_STATUS_WAIT_FOR_JOIN:
-               if (!is_force_op(req->op)) {
-                       rsp->result = SD_RES_WAIT_FOR_JOIN;
-                       goto done;
-               }
-               break;
-       case SD_STATUS_HALT:
-               if (!is_force_op(req->op)) {
-                       rsp->result = SD_RES_HALT;
-                       goto done;
-               }
-               break;
-       default:
-               break;
-       }
-
-       /*
-        * force operations shouldn't access req->vnodes in their
-        * process_work() and process_main() because they can be
-        * called before we set up current_vnode_info
-        */
-       if (!is_force_op(req->op))
-               req->vnodes = get_vnode_info();
-
-       if (is_peer_op(req->op)) {
-               queue_peer_request(req);
-       } else if (is_gateway_op(req->op)) {
-               hdr->epoch = sys->epoch;
-               queue_gateway_request(req);
-       } else if (is_local_op(req->op)) {
-               hdr->epoch = sys->epoch;
-               queue_local_request(req);
-       } else if (is_cluster_op(req->op)) {
-               hdr->epoch = sys->epoch;
-               queue_cluster_request(req);
-       } else {
-               eprintf("unknown operation %d\n", hdr->opcode);
-               rsp->result = SD_RES_SYSTEM_ERROR;
-               goto done;
-       }
-
-       return;
-done:
-       put_request(req);
-}
-
-static void requeue_request(struct request *req)
-{
-       if (req->vnodes)
-               put_vnode_info(req->vnodes);
-       queue_request(req);
-}
-
-static void clear_client_info(struct client_info *ci);
-
-static struct request *alloc_local_request(void *data, int data_length)
-{
-       struct request *req;
-
-       req = xzalloc(sizeof(struct request));
-       if (data_length) {
-               req->data_length = data_length;
-               req->data = data;
-       }
-
-       req->local = 1;
-
-       INIT_LIST_HEAD(&req->request_list);
-
-       return req;
-}
-
-/*
- * Exec the request locally and synchronously.
- *
- * This function takes advantage of gateway's retry mechanism.
- */
-int exec_local_req(struct sd_req *rq, void *data)
-{
-       struct request *req;
-       eventfd_t value = 1;
-       int ret;
-
-       req = alloc_local_request(data, rq->data_length);
-       req->rq = *rq;
-       req->wait_efd = eventfd(0, 0);
-
-       pthread_mutex_lock(&sys->wait_req_lock);
-       list_add_tail(&req->request_list, &sys->wait_req_queue);
-       pthread_mutex_unlock(&sys->wait_req_lock);
-
-       eventfd_write(sys->req_efd, value);
-
-       ret = eventfd_read(req->wait_efd, &value);
-       if (ret < 0)
-               eprintf("event fd read error %m");
-
-       close(req->wait_efd);
-       ret = req->rp.result;
-       free(req);
-
-       return ret;
-}
-
-static struct request *alloc_request(struct client_info *ci, int data_length)
-{
-       struct request *req;
-
-       req = zalloc(sizeof(struct request));
-       if (!req)
-               return NULL;
-
-       req->ci = ci;
-       ci->refcnt++;
-       if (data_length) {
-               req->data_length = data_length;
-               req->data = valloc(data_length);
-               if (!req->data) {
-                       free(req);
-                       return NULL;
-               }
-       }
-
-       INIT_LIST_HEAD(&req->request_list);
-       uatomic_set(&req->refcnt, 1);
-
-       uatomic_inc(&sys->nr_outstanding_reqs);
-
-       return req;
-}
-
-static void free_request(struct request *req)
-{
-       uatomic_dec(&sys->nr_outstanding_reqs);
-
-       req->ci->refcnt--;
-       put_vnode_info(req->vnodes);
-       free(req->data);
-       free(req);
-}
-
-void put_request(struct request *req)
-{
-       struct client_info *ci = req->ci;
-       eventfd_t value = 1;
-
-       if (uatomic_sub_return(&req->refcnt, 1) > 0)
-               return;
-
-       if (req->local) {
-               req->done = 1;
-               eventfd_write(req->wait_efd, value);
-       } else {
-               if (conn_tx_on(&ci->conn)) {
-                       free_request(req);
-                       clear_client_info(ci);
-               } else {
-                       list_add(&req->request_list, &ci->done_reqs);
-               }
-       }
-}
-
-static void init_rx_hdr(struct client_info *ci)
-{
-       ci->conn.c_rx_state = C_IO_HEADER;
-       ci->rx_req = NULL;
-       ci->conn.rx_length = sizeof(struct sd_req);
-       ci->conn.rx_buf = &ci->conn.rx_hdr;
-}
-
-static inline int begin_rx(struct client_info *ci)
-{
-       int ret;
-       uint64_t data_len;
-       struct connection *conn = &ci->conn;
-       struct sd_req *hdr = &conn->rx_hdr;
-       struct request *req;
-
-       switch (conn->c_rx_state) {
-       case C_IO_HEADER:
-               ret = rx(conn, C_IO_DATA_INIT);
-               if (!ret || conn->c_rx_state != C_IO_DATA_INIT)
-                       break;
-       case C_IO_DATA_INIT:
-               data_len = hdr->data_length;
-
-               req = alloc_request(ci, data_len);
-               if (!req) {
-                       conn->c_rx_state = C_IO_CLOSED;
-                       break;
-               }
-               ci->rx_req = req;
-
-               /* use le_to_cpu */
-               memcpy(&req->rq, hdr, sizeof(req->rq));
-
-               if (data_len && hdr->flags & SD_FLAG_CMD_WRITE) {
-                       conn->c_rx_state = C_IO_DATA;
-                       conn->rx_length = data_len;
-                       conn->rx_buf = req->data;
-               } else {
-                       conn->c_rx_state = C_IO_END;
-                       break;
-               }
-       case C_IO_DATA:
-               ret = rx(conn, C_IO_END);
-               break;
-       default:
-               eprintf("bug: unknown state %d\n", conn->c_rx_state);
-       }
-
-       if (is_conn_dead(conn)) {
-               clear_client_info(ci);
-               return -1;
-       }
-
-       /* Short read happens */
-       if (conn->c_rx_state != C_IO_END)
-               return -1;
-
-       return 0;
-}
-
-static inline void finish_rx(struct client_info *ci)
-{
-       struct request *req;
-       struct sd_req *hdr = &ci->conn.rx_hdr;
-
-       req = ci->rx_req;
-       init_rx_hdr(ci);
-       if (hdr->flags & SD_FLAG_CMD_WRITE)
-               req->rp.data_length = 0;
-       else
-               req->rp.data_length = hdr->data_length;
-
-       dprintf("%d, %s:%d\n", ci->conn.fd, ci->conn.ipstr, ci->conn.port);
-       queue_request(req);
-}
-
-static void do_client_rx(struct client_info *ci)
-{
-       if (begin_rx(ci) < 0)
-               return;
-
-       finish_rx(ci);
-}
-
-static void init_tx_hdr(struct client_info *ci)
-{
-       struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
-       struct request *req;
-
-       assert(!list_empty(&ci->done_reqs));
-
-       memset(rsp, 0, sizeof(*rsp));
-
-       req = list_first_entry(&ci->done_reqs, struct request, request_list);
-       list_del(&req->request_list);
-
-       ci->tx_req = req;
-       ci->conn.tx_length = sizeof(*rsp);
-       ci->conn.c_tx_state = C_IO_HEADER;
-       ci->conn.tx_buf = rsp;
-
-       /* use cpu_to_le */
-       memcpy(rsp, &req->rp, sizeof(*rsp));
-
-       rsp->epoch = sys->epoch;
-       rsp->opcode = req->rq.opcode;
-       rsp->id = req->rq.id;
-}
-
-static inline int begin_tx(struct client_info *ci)
-{
-       int ret, opt;
-       struct sd_rsp *rsp = (struct sd_rsp *)&ci->conn.tx_hdr;
-
-       /* If short send happens, we don't need init hdr */
-       if (!ci->tx_req)
-               init_tx_hdr(ci);
-
-       opt = 1;
-       setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
-
-       switch (ci->conn.c_tx_state) {
-       case C_IO_HEADER:
-               ret = tx(&ci->conn, C_IO_DATA_INIT, 0);
-               if (!ret)
-                       break;
-
-               if (rsp->data_length) {
-                       ci->conn.tx_length = rsp->data_length;
-                       ci->conn.tx_buf = ci->tx_req->data;
-                       ci->conn.c_tx_state = C_IO_DATA;
-               } else {
-                       ci->conn.c_tx_state = C_IO_END;
-                       break;
-               }
-       case C_IO_DATA:
-               ret = tx(&ci->conn, C_IO_END, 0);
-               if (!ret)
-                       break;
-       default:
-               break;
-       }
-
-       opt = 0;
-       setsockopt(ci->conn.fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
-
-       if (is_conn_dead(&ci->conn)) {
-               clear_client_info(ci);
-               return -1;
-       }
-       return 0;
-}
-
-/* Return 1 if short send happens or we have more data to send */
-static inline int finish_tx(struct client_info *ci)
-{
-       /* Finish sending one response */
-       if (ci->conn.c_tx_state == C_IO_END) {
-               dprintf("connection from: %d, %s:%d\n", ci->conn.fd,
-                       ci->conn.ipstr, ci->conn.port);
-               free_request(ci->tx_req);
-               ci->tx_req = NULL;
-       }
-       if (ci->tx_req || !list_empty(&ci->done_reqs))
-               return 1;
-       return 0;
-}
-
-static void do_client_tx(struct client_info *ci)
-{
-       if (list_empty(&ci->done_reqs)) {
-               if (conn_tx_off(&ci->conn))
-                       clear_client_info(ci);
-               return;
-       }
-again:
-       if (begin_tx(ci) < 0)
-               return;
-
-       if (finish_tx(ci))
-               goto again;
-
-       /* Let's go sleep, and put_request() will wake me up */
-       if (conn_tx_off(&ci->conn))
-               clear_client_info(ci);
-}
-
-static void destroy_client(struct client_info *ci)
-{
-       dprintf("connection from: %s:%d\n", ci->conn.ipstr, ci->conn.port);
-       close(ci->conn.fd);
-       free(ci);
-}
-
-static void clear_client_info(struct client_info *ci)
-{
-       struct request *req, *t;
-
-       dprintf("connection seems to be dead\n");
-
-       if (ci->rx_req) {
-               free_request(ci->rx_req);
-               ci->rx_req = NULL;
-       }
-
-       if (ci->tx_req) {
-               free_request(ci->tx_req);
-               ci->tx_req = NULL;
-       }
-
-       list_for_each_entry_safe(req, t, &ci->done_reqs, request_list) {
-               list_del(&req->request_list);
-               free_request(req);
-       }
-
-       unregister_event(ci->conn.fd);
-
-       dprintf("refcnt:%d, fd:%d, %s:%d\n",
-               ci->refcnt, ci->conn.fd,
-               ci->conn.ipstr, ci->conn.port);
-
-       if (ci->refcnt)
-               return;
-
-       destroy_client(ci);
-}
-
-static struct client_info *create_client(int fd, struct cluster_info *cluster)
-{
-       struct client_info *ci;
-       struct sockaddr_storage from;
-       socklen_t namesize = sizeof(from);
-
-       ci = zalloc(sizeof(*ci));
-       if (!ci)
-               return NULL;
-
-       if (getpeername(fd, (struct sockaddr *)&from, &namesize))
-               return NULL;
-
-       switch (from.ss_family) {
-       case AF_INET:
-               ci->conn.port = ntohs(((struct sockaddr_in *)&from)->sin_port);
-               inet_ntop(AF_INET, &((struct sockaddr_in *)&from)->sin_addr,
-                               ci->conn.ipstr, sizeof(ci->conn.ipstr));
-               break;
-       case AF_INET6:
-               ci->conn.port = ntohs(((struct sockaddr_in6 *)&from)->sin6_port);
-               inet_ntop(AF_INET6, &((struct sockaddr_in6 *)&from)->sin6_addr,
-                               ci->conn.ipstr, sizeof(ci->conn.ipstr));
-               break;
-       }
-
-       ci->conn.fd = fd;
-       ci->conn.events = EPOLLIN;
-       ci->refcnt = 0;
-
-       INIT_LIST_HEAD(&ci->done_reqs);
-
-       init_rx_hdr(ci);
-
-       return ci;
-}
-
-static void client_handler(int fd, int events, void *data)
-{
-       struct client_info *ci = (struct client_info *)data;
-
-       dprintf("%x, rx %d, tx %d\n", events, ci->conn.c_rx_state,
-               ci->conn.c_tx_state);
-
-       if (events & (EPOLLERR | EPOLLHUP) || is_conn_dead(&ci->conn))
-               return clear_client_info(ci);
-
-       if (events & EPOLLIN)
-               do_client_rx(ci);
-
-       if (events & EPOLLOUT)
-               do_client_tx(ci);
-}
-
-static void listen_handler(int listen_fd, int events, void *data)
-{
-       struct sockaddr_storage from;
-       socklen_t namesize;
-       int fd, ret;
-       struct client_info *ci;
-
-       if (sys_stat_shutdown()) {
-               dprintf("unregistering connection %d\n", listen_fd);
-               unregister_event(listen_fd);
-               return;
-       }
-
-       namesize = sizeof(from);
-       fd = accept(listen_fd, (struct sockaddr *)&from, &namesize);
-       if (fd < 0) {
-               eprintf("failed to accept a new connection: %m\n");
-               return;
-       }
-
-       ret = set_keepalive(fd);
-       if (ret) {
-               close(fd);
-               return;
-       }
-
-       ret = set_nodelay(fd);
-       if (ret) {
-               close(fd);
-               return;
-       }
-
-       ret = set_nonblocking(fd);
-       if (ret) {
-               close(fd);
-               return;
-       }
-
-       ci = create_client(fd, data);
-       if (!ci) {
-               close(fd);
-               return;
-       }
-
-       ret = register_event(fd, client_handler, ci);
-       if (ret) {
-               destroy_client(ci);
-               return;
-       }
-
-       dprintf("accepted a new connection: %d\n", fd);
-}
-
-static int create_listen_port_fn(int fd, void *data)
-{
-       return register_event(fd, listen_handler, data);
-}
-
-int create_listen_port(int port, void *data)
-{
-       return create_listen_ports(port, create_listen_port_fn, data);
-}
-
-
-static void req_handler(int listen_fd, int events, void *data)
-{
-       eventfd_t value;
-       struct request *req, *t;
-       LIST_HEAD(pending_list);
-       int ret;
-
-       if (events & EPOLLERR)
-               eprintf("request handler error\n");
-
-       ret = eventfd_read(listen_fd, &value);
-       if (ret < 0)
-               return;
-
-       pthread_mutex_lock(&sys->wait_req_lock);
-       list_splice_init(&sys->wait_req_queue, &pending_list);
-       pthread_mutex_unlock(&sys->wait_req_lock);
-
-       list_for_each_entry_safe(req, t, &pending_list, request_list) {
-               list_del(&req->request_list);
-               queue_request(req);
-       }
-}
-
-void local_req_init(void)
-{
-       pthread_mutex_init(&sys->wait_req_lock, NULL);
-       sys->req_efd = eventfd(0, EFD_NONBLOCK);
-       register_event(sys->req_efd, req_handler, NULL);
-}
-- 
1.7.10.2

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to