A good resource for the "proper way" to use libevent is to read the
libevent book. http://www.wangafu.net/~nickm/libevent-book/

On Tue, Oct 6, 2015 at 2:51 AM Azat Khuzhin <[email protected]> wrote:

> On Mon, Oct 05, 2015 at 10:43:19PM +0530, Sanchayan Maity wrote:
> > Hello,
> >
> > I am writing a simple DHT application and using libevent in my
> > application. The libevent version is 2.0 which is installed by
> > default on my Arch Linux setup.
> >
> > After some searching I found an example to use libevent in a
> > multi-threaded setup.
> >
> http://roncemer.com/software-development/multi-threaded-libevent-server-example/#comment-3249
> >
> > The above code runs one event base in its own separate thread and
> > uses a threaded workqueue model to service requests on incoming
> > connections.
>
> Hi,
>
> Personally I prefer to use pipe/socketpair for scheduling jobs to
> another threads, in a nutshell it works like this:
> - acceptcb:
>   write(notify_fd_write_side, user-struct, sizeof(user-struct))
> - per thread event base readcb for notify_fd read side:
>   read(notify_fd_read_size, buf, sizeof(user-struct))
>
> It is much simpler then using pthread_cond_*, but both of them must
> works at first glance.
>
> Here is a few implementations using this mechanism:
> -
> https://github.com/ellzey/libevhtp/blob/libevhtp2/src/evhtp2/evhtp_thr.c#L60
> -
> https://github.com/azat/boostcache/blob/libevent-aio-v2/src/kernel/net/commandserver.cpp#L206
>
> Other comments are inlined in.
>
> > I have taken that sample code and modified it for my own purposes.
> > While my application is functional when I deploy eight of them in
> > my setup and then check for the basic functionality, while trying
> > to run multiple operations from multiple application nodes I get
> > some messages which I do not fully comprehend at the moment.
> >
> > They are as follows:
> > 1. Epoll %s(%d) on fd %d failed.  Old events were %d; read change was %d
> (%s); write change was %d (%s); close change was %d (%s)
> >
> > Which seems to be printed from epoll.c by event warn mechanism
>
> Yep.
>
> >
> > 2. Too many open files
>
> This can be because of too much files (and indeed you have overhead --
> since you have own base for every socket, and it means that you will
> have epollfd for every socket which is not how this must works -- see
> commends to you sample).
>
> And I guess that this is the main problem that you have, other (with
> epoll failed message) will be fixed when you fix EMFILE problem.
>
> But if you still sure that all fds that you have are needful you can
> increase limits:
>
> $ ulimit -n $((1<<20))
> Or permanently in /etc/security/limits.conf.
>
> But that limits are limited with fs.file-max sysctl, so if you need more
> increase it.
>
> > 3. One more which I failed to note and was related to EPOLL MOD
> >
> > I know perhaps this is too much to ask, but can someone have a
> > look at the code and tell me things I am doing wrong. I have gone
> > through the libevent documentation online but I am not sure if I
> > am doing things the right way at the moment.
> >
> > The peer part of dht I try to make synchronous by trying to break
> > the event base loop with event loopbreak call in the read cb however
> > sometimes if the callback never gets called which happens, the peer
> > part of the DHT is just stuck, so to circumvent this I use a timeout.
> >
> > The code is attached. I have tried to keep it clean. I am no networking
> > or socket expert, appreciate any comments or feedback.
> >
> > Thanks & Regards,
> > Sanchayan Maity.
> >
> >
> >
>
> > /*
> >  * Copyright (C) 2015 Sanchayan Maity <[email protected]>
> >  *
> >  * Author: Sanchayan Maity <[email protected]>
> >  *                                               <[email protected]>
> >  *
> >  * This program is free software; you can redistribute it and/or modify
> >  * it under the terms of the GNU General Public License version 2 and
> >  * only version 2 as published by the Free Software Foundation.
> >  *
> >  * This program is distributed in the hope that it will be useful,
> >  * but WITHOUT ANY WARRANTY; without even the implied warranty of
> >  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> >  * GNU General Public License for more details.
> >  */
> > #include "common.h"
> >
> > /*
> >  * Struct to carry around connection (client)-specific data.
> >  */
> > typedef struct client {
> >     /* The client's socket. */
> >     int fd;
> >
> >     /* The event_base for this client. */
> >     struct event_base *evbase;
> >
> >     /* The bufferedevent for this client. */
> >     struct bufferevent *buf_ev;
> >
> >     /* The output buffer for this client. */
> >     struct evbuffer *output_buffer;
> >
> >     /* Here you can add your own application-specific attributes which
> >      * are connection-specific. */
> > } client_t;
> >
> > /* Table entry */
> > struct node_t {
> >       /* Key */
> >       char *key;
> >       /* Value of the key */
> >       char *value;
> >       /* Next entry in chain */
> >     struct node_t *next;
> > };
> >
> > static struct node_t *hashtable[SERVER_HASH_TABLE_SIZE];
> >
> > /*
> >  * We use a read write lock to protect against concurrent
> >  * write to the hash table. It is ok to have concurrent
> >  * readers. We do not use a mutex as that will reduce
> >  * reader concurrency to a single thread at a time.
> >  */
> > pthread_rwlock_t ht_lock;
> >
> > static struct event_base *evbase_accept;
> > /* Workqueue for the server */
> > static workqueue_t workqueue;
> >
> > /*
> >  * Id of server. We use this to pick up appropriate
> >  * IP/port parameters from the file.
> >  */
> > static int server_id;
> >
> > struct server_p {
> >       /* IP to which server will bind to */
> >       char *serverip;
> >       /* Port on which server will listen */
> >       char *serverport;
> > };
> >
> > /*
> >  * We use this to store server parameters of the eight
> >  * servers information read from file
> >  */
> > static struct server_p servers[MAX_NO_OF_SERVERS];
> >
> > pthread_t server_thread;
> > static volatile bool perf_test_on = false;
> >
> > /* Signal handler function (defined below). */
> > static void sighandler(int signal);
> >
> > /*
> >  * Struct to carry around server connection specific data
> >  */
> > struct server_conn {
> >       /* Event base for the peer server connection */
> >       struct event_base *evbase;
> >       /* Buffer event for the peer server connection */
> >       struct bufferevent *bev;
> >       /* Output buffer for the peer server connection */
> >       struct evbuffer *output_buffer;
> > };
> >
> > static struct server_conn sconn;
> >
> > struct timeval peer_timeout;
> >
> > static void closeClient(client_t *client) {
> >     if (client != NULL) {
> >         if (client->fd >= 0) {
> >             close(client->fd);
> >             client->fd = -1;
> >         }
> >     }
> > }
> >
> > static void closeAndFreeClient(client_t *client) {
> >     if (client != NULL) {
> >         closeClient(client);
> >         if (client->buf_ev != NULL) {
> >             bufferevent_free(client->buf_ev);
> >             client->buf_ev = NULL;
> >         }
> >         if (client->evbase != NULL) {
> >             event_base_free(client->evbase);
> >             client->evbase = NULL;
> >         }
> >         if (client->output_buffer != NULL) {
> >             evbuffer_free(client->output_buffer);
> >             client->output_buffer = NULL;
> >         }
> >         free(client);
> >     }
> > }
> >
> > /*
> >  * https://en.wikipedia.org/wiki/Jenkins_hash_function
> >  */
> > unsigned int jenkins_one_at_a_time_hash(const char *key, size_t len) {
> >       unsigned int hash, i;
> >
> >       for(hash = i = 0; i < len; ++i)
> >       {
> >               hash += key[i];
> >               hash += (hash << 10);
> >               hash ^= (hash >> 6);
> >       }
> >
> >       hash += (hash << 3);
> >       hash ^= (hash >> 11);
> >       hash += (hash << 15);
> >
> >       return hash;
> > }
> >
> > /*
> >  * Hash function. Use the above and restrict result as per the table
> >  * size. We use a non power of 2 to get good hashing. Refer CLRS.
> >  */
> > unsigned int hash_server(const char *key) {
> >       return jenkins_one_at_a_time_hash(key, KEY_SIZE) %
> SERVER_HASH_TABLE_SIZE;
> > }
> >
> > /*
> >  * Hash function. Use the above and restrict result as per the table
> >  * size. We use a non power of 2 to get good hashing. Refer CLRS.
> >  */
> > unsigned int hash_peer(const char *key) {
> >       return jenkins_one_at_a_time_hash(key, KEY_SIZE) %
> PEER_HASH_TABLE_SIZE;
> > }
> >
> > /*
> >  * Get the node data pointer as per the key
> >  */
> > struct node_t *hash_table_get(const char *key) {
> >       struct node_t *np;
> >
> >       pthread_rwlock_rdlock(&ht_lock);
> >       for(np = hashtable[hash_server(key)]; np != NULL; np = np->next) {
> >               if (strncmp(key, np->key, KEY_SIZE) == 0) {
> >                       pthread_rwlock_unlock(&ht_lock);
> >                       /* We found the key */
> >                       return np;
> >               }
> >       }
> >
> >       pthread_rwlock_unlock(&ht_lock);
> >       return NULL;
> > }
> >
> > /*
> >  * We determine if the key being added exists. If it does, the
> >  * new value supersedes the old one, else we create a new entry
> >  * and add the key/value pair. Return NULL on any error.
> >  */
> > struct node_t *hash_table_put(const char *key, const char *value) {
> >       unsigned int hashval;
> >       struct node_t *np;
> >
> >       pthread_rwlock_wrlock(&ht_lock);
> >       if ((np = hash_table_get(key)) == NULL) { /* Not found */
> >               np = (struct node_t *)malloc(sizeof(*np));
> >               if (np == NULL || (np->key = strndup(key, KEY_SIZE)) ==
> NULL)
> >                       goto error;
> >
> >               /* Find the bucket position and add at 'head' location */
> >               hashval = hash_server(key);
> >               np->next = hashtable[hashval];
> >               hashtable[hashval] = np;
> >       } else /* Already there */
> >               free((void *) np->value);       /* Free previous value */
> >       if ((np->value = strndup(value, VALUE_SIZE)) == NULL)
> >               goto error;
> >
> >       return np;
> >
> > error:
> >       pthread_rwlock_unlock(&ht_lock);
> >       return NULL;
> > }
> >
> > /*
> >  * Return 0 on success and 1 on failure
> >  */
> > unsigned int hash_table_delete(const char *key) {
> >       struct node_t *np1, *np2;
> >       unsigned int hashval;
> >
> >       hashval = hash_server(key);
> >
> >       pthread_rwlock_wrlock(&ht_lock);
> >       for (np1 = hashtable[hashval], np2 = NULL; np1 != NULL; np2 = np1,
> np1 = np1->next)
> >               if (strncmp(key, np1->key, KEY_SIZE) == 0) {
> >                       /* Found a match */
> >                       free(np1->key);
> >                       free(np1->value);
> >                       if (np2 == NULL)
> >                               /* At the beginning? */
> >                               hashtable[hashval] = np1->next;
> >                       else
> >                               /* In the middle or at the end? */
> >                               np2->next = np1->next;
> >               free(np1);
> >
> >               pthread_rwlock_unlock(&ht_lock);
> >               return 0;
> >       }
> >
> >       pthread_rwlock_unlock(&ht_lock);
> >       return 1;
> > }
> >
> > /*
> >  * Called by libevent when there is data to read.
> >  */
> > void server_buffered_on_read(struct bufferevent *bev, void *arg) {
> >     client_t *client = (client_t *)arg;
> >     char data[MESSAGE_SIZE] = {0};
> >       struct node_t *np;
> >
> >     struct evbuffer *input;
> >     input = bufferevent_get_input(bev);
> >       /*
> >        * Remove a chunk of data from the input buffer, copying it into
> our
> >        * local array (data).
> >        */
> >       evbuffer_remove(input, data, MESSAGE_SIZE);
> >
> >       /*
> >        * Check if the message is meant for us and the command. While
> sending
> >        * data back, we use the same buffer keeping header and key
> information
> >        * but changing/appending the value of the key and setting the OK
> or ERROR command
> >        */
> >       if (data[0] == 'C' && data[1] == 'S') {
> >               switch (data[2]) {
> >               case CMD_PUT:
> >                       if ((hash_table_put(&data[4], &data[24])) == NULL)
> >                               data[3] = CMD_ERR;
> >                       else
> >                               data[3] = CMD_OK;
> >                       evbuffer_add(client->output_buffer, data,
> MESSAGE_SIZE);
> >                       break;
> >               case CMD_GET:
> >                       np = hash_table_get(&data[4]);
> >                       if (np == NULL) {
> >                               data[3] = CMD_ERR;
> >                               evbuffer_add(client->output_buffer, data,
> MESSAGE_SIZE);
> >                       } else {
> >                               data[3] = CMD_OK;
> >                               strncpy(&data[24], np->value, VALUE_SIZE);
> >                               evbuffer_add(client->output_buffer, data,
> MESSAGE_SIZE);
> >                       }
> >                       break;
> >               case CMD_DEL:
> >                       if (hash_table_delete(&data[4]))
> >                               data[3] = CMD_ERR;
> >                       else
> >                               data[3] = CMD_OK;
> >                       evbuffer_add(client->output_buffer, data,
> MESSAGE_SIZE);
> >                       break;
> >               default:
> >                       break;
> >               }
> >       }
> >
> >     /* Send the results to the peer.  This actually only queues the
> results
> >      * for sending. Sending will occur asynchronously, handled by
> libevent. */
> >     if (bufferevent_write_buffer(bev, client->output_buffer)) {
> >         errorOut("Error sending data to client on fd %d\n", client->fd);
> >         closeClient(client);
> >     }
> > }
> >
> > /**
> >  * Called by libevent when the write buffer reaches 0.  We only
> >  * provide this because libevent expects it, but we don't use it.
> >  */
> > void server_buffered_on_write(struct bufferevent *bev, void *arg) {
> > }
> >
> > /**
> >  * Called by libevent when there is an error on underlying the socket
> >  * descriptor.
> >  */
> > void server_buffered_on_error(struct bufferevent *bev, short events,
> void *arg) {
> >       if (events & BEV_EVENT_ERROR) {
> >               printf("Server: Got an error: %s\n",
> >
> evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
> >       }
> >       closeClient((client_t *)arg);
> > }
> >
> > static void server_job_function(struct job *job) {
> >     client_t *client = (client_t *)job->user_data;
> >
> >       event_base_dispatch(client->evbase);
> >       closeAndFreeClient(client);
> >       free(job);
> > }
> >
> > /*
> >  * This function will be called by libevent when there is a connection
> >  * ready to be accepted.
> >  */
> > void server_on_accept(evutil_socket_t fd, short ev, void *arg) {
> >     int client_fd;
> >     struct sockaddr_in client_addr;
> >     socklen_t client_len = sizeof(client_addr);
> >     workqueue_t *workqueue = (workqueue_t *)arg;
> >     client_t *client;
> >     job_t *job;
> >
> >     client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
> >     if (client_fd < 0) {
> >         warn("accept failed");
> >         return;
> >     }
> >
> >     /* Set the client socket to non-blocking mode. */
> >     if (evutil_make_socket_nonblocking(client_fd) < 0) {
> >         warn("failed to set client socket to non-blocking");
> >         close(client_fd);
> >         return;
> >     }
> >
> >     /* Create a client object. */
> >     if ((client = (client_t *)malloc(sizeof(*client))) == NULL) {
> >         warn("failed to allocate memory for client state");
> >         close(client_fd);
> >         return;
> >     }
> >     memset(client, 0, sizeof(*client));
> >     client->fd = client_fd;
> >
> >     if ((client->output_buffer = evbuffer_new()) == NULL) {
> >         warn("client output buffer allocation failed");
> >         closeAndFreeClient(client);
> >         return;
> >     }
> >
> >     if ((client->evbase = event_base_new()) == NULL) {
> >         warn("client event_base creation failed");
> >         closeAndFreeClient(client);
> >         return;
> >     }
>
> Are you sure you need this overhead (i.e. base per socket)?
> AFAICS you don't need this.
>
> >
> >     client->buf_ev = bufferevent_socket_new(client->evbase, client_fd,
> >                                               BEV_OPT_CLOSE_ON_FREE |
> BEV_OPT_DEFER_CALLBACKS);
> >     if ((client->buf_ev) == NULL) {
> >         warn("client bufferevent creation failed");
> >         closeAndFreeClient(client);
> >         return;
> >     }
> >
> >       /*
> >        * We trigger the read callback only when there atleast
> MESSAGE_SIZE
> >        * bytes to be read.
> >        */
> >       bufferevent_setwatermark(client->buf_ev, EV_READ, MESSAGE_SIZE, 0);
> >     bufferevent_setcb(client->buf_ev, server_buffered_on_read,
> server_buffered_on_write,
> >                       server_buffered_on_error, client);
> >
> >     /* We have to enable it before our callbacks will be
> >      * called. */
> >     bufferevent_enable(client->buf_ev, EV_READ);
> >
> >     /* Create a job object and add it to the work queue. */
> >     if ((job = (job_t *)malloc(sizeof(*job))) == NULL) {
> >         warn("failed to allocate memory for job state");
> >         closeAndFreeClient(client);
> >         return;
> >     }
> >     job->job_function = server_job_function;
> >     job->user_data = client;
> >
> >     workqueue_add_job(workqueue, job);
> > }
> >
> > /*
> >  * Run the server.  This function blocks, only returning when the server
> has
> >  * terminated.
> >  */
> > void *runServer(void *arg) {
> >       struct server_p *server_i = (struct server_p *)arg;
> >     evutil_socket_t listenfd;
> >       struct sockaddr_in listen_addr;
> >       struct event *ev_accept;
> >     int reuseaddr_on;
> >       struct sigaction siginfo;
> >
> >     /* Set signal handlers */
> >     sigset_t sigset;
> >     sigemptyset(&sigset);
> >     siginfo.sa_handler = sighandler;
> >     siginfo.sa_mask = sigset;
> >     siginfo.sa_flags = SA_RESTART;
> >
> >     sigaction(SIGINT, &siginfo, NULL);
> >     sigaction(SIGTERM, &siginfo, NULL);
> >
> >       /* Create our listening socket */
> >       listenfd = socket(AF_INET, SOCK_STREAM, 0);
> >       if (listenfd < 0) {
> >               err(1, "listen failed");
> >               goto exit;
> >       }
> >
> >       memset(&listen_addr, 0, sizeof(listen_addr));
> >       listen_addr.sin_family = AF_INET;
> >       listen_addr.sin_port = htons(atoi(server_i->serverport));
> >       inet_pton(AF_INET, server_i->serverip, &listen_addr.sin_addr);
> >
> >     if (bind(listenfd, (struct sockaddr *)&listen_addr,
> sizeof(listen_addr)) < 0) {
> >         err(1, "bind failed");
> >               goto exit;
> >     }
> >
> >     if (listen(listenfd, CONNECTION_BACKLOG) < 0) {
> >         err(1, "listen failed");
> >               goto exit;
> >     }
> >       reuseaddr_on = 1;
> >       setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
> >               sizeof(reuseaddr_on));
> >
> >     /*
> >        * Set the socket to non-blocking, this is essential in event
> >      * based programming with libevent.
> >        */
> >     if (evutil_make_socket_nonblocking(listenfd) < 0) {
> >         err(1, "failed to set server socket to non-blocking");
> >               goto exit;
> >     }
> >
> >     if ((evbase_accept = event_base_new()) == NULL) {
> >         perror("Unable to create socket accept event base");
> >         close(listenfd);
> >         goto exit;
> >     }
> >
> >     /* Initialize work queue. */
> >     if (workqueue_init(&workqueue, NUM_THREADS)) {
> >         perror("Failed to create work queue");
> >         close(listenfd);
> >         workqueue_shutdown(&workqueue);
> >         goto exit;
> >     }
> >
> >     /* We now have a listening socket, we create a read event to
> >      * be notified when a client connects. */
> >     ev_accept = event_new(evbase_accept, listenfd, EV_READ | EV_PERSIST,
> >                                       server_on_accept, (void
> *)&workqueue);
> >     event_add(ev_accept, NULL);
> >
> >     printf("Server running.\n");
> >
> >     /* Start the event loop. */
> >     event_base_dispatch(evbase_accept);
> >
> >     event_base_free(evbase_accept);
> >     evbase_accept = NULL;
> >
> >     close(listenfd);
> >
> >     printf("Server shutdown.\n");
> >
> > exit:
> >       pthread_exit(NULL);
> > }
> >
> > /*
> >  * Kill the server.  This function can be called from another thread to
> kill
> >  * the server, causing runServer() to return.
> >  */
> > void killServer(void) {
> >     fprintf(stdout, "Stopping socket listener event loop.\n");
> >     if (event_base_loopexit(evbase_accept, NULL)) {
> >         perror("Error shutting down server");
> >     }
> >     fprintf(stdout, "Stopping workers.\n");
> >     workqueue_shutdown(&workqueue);
> > }
> >
> > static void sighandler(int signal) {
> >     fprintf(stdout, "Received signal %d: %s.  Shutting down.\n", signal,
> >             strsignal(signal));
> >     killServer();
> > }
> >
> > /*
> >  * Taken from
> http://stackoverflow.com/questions/9210528/split-string-with-delimiters-in-c
> >  * We modify it to use strtok_r the MT safe variant. strtok is not MT
> safe.
> >  */
> > char** str_split(char* a_str, const char a_delim) {
> >       char** result    = 0;
> >       size_t count     = 0;
> >       char* tmp        = a_str;
> >       char* last_comma = 0;
> >       char* save               = 0;
> >       char delim[2];
> >       delim[0] = a_delim;
> >       delim[1] = 0;
> >
> >       /* Count how many elements will be extracted. */
> >       while (*tmp) {
> >               if (a_delim == *tmp) {
> >                       count++;
> >                       last_comma = tmp;
> >               }
> >               tmp++;
> >       }
> >
> >       /* Add space for trailing token. */
> >       count += last_comma < (a_str + strlen(a_str) - 1);
> >
> >       /* Add space for terminating null string so caller
> >       knows where the list of returned strings ends. */
> >       count++;
> >
> >       result = (char **)malloc(sizeof(char*) * count);
> >
> >       if (result) {
> >               size_t idx  = 0;
> >               //char* token = strtok(a_str, delim);
> >               char* token = strtok_r(a_str, delim, &save);
> >
> >               while (token) {
> >                       *(result + idx++) = strdup(token);
> >                       //token = strtok(0, delim);
> >                       token = strtok_r(0, delim, &save);
> >               }
> >               assert(idx == count - 1);
> >               *(result + idx) = 0;
> >       }
> >
> >       return result;
> > }
> >
> > static void print_key(char *data, int start_pos) {
> >       int i;
> >
> >       for (i = start_pos; i < start_pos + KEY_SIZE; i++)
> >               printf("%c", data[i]);
> > }
> >
> > static void closeAndFreeServerConn(struct server_conn *client) {
> >     if (client != NULL) {
> >         if (client->bev != NULL) {
> >             bufferevent_free(client->bev);
> >             client->bev = NULL;
> >         }
> >         if (client->evbase != NULL) {
> >             event_base_free(client->evbase);
> >             client->evbase = NULL;
> >         }
> >         if (client->output_buffer != NULL) {
> >             evbuffer_free(client->output_buffer);
> >             client->output_buffer = NULL;
> >         }
> >     }
> > }
> >
> > /*
> >  * Called by libevent when there is data to read.
> >  */
> > void peer_buffered_on_read(struct bufferevent *bev, void *arg) {
> >       struct server_conn *sconn = (struct server_conn *)arg;
> >     char data[MESSAGE_SIZE] = {0};
> >
> >     struct evbuffer *input;
> >     input = bufferevent_get_input(bev);
> >
> >       /*
> >        * Remove a chunk of data from the input buffer, copying it into
> our
> >        * local array (data).
> >        */
> >       evbuffer_remove(input, data, MESSAGE_SIZE);
> >
> >       /*
> >        * Reply from server
> >        */
> >       if (data[0] == 'C' && data[1] == 'S') {
> >               switch (data[2]) {
> >               case CMD_PUT:
> >                       if (data[3] == CMD_OK) {
> >                               if (!perf_test_on)
> >                                       printf("\nPut operation
> successful\n");
> >                       } else {
> >                               if (!perf_test_on)
> >                                       printf("\nPut operation failed\n");
> >                       }
> >
> >                       if (!perf_test_on) {
> >                               printf("Key was: ");
> >                               print_key(data, 4);
> >                               printf("\n");
> >                               printf("Value was: %s\n\n", &data[24]);
> >                       }
> >                       break;
> >               case CMD_GET:
> >                       if (data[3] == CMD_OK) {
> >                               if (!perf_test_on) {
> >                                       printf("\nGet operation
> successful\n");
> >                                       printf("Key was: ");
> >                                       print_key(data, 4);
> >                                       printf("\n");
> >                                       printf("Value is: %s\n\n",
> &data[24]);
> >                               }
> >                       } else {
> >                               if (!perf_test_on) {
> >                                       printf("\nGet operation failed\n");
> >                                       printf("Key was: ");
> >                                       print_key(data, 4);
> >                                       printf("\n\n");
> >                               }
> >                       }
> >                       break;
> >               case CMD_DEL:
> >                       if (data[3] == CMD_OK) {
> >                               if (!perf_test_on)
> >                                       printf("\nDelete operation
> successful\n");
> >                       }
> >                       else {
> >                               if (!perf_test_on)
> >                                       printf("\nDelete operation
> failed\n");
> >                       }
> >
> >                       if (!perf_test_on) {
> >                               printf("Key was: ");
> >                               print_key(data, 4);
> >                               printf("\n\n");
> >                       }
> >                       break;
> >               default:
> >                       break;
> >               }
> >       }
> >
> >       if (sconn->evbase != NULL)
> >               event_base_loopbreak(sconn->evbase);
> > }
> >
> > /*
> >  * Called by libevent when there is an error on underlying the socket
> >  * descriptor.
> >  */
> > void peer_buffered_on_error(struct bufferevent *bev, short events, void
> *arg) {
> >       if (events & BEV_EVENT_ERROR) {
> >               printf("Peer: Got an error: %s\n",
> >
>  evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
> >       }
> > }
> >
> > /**
> >  * Called by libevent when the write buffer reaches 0.  We only
> >  * provide this because libevent expects it, but we don't use it.
> >  */
> > void peer_buffered_on_write(struct bufferevent *bev, void *arg) {
> > }
> >
> > static int make_server_connection(int lserver_id) {
> >       struct sockaddr_in sin;
> >
> >       memset(&sin, 0, sizeof(sin));
> >       sin.sin_family = AF_INET;
> >       sin.sin_port = htons(atoi(servers[lserver_id].serverport));
> >       inet_pton(AF_INET, servers[lserver_id].serverip, &sin.sin_addr);
> >
> >       if ((sconn.output_buffer = evbuffer_new()) == NULL) {
> >               closeAndFreeServerConn(&sconn);
> >               return -1;
> >       }
> >
> >       if ((sconn.evbase = event_base_new()) == NULL) {
> >               closeAndFreeServerConn(&sconn);
> >               return -1;
> >       }
> >
> >       if ((sconn.bev = bufferevent_socket_new(sconn.evbase,
> >                       -1, BEV_OPT_CLOSE_ON_FREE |
> BEV_OPT_DEFER_CALLBACKS)) == NULL) {
> >               closeAndFreeServerConn(&sconn);
> >               return -1;
> >       }
> >
> >       /*
> >        * We trigger the read callback only when there atleast
> MESSAGE_SIZE
> >        * bytes to be read.
> >        */
> >       bufferevent_setwatermark(sconn.bev, EV_READ | EV_WRITE,
> MESSAGE_SIZE, 0);
> >       bufferevent_setcb(sconn.bev, peer_buffered_on_read,
> >                                         peer_buffered_on_write,
> peer_buffered_on_error, &sconn);
> >       bufferevent_enable(sconn.bev, EV_READ);
> >
> >       if (bufferevent_socket_connect(sconn.bev,
> >                       (struct sockaddr *)&sin, sizeof(sin)) < 0) {
> >               closeAndFreeServerConn(&sconn);
> >               return -1;
> >       }
> >
> >       return 0;
> > }
> >
> > void put_at_server(const char *key, const char *value) {
> >       char data[MESSAGE_SIZE];
> >       int lserver_id;
> >
> >       memset(data, 0x30, MESSAGE_SIZE);
> >       data[0] = 'C';
> >       data[1] = 'S';
> >       data[2] = CMD_PUT;
> >
> >       strncpy(&data[4], key, KEY_SIZE);
> >       strncpy(&data[24], value, VALUE_SIZE);
> >
> >       lserver_id = hash_peer(&data[4]);
> >       if (!perf_test_on)
> >               printf("PUT Server Id: %d\n", lserver_id);
> >
> >       if (make_server_connection(lserver_id) != 0) {
> >               printf("Connecting to Server %d failed.\n", lserver_id);
> >               return;
> >       }
> >
> >       evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
> >       bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
> >       event_base_loopexit(sconn.evbase, &peer_timeout);
> >       event_base_loop(sconn.evbase, 0);
> >
> >       /*
> >        * Though our functional specifications are to maintain the
> connections
> >        * in an open state once we conenct, but for some reason if the
> connections
> >        * are not closed, the buffers do not seem to flush and we never
> exit the
> >        * event loop above OR the server does not exit the event loop
> connection
> >        * even though there was only one event from the event base to
> handle and
> >        * the other end does not get the data or the callbacks get
> called. So we
> >        * explicitly close the connection here. Did try a few different
> mechanisms
> >        * for handling this scenario both at server and peer end but none
> seems to
> >        * work. So we explicitly deviate from our design recommendation
> of keeping
> >        * connections open unfortunately. Needs some further indepth
> investigation.
> >        */
> >       if (sconn.bev != NULL) {
> >               bufferevent_free(sconn.bev);
> >               sconn.bev = NULL;
> >       }
> >       if (sconn.evbase != NULL) {
> >               event_base_free(sconn.evbase);
> >               sconn.evbase = NULL;
> >       }
> >       if (sconn.output_buffer != NULL) {
> >               evbuffer_free(sconn.output_buffer);
> >               sconn.output_buffer = NULL;
> >       }
> > }
> >
> > void get_from_server(const char *key) {
> >       char data[MESSAGE_SIZE];
> >       int lserver_id;
> >
> >       memset(data, 0x30, MESSAGE_SIZE);
> >       data[0] = 'C';
> >       data[1] = 'S';
> >       data[2] = CMD_GET;
> >
> >       strncpy(&data[4], key, KEY_SIZE);
> >
> >       lserver_id = hash_peer(&data[4]);
> >       if (!perf_test_on)
> >               printf("GET Server Id: %d\n", lserver_id);
> >
> >       if (make_server_connection(lserver_id) != 0) {
> >               printf("Connecting to Server %d failed.\n", lserver_id);
> >               return;
> >       }
> >
> >       evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
> >       bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
> >       event_base_loopexit(sconn.evbase, &peer_timeout);
> >       event_base_loop(sconn.evbase, 0);
> >
> >       /*
> >        * Though our functional specifications are to maintain the
> connections
> >        * in an open state once we conenct, but for some reason if the
> connections
> >        * are not closed, the buffers do not seem to flush and we never
> exit the
> >        * event loop above OR the server does not exit the event loop
> connection
> >        * even though there was only one event from the event base to
> handle and
> >        * the other end does not get the data or the callbacks get
> called. So we
> >        * explicitly close the connection here. Did try a few different
> mechanisms
> >        * for handling this scenario both at server and peer end but none
> seems to
> >        * work. So we explicitly deviate from our design recommendation
> of keeping
> >        * connections open unfortunately. Needs some further indepth
> investigation.
> >        */
> >       if (sconn.bev != NULL) {
> >               bufferevent_free(sconn.bev);
> >               sconn.bev = NULL;
> >       }
> >       if (sconn.evbase != NULL) {
> >               event_base_free(sconn.evbase);
> >               sconn.evbase = NULL;
> >       }
> >       if (sconn.output_buffer != NULL) {
> >               evbuffer_free(sconn.output_buffer);
> >               sconn.output_buffer = NULL;
> >       }
> > }
> >
> > void delete_from_server(const char *key) {
> >       char data[MESSAGE_SIZE];
> >       int lserver_id;
> >
> >       memset(data, 0x30, MESSAGE_SIZE);
> >       data[0] = 'C';
> >       data[1] = 'S';
> >       data[2] = CMD_DEL;
> >
> >       strncpy(&data[4], key, KEY_SIZE);
> >
> >       lserver_id = hash_peer(&data[4]);
> >       if (!perf_test_on)
> >               printf("DEL Server Id: %d\n", lserver_id);
> >
> >       if (make_server_connection(lserver_id) != 0) {
> >               printf("Connecting to Server %d failed.\n", lserver_id);
> >               return;
> >       }
> >
> >       evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
> >       bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
> >       event_base_loopexit(sconn.evbase, &peer_timeout);
> >       event_base_loop(sconn.evbase, 0);
> >
> >       /*
> >        * Though our functional specifications are to maintain the
> connections
> >        * in an open state once we conenct, but for some reason if the
> connections
> >        * are not closed, the buffers do not seem to flush and we never
> exit the
> >        * event loop above OR the server does not exit the event loop
> connection
> >        * even though there was only one event from the event base to
> handle and
> >        * the other end does not get the data or the callbacks get
> called. So we
> >        * explicitly close the connection here. Did try a few different
> mechanisms
> >        * for handling this scenario both at server and peer end but none
> seems to
> >        * work. So we explicitly deviate from our design recommendation
> of keeping
> >        * connections open unfortunately. Needs some further indepth
> investigation.
> >        */
> >       if (sconn.bev != NULL) {
> >               bufferevent_free(sconn.bev);
> >               sconn.bev = NULL;
> >       }
> >       if (sconn.evbase != NULL) {
> >               event_base_free(sconn.evbase);
> >               sconn.evbase = NULL;
> >       }
> >       if (sconn.output_buffer != NULL) {
> >               evbuffer_free(sconn.output_buffer);
> >               sconn.output_buffer = NULL;
> >       }
> > }
> >
> > static void discard_logs(int severity, const char *msg) {
> >
> > }
> >
> > void set_libevent_logging(void) {
> >       event_enable_debug_logging(EVENT_DBG_ALL);
> >       //event_set_log_callback(discard_logs);
> > }
> >
> > void rand_str(char *dest, size_t length) {
> >       char charset[] = "0123456789"
> >
>  "abcdefghijklmnopqrstuvwxyz"
> >
>  "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
> >
> >       while (length-- > 0) {
> >               size_t index = (double) rand() / RAND_MAX * (sizeof
> charset - 1);
> >               *dest++ = charset[index];
> >       }
> >
> >       *dest = '\0';
> > }
> >
> > void run_perf_tests(void) {
> >       int i;
> >       char key[KEY_SIZE];
> >       char value[VALUE_SIZE];
> >       struct timeval t1, t2;
> >       double elapsedtime, totalelapsedtime;
> >
> >       perf_test_on = true;
> >
> >       /* Run PUT tests */
> >       for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
> >               memset(key, 0x30, KEY_SIZE);
> >               memset(value, 0x30, VALUE_SIZE);
> >
> >               key[0] = server_id + 0x31;
> >               /* Generate a key depending on loop iteration */
> >               snprintf(&key[1], KEY_SIZE - 1, "%d", i);
> >               /* Generate a random string value */
> >               rand_str(value, VALUE_SIZE);
> >               gettimeofday(&t1, NULL);
> >               put_at_server(key, value);
> >               gettimeofday(&t2, NULL);
> >               // compute and print the elapsed time in millisec
> >               elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      //
> sec to ms
> >               elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us
> to ms
> >               printf("Elapsed time: %f\n", elapsedtime);
> >               totalelapsedtime += elapsedtime;
> >       }
> >       printf("Average Response time for PUT requests: %f ms\n",
> totalelapsedtime / NO_OF_TEST_ITERATIONS);
> >
> >       printf("Press any key to continue\n");
> >       getchar();
> >
> >       /* Run GET tests */
> >       for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
> >               memset(key, 0x30, KEY_SIZE);
> >
> >               key[0] = server_id + 0x31;
> >               /* Generate a key depending on loop iteration */
> >               snprintf(&key[1], KEY_SIZE - 1, "%d", i);
> >               gettimeofday(&t1, NULL);
> >               get_from_server(key);
> >               gettimeofday(&t2, NULL);
> >               // compute and print the elapsed time in millisec
> >               elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      //
> sec to ms
> >               elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us
> to ms
> >               printf("Elapsed time: %f\n", elapsedtime);
> >               totalelapsedtime += elapsedtime;
> >       }
> >       printf("Average Response time for GET requests: %f ms\n",
> totalelapsedtime / NO_OF_TEST_ITERATIONS);
> >
> >       printf("Press any key to continue\n");
> >       getchar();
> >
> >       /* Run DEL tests */
> >       for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
> >               memset(key, 0x30, KEY_SIZE);
> >
> >               key[0] = server_id + 0x31;
> >               /* Generate a key depending on loop iteration */
> >               snprintf(&key[1], KEY_SIZE - 1, "%d", i);
> >               gettimeofday(&t1, NULL);
> >               delete_from_server(key);
> >               gettimeofday(&t2, NULL);
> >               // compute and print the elapsed time in millisec
> >               elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      //
> sec to ms
> >               elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us
> to ms
> >               printf("Elapsed time: %f\n", elapsedtime);
> >               totalelapsedtime += elapsedtime;
> >       }
> >       printf("Average Response time for DEL requests: %f ms\n",
> totalelapsedtime / NO_OF_TEST_ITERATIONS);
> >
> >       perf_test_on = false;
> > }
> >
> > void input_process(void) {
> >       char key[KEY_SIZE];
> >       char value[VALUE_SIZE];
> >       bool exitloop = false;
> >       int input;
> >
> >       /*
> >        * We run the peer functionality in this main thread
> >        */
> >       while (!exitloop) {
> >               printf("\nSelect Operation\n");
> >               printf("(1) Put (2) Get (3) Delete (4) Run tests\n");
> >               printf("(5) Exit");
> >               printf("\nPlease enter your selection (1-5):\t");
> >
> >               scanf("%d", &input);
> >               getchar();
> >
> >               switch (input) {
> >               case 1:
> >                       printf("Enter Key: \t");
> >                       scanf("%s", key);
> >                       printf("\n");
> >                       printf("Enter Value: \t");
> >                       scanf("%s", value);
> >                       printf("\n");
> >                       put_at_server(key, value);
> >                       break;
> >               case 2:
> >                       printf("Enter Key: \t");
> >                       scanf("%s", key);
> >                       printf("\n");
> >                       get_from_server(key);
> >                       break;
> >               case 3:
> >                       printf("Enter Key: \t");
> >                       scanf("%s", key);
> >                       printf("\n");
> >                       delete_from_server(key);
> >                       break;
> >               case 4:
> >                       run_perf_tests();
> >                       break;
> >               case 5:
> >                       killServer();
> >                       exitloop = true;
> >                       break;
> >               default:
> >                       printf("\n\nWrong value: %d\n", input);
> >                       break;
> >               }
> >
> >               /* Reset buffers for next iteration */
> >               memset(key, 0x30, KEY_SIZE);
> >               memset(value, 0x30, VALUE_SIZE);
> >       }
> > }
> >
> > int main(int argc, char *argv[]) {
> >       FILE *fp;
> >       int i;
> >       int error;
> >       int count_of_servers;
> >       ssize_t read;
> >       size_t len = 0;
> >       char *line = NULL;
> >       char **tokens = NULL;
> >
> >       if (argc != 3) {
> >               /*
> >                * We do not validate or error check any of the arguments
> >                * Please enter correct arguments
> >                */
> >               printf("Usage: ./server <serverid#>
> </path/to/server/conf/file>");
> >               exit(1);
> >       }
> >
> >       server_id = atoi(argv[1]) - 1;
> >       if ((server_id < 0) || (server_id > MAX_NO_OF_SERVERS)) {
> >               printf("Incorrect server id provided\n");
> >               exit(1);
> >       }
> >
> >       fp = fopen(argv[2], "r");
> >       if (fp == NULL) {
> >               perror("Could not open server configuration file");
> >               exit(1);
> >       }
> >
> >       /*
> >        * We now extract the IP and port information of 8 servers
> >        * which will be involved in this setup.
> >        */
> >       count_of_servers = 0;
> >       while ((read = getline(&line, &len, fp)) != -1) {
> >
> >               if (count_of_servers == MAX_NO_OF_SERVERS)
> >                       break;
> >
> >               tokens = str_split(line, ' ');
> >               if (tokens) {
> >                       servers[count_of_servers].serverip = *(tokens);
> >                       servers[count_of_servers].serverport = *(tokens +
> 1);
> >               }
> >               free(line);
> >               line = NULL;
> >
> >               count_of_servers++;
> >       }
> >
> >       fclose(fp);
> >
> >       if (pthread_rwlock_init(&ht_lock, NULL) != 0) {
> >               perror("Lock init failed");
> >               goto free_tokens;
> >       }
> >
> >       /*
> >        * Start the server. We start the server in another thread so
> >        * the libevent event loop for dispatching events on connections
> >        * can work separately which would otherwise block.
> >        */
> >       error = pthread_create(&server_thread, NULL, &runServer,
> &servers[server_id]);
> >       if (error != 0) {
> >               perror("Error in server thread creation");
> >               goto free_tokens;
> >       }
> >
> >       peer_timeout.tv_sec = 0;
> >       peer_timeout.tv_usec = PEER_CONN_TIMEOUT;
> >
> >       input_process();
> >
> >       for (i = count_of_servers - 1; i >= 0; --i) {
> >               free(servers[i].serverip);
> >               free(servers[i].serverport);
> >       }
> >
> >       pthread_rwlock_destroy(&ht_lock);
> >
> >       return 0;
> >
> > free_tokens:
> >       for (i = count_of_servers - 1; i >= 0; --i) {
> >               free(servers[i].serverip);
> >               free(servers[i].serverport);
> >       }
> >
> >       return -1;
> > }
> >
>
>
> --
> Respectfully
> Azat Khuzhin
> ***********************************************************************
> To unsubscribe, send an e-mail to [email protected] with
> unsubscribe libevent-users    in the body.
>

Reply via email to