Also, a DHT, a real DHT, is never simple; but I figure you are talking about
simple consistent hashing?

On Tue, Oct 6, 2015 at 2:42 PM Mark Ellzey <[email protected]> wrote:

> A good resource for the "proper way" to use libevent is to read the
> libevent book. http://www.wangafu.net/~nickm/libevent-book/
>
> On Tue, Oct 6, 2015 at 2:51 AM Azat Khuzhin <[email protected]> wrote:
>
>> On Mon, Oct 05, 2015 at 10:43:19PM +0530, Sanchayan Maity wrote:
>> > Hello,
>> >
>> > I am writing a simple DHT application and using libevent in my
>> > application. The libevent version is 2.0 which is installed by
>> > default on my Arch Linux setup.
>> >
>> > After some searching I found an example to use libevent in a
>> > multi-threaded setup.
>> >
>> http://roncemer.com/software-development/multi-threaded-libevent-server-example/#comment-3249
>> >
>> > The above code runs one event base in its own separate thread and
>> > uses a threaded workqueue model to service requests on incoming
>> > connections.
>>
>> Hi,
>>
>> Personally I prefer to use a pipe/socketpair for scheduling jobs to
>> other threads. In a nutshell, it works like this:
>> - acceptcb:
>>   write(notify_fd_write_side, user-struct, sizeof(user-struct))
>> - per-thread event base readcb for the notify_fd read side:
>>   read(notify_fd_read_side, buf, sizeof(user-struct))
>>
>> It is much simpler than using pthread_cond_*, though at first glance
>> both approaches should work.
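
To make the write/read pair above concrete, here is a minimal sketch in
plain POSIX C. It deliberately contains no libevent calls so that it stands
alone. The names job_msg, notify_job() and receive_job() are hypothetical
and are not taken from either project linked in this thread. It assumes
blocking descriptors; in a real server the read end would be registered as
a read event on the worker's event base and the receive step would run
from that callback.

```c
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical job descriptor handed from the accept thread to a worker. */
struct job_msg {
    int client_fd;   /* accepted socket the worker should take over */
    int worker_hint; /* free-form field, only here to show a multi-field struct */
};

/* Accept-thread side: push one fixed-size message into the notify pipe.
 * Returns 0 on success, -1 on error. */
int notify_job(int write_fd, const struct job_msg *msg) {
    const char *p = (const char *)msg;
    size_t left = sizeof(*msg);

    while (left > 0) {
        ssize_t n = write(write_fd, p, left);
        if (n < 0) {
            if (errno == EINTR)
                continue;       /* interrupted by a signal, retry */
            return -1;
        }
        p += n;
        left -= (size_t)n;
    }
    return 0;
}

/* Worker side: pull exactly one message out of the notify pipe.
 * Returns 0 on success, -1 on error or if the write end was closed. */
int receive_job(int read_fd, struct job_msg *msg) {
    char *p = (char *)msg;
    size_t left = sizeof(*msg);

    while (left > 0) {
        ssize_t n = read(read_fd, p, left);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (n == 0)
            return -1;          /* EOF: no more jobs will arrive */
        p += n;
        left -= (size_t)n;
    }
    return 0;
}
```

Because the message is far smaller than PIPE_BUF, POSIX guarantees that each
write to a pipe is atomic, so several producers can share one write end
without their messages interleaving.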
>>
>> Here are a few implementations using this mechanism:
>> -
>> https://github.com/ellzey/libevhtp/blob/libevhtp2/src/evhtp2/evhtp_thr.c#L60
>> -
>> https://github.com/azat/boostcache/blob/libevent-aio-v2/src/kernel/net/commandserver.cpp#L206
>>
>> Other comments are inline.
>>
>> > I have taken that sample code and modified it for my own purposes.
>> > My application is functional when I deploy eight instances in my
>> > setup and check the basic functionality. However, when I run
>> > multiple operations from multiple application nodes, I get some
>> > messages which I do not fully comprehend at the moment.
>> >
>> > They are as follows:
>> > 1. Epoll %s(%d) on fd %d failed.  Old events were %d; read change was %d (%s); write change was %d (%s); close change was %d (%s)
>> >
>> > This seems to be printed from epoll.c by the event_warn mechanism.
>>
>> Yep.
>>
>> >
>> > 2. Too many open files
>>
>> This can be because of too many open files. Indeed, you have overhead
>> here: you create a separate event base for every socket, which means an
>> epoll fd for every socket, and that is not how this is supposed to work
>> (see my comments on your sample).
>>
>> I guess this is the main problem that you have; the other one (the
>> epoll failed message) should go away once you fix the EMFILE problem.
>>
>> But if you are still sure that you need all of the fds that you have,
>> you can increase the limits:
>>
>> $ ulimit -n $((1<<20))
>> Or permanently in /etc/security/limits.conf.
>>
>> Note that the per-process limit cannot exceed the fs.nr_open sysctl, and
>> the system-wide total is bounded by fs.file-max, so raise those too if
>> you need more.
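
The soft limit can also be raised from inside the program, which is roughly
what ulimit -n does in the shell. The sketch below assumes Linux, and
raise_nofile_limit() is a hypothetical helper name. It only lifts the soft
limit up to the existing hard limit and does nothing about the descriptor
usage itself.

```c
#include <sys/resource.h>

/* Raise the soft RLIMIT_NOFILE to the hard limit for this process.
 * Returns 0 on success, -1 on failure (errno is set by the failing call). */
int raise_nofile_limit(void) {
    struct rlimit rl;

    if (getrlimit(RLIMIT_NOFILE, &rl) != 0)
        return -1;
    if (rl.rlim_cur == rl.rlim_max)
        return 0;                   /* already at the ceiling */
    rl.rlim_cur = rl.rlim_max;      /* unprivileged processes may go up to the hard limit */
    return setrlimit(RLIMIT_NOFILE, &rl);
}
```

Calling it once at startup, before any sockets are opened, is enough; going
beyond the hard limit still needs limits.conf or root.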
>>
>> > 3. One more which I failed to note and was related to EPOLL MOD
>> >
>> > I know perhaps this is too much to ask, but can someone have a
>> > look at the code and tell me things I am doing wrong. I have gone
>> > through the libevent documentation online but I am not sure if I
>> > am doing things the right way at the moment.
>> >
>> > I try to make the peer part of the DHT synchronous by breaking out of
>> > the event base loop with an event_base_loopbreak() call in the read
>> > callback. However, the callback sometimes never gets called, and then
>> > the peer part of the DHT is just stuck, so to work around this I use
>> > a timeout.
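
As an aside, a synchronous request/reply with a timeout does not strictly
need an event loop on the peer side. The sketch below swaps in a different
technique from the posted code: plain poll() plus read() on the connected
socket. The name read_exact_timeout() is hypothetical, and the timeout
applies to each wait rather than to the whole reply.

```c
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

/* Read exactly len bytes from fd, waiting at most timeout_ms for each chunk.
 * Returns 0 on success, -1 on error, timeout, or EOF before len bytes. */
int read_exact_timeout(int fd, void *buf, size_t len, int timeout_ms) {
    char *p = buf;

    while (len > 0) {
        struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
        int ready = poll(&pfd, 1, timeout_ms);

        if (ready < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (ready == 0)
            return -1;          /* timed out: the reply never arrived */

        ssize_t n = read(fd, p, len);
        if (n < 0) {
            if (errno == EINTR || errno == EAGAIN)
                continue;
            return -1;
        }
        if (n == 0)
            return -1;          /* peer closed the connection */
        p += n;
        len -= (size_t)n;
    }
    return 0;
}
```

The loop matters because TCP is a byte stream: a fixed-size reply may arrive
in several pieces, and a single read is not guaranteed to return all of it.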
>> >
>> > The code is attached. I have tried to keep it clean. I am no networking
>> > or socket expert, appreciate any comments or feedback.
>> >
>> > Thanks & Regards,
>> > Sanchayan Maity.
>> >
>> >
>> >
>>
>> > /*
>> >  * Copyright (C) 2015 Sanchayan Maity <[email protected]>
>> >  *
>> >  * Author: Sanchayan Maity <[email protected]>
>> >  *                                               <[email protected]>
>> >  *
>> >  * This program is free software; you can redistribute it and/or modify
>> >  * it under the terms of the GNU General Public License version 2 and
>> >  * only version 2 as published by the Free Software Foundation.
>> >  *
>> >  * This program is distributed in the hope that it will be useful,
>> >  * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> >  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
>> >  * GNU General Public License for more details.
>> >  */
>> > #include "common.h"
>> >
>> > /*
>> >  * Struct to carry around connection (client)-specific data.
>> >  */
>> > typedef struct client {
>> >     /* The client's socket. */
>> >     int fd;
>> >
>> >     /* The event_base for this client. */
>> >     struct event_base *evbase;
>> >
>> >     /* The bufferevent for this client. */
>> >     struct bufferevent *buf_ev;
>> >
>> >     /* The output buffer for this client. */
>> >     struct evbuffer *output_buffer;
>> >
>> >     /* Here you can add your own application-specific attributes which
>> >      * are connection-specific. */
>> > } client_t;
>> >
>> > /* Table entry */
>> > struct node_t {
>> >       /* Key */
>> >       char *key;
>> >       /* Value of the key */
>> >       char *value;
>> >       /* Next entry in chain */
>> >     struct node_t *next;
>> > };
>> >
>> > static struct node_t *hashtable[SERVER_HASH_TABLE_SIZE];
>> >
>> > /*
>> >  * We use a read write lock to protect against concurrent
>> >  * write to the hash table. It is ok to have concurrent
>> >  * readers. We do not use a mutex as that will reduce
>> >  * reader concurrency to a single thread at a time.
>> >  */
>> > pthread_rwlock_t ht_lock;
>> >
>> > static struct event_base *evbase_accept;
>> > /* Workqueue for the server */
>> > static workqueue_t workqueue;
>> >
>> > /*
>> >  * Id of server. We use this to pick up appropriate
>> >  * IP/port parameters from the file.
>> >  */
>> > static int server_id;
>> >
>> > struct server_p {
>> >       /* IP to which server will bind to */
>> >       char *serverip;
>> >       /* Port on which server will listen */
>> >       char *serverport;
>> > };
>> >
>> > /*
>> >  * We use this to store server parameters of the eight
>> >  * servers information read from file
>> >  */
>> > static struct server_p servers[MAX_NO_OF_SERVERS];
>> >
>> > pthread_t server_thread;
>> > static volatile bool perf_test_on = false;
>> >
>> > /* Signal handler function (defined below). */
>> > static void sighandler(int signal);
>> >
>> > /*
>> >  * Struct to carry around server connection specific data
>> >  */
>> > struct server_conn {
>> >       /* Event base for the peer server connection */
>> >       struct event_base *evbase;
>> >       /* Buffer event for the peer server connection */
>> >       struct bufferevent *bev;
>> >       /* Output buffer for the peer server connection */
>> >       struct evbuffer *output_buffer;
>> > };
>> >
>> > static struct server_conn sconn;
>> >
>> > struct timeval peer_timeout;
>> >
>> > static void closeClient(client_t *client) {
>> >     if (client != NULL) {
>> >         if (client->fd >= 0) {
>> >             close(client->fd);
>> >             client->fd = -1;
>> >         }
>> >     }
>> > }
>> >
>> > static void closeAndFreeClient(client_t *client) {
>> >     if (client != NULL) {
>> >         closeClient(client);
>> >         if (client->buf_ev != NULL) {
>> >             bufferevent_free(client->buf_ev);
>> >             client->buf_ev = NULL;
>> >         }
>> >         if (client->evbase != NULL) {
>> >             event_base_free(client->evbase);
>> >             client->evbase = NULL;
>> >         }
>> >         if (client->output_buffer != NULL) {
>> >             evbuffer_free(client->output_buffer);
>> >             client->output_buffer = NULL;
>> >         }
>> >         free(client);
>> >     }
>> > }
>> >
>> > /*
>> >  * https://en.wikipedia.org/wiki/Jenkins_hash_function
>> >  */
>> > unsigned int jenkins_one_at_a_time_hash(const char *key, size_t len) {
>> >       unsigned int hash, i;
>> >
>> >       for(hash = i = 0; i < len; ++i)
>> >       {
>> >               hash += key[i];
>> >               hash += (hash << 10);
>> >               hash ^= (hash >> 6);
>> >       }
>> >
>> >       hash += (hash << 3);
>> >       hash ^= (hash >> 11);
>> >       hash += (hash << 15);
>> >
>> >       return hash;
>> > }
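
One detail on the hash: whether plain char is signed is
implementation-defined, so adding key[i] directly can sign-extend bytes
above 0x7f and give different results on different platforms. A minimal
sketch with fixed-width types, following the uint8_t/uint32_t form on the
Wikipedia page cited in the code, is below. The function name
one_at_a_time() is only for illustration.

```c
#include <stddef.h>
#include <stdint.h>

/* Jenkins one-at-a-time hash over len bytes of key. */
uint32_t one_at_a_time(const char *key, size_t len) {
    uint32_t hash = 0;
    size_t i;

    for (i = 0; i < len; ++i) {
        hash += (unsigned char)key[i];  /* never sign-extends, even for bytes >= 0x80 */
        hash += hash << 10;
        hash ^= hash >> 6;
    }
    hash += hash << 3;
    hash ^= hash >> 11;
    hash += hash << 15;
    return hash;
}
```

As a sanity check, one_at_a_time("a", 1) evaluates to 0xca2e9442.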
>> >
>> > /*
>> >  * Hash function. Use the above and restrict result as per the table
>> >  * size. We use a non power of 2 to get good hashing. Refer CLRS.
>> >  */
>> > unsigned int hash_server(const char *key) {
>> >       return jenkins_one_at_a_time_hash(key, KEY_SIZE) % SERVER_HASH_TABLE_SIZE;
>> > }
>> >
>> > /*
>> >  * Hash function. Use the above and restrict result as per the table
>> >  * size. We use a non power of 2 to get good hashing. Refer CLRS.
>> >  */
>> > unsigned int hash_peer(const char *key) {
>> >       return jenkins_one_at_a_time_hash(key, KEY_SIZE) % PEER_HASH_TABLE_SIZE;
>> > }
>> >
>> > /*
>> >  * Get the node data pointer as per the key
>> >  */
>> > struct node_t *hash_table_get(const char *key) {
>> >       struct node_t *np;
>> >
>> >       pthread_rwlock_rdlock(&ht_lock);
>> >       for(np = hashtable[hash_server(key)]; np != NULL; np = np->next) {
>> >               if (strncmp(key, np->key, KEY_SIZE) == 0) {
>> >                       pthread_rwlock_unlock(&ht_lock);
>> >                       /* We found the key */
>> >                       return np;
>> >               }
>> >       }
>> >
>> >       pthread_rwlock_unlock(&ht_lock);
>> >       return NULL;
>> > }
>> >
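>> > /*
>> >  * Look up a key without taking ht_lock. The caller must already
>> >  * hold the lock (read or write).
>> >  */
>> > static struct node_t *hash_table_find_locked(const char *key) {
>> >       struct node_t *np;
>> >
>> >       for (np = hashtable[hash_server(key)]; np != NULL; np = np->next)
>> >               if (strncmp(key, np->key, KEY_SIZE) == 0)
>> >                       return np;
>> >       return NULL;
>> > }
>> >
>> > /*
>> >  * We determine if the key being added exists. If it does, the
>> >  * new value supersedes the old one, else we create a new entry
>> >  * and add the key/value pair. Return NULL on any error.
>> >  */
>> > struct node_t *hash_table_put(const char *key, const char *value) {
>> >       unsigned int hashval;
>> >       struct node_t *np;
>> >       char *new_value;
>> >
>> >       /* Copy the value first so that a failure leaves the table untouched */
>> >       if ((new_value = strndup(value, VALUE_SIZE)) == NULL)
>> >               return NULL;
>> >
>> >       pthread_rwlock_wrlock(&ht_lock);
>> >       /* hash_table_get() takes ht_lock itself, so it must not be called here */
>> >       if ((np = hash_table_find_locked(key)) == NULL) { /* Not found */
>> >               np = (struct node_t *)malloc(sizeof(*np));
>> >               if (np == NULL || (np->key = strndup(key, KEY_SIZE)) == NULL)
>> >                       goto error;
>> >
>> >               /* Find the bucket position and add at 'head' location */
>> >               hashval = hash_server(key);
>> >               np->next = hashtable[hashval];
>> >               hashtable[hashval] = np;
>> >       } else /* Already there */
>> >               free((void *) np->value);       /* Free previous value */
>> >       np->value = new_value;
>> >
>> >       pthread_rwlock_unlock(&ht_lock);
>> >       return np;
>> >
>> > error:
>> >       pthread_rwlock_unlock(&ht_lock);
>> >       free(np);               /* NULL, or a node that was never linked in */
>> >       free(new_value);
>> >       return NULL;
>> > }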
>> >
>> > /*
>> >  * Return 0 on success and 1 on failure
>> >  */
>> > unsigned int hash_table_delete(const char *key) {
>> >       struct node_t *np1, *np2;
>> >       unsigned int hashval;
>> >
>> >       hashval = hash_server(key);
>> >
>> >       pthread_rwlock_wrlock(&ht_lock);
>> >       for (np1 = hashtable[hashval], np2 = NULL; np1 != NULL; np2 = np1, np1 = np1->next)
>> >               if (strncmp(key, np1->key, KEY_SIZE) == 0) {
>> >                       /* Found a match */
>> >                       free(np1->key);
>> >                       free(np1->value);
>> >                       if (np2 == NULL)
>> >                               /* At the beginning? */
>> >                               hashtable[hashval] = np1->next;
>> >                       else
>> >                               /* In the middle or at the end? */
>> >                               np2->next = np1->next;
>> >                       free(np1);
>> >
>> >                       pthread_rwlock_unlock(&ht_lock);
>> >                       return 0;
>> >               }
>> >
>> >       pthread_rwlock_unlock(&ht_lock);
>> >       return 1;
>> > }
>> >
>> > /*
>> >  * Called by libevent when there is data to read.
>> >  */
>> > void server_buffered_on_read(struct bufferevent *bev, void *arg) {
>> >     client_t *client = (client_t *)arg;
>> >     char data[MESSAGE_SIZE] = {0};
>> >       struct node_t *np;
>> >
>> >     struct evbuffer *input;
>> >     input = bufferevent_get_input(bev);
>> >       /*
>> >        * Remove a chunk of data from the input buffer, copying it into our
>> >        * local array (data).
>> >        */
>> >       evbuffer_remove(input, data, MESSAGE_SIZE);
>> >
>> >       /*
>> >        * Check if the message is meant for us and the command. While sending
>> >        * data back, we use the same buffer keeping header and key information
>> >        * but changing/appending the value of the key and setting the OK or ERROR command
>> >        */
>> >       if (data[0] == 'C' && data[1] == 'S') {
>> >               switch (data[2]) {
>> >               case CMD_PUT:
>> >                       if ((hash_table_put(&data[4], &data[24])) == NULL)
>> >                               data[3] = CMD_ERR;
>> >                       else
>> >                               data[3] = CMD_OK;
>> >                       evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>> >                       break;
>> >               case CMD_GET:
>> >                       np = hash_table_get(&data[4]);
>> >                       if (np == NULL) {
>> >                               data[3] = CMD_ERR;
>> >                               evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>> >                       } else {
>> >                               data[3] = CMD_OK;
>> >                               strncpy(&data[24], np->value, VALUE_SIZE);
>> >                               evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>> >                       }
>> >                       break;
>> >               case CMD_DEL:
>> >                       if (hash_table_delete(&data[4]))
>> >                               data[3] = CMD_ERR;
>> >                       else
>> >                               data[3] = CMD_OK;
>> >                       evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>> >                       break;
>> >               default:
>> >                       break;
>> >               }
>> >       }
>> >
>> >     /* Send the results to the peer.  This actually only queues the results
>> >      * for sending. Sending will occur asynchronously, handled by libevent. */
>> >     if (bufferevent_write_buffer(bev, client->output_buffer)) {
>> >         errorOut("Error sending data to client on fd %d\n", client->fd);
>> >         closeClient(client);
>> >     }
>> > }
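
For readers following the wire format used in this handler, the frame
layout can be read off the indices in the code: 'C' 'S' at bytes 0 and 1,
the command at byte 2, the status at byte 3, the key from byte 4 and the
value from byte 24. Below is a small sketch of building such a frame with
named offsets. Everything else in it is an assumption, since common.h was
not posted: the 1024-byte frame size, the 20-byte key length (inferred from
24 - 4), the command codes, and the helper name build_request(). It also
zero-pads the frame, whereas the posted peer code fills it with 0x30.

```c
#include <stddef.h>
#include <string.h>

/* Assumed sizes and command codes, for illustration only; the real values
 * live in common.h. Names are prefixed to avoid clashing with its macros. */
enum { MSG_SIZE = 1024, KEY_OFF = 4, KEY_LEN = 20, VAL_OFF = 24,
       VAL_LEN = MSG_SIZE - VAL_OFF };
enum { X_CMD_PUT = 'P', X_CMD_GET = 'G', X_CMD_DEL = 'D' };

/* Build a request frame: "CS", command, status placeholder, key, value.
 * Unused bytes are zero, so the value is always NUL-terminated.
 * value may be NULL for commands that carry no value.
 * Returns 0 on success, -1 if the key or value does not fit. */
int build_request(char frame[MSG_SIZE], char cmd, const char *key, const char *value) {
    size_t klen = strlen(key);
    size_t vlen = value ? strlen(value) : 0;

    if (klen > KEY_LEN || vlen >= VAL_LEN)
        return -1;
    memset(frame, 0, MSG_SIZE);
    frame[0] = 'C';
    frame[1] = 'S';
    frame[2] = cmd;
    memcpy(frame + KEY_OFF, key, klen);
    if (vlen > 0)
        memcpy(frame + VAL_OFF, value, vlen);
    return 0;
}
```

Rejecting oversized input up front avoids the silent truncation and missing
terminator that strncpy() can produce when the source fills the field.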
>> >
>> > /**
>> >  * Called by libevent when the write buffer reaches 0.  We only
>> >  * provide this because libevent expects it, but we don't use it.
>> >  */
>> > void server_buffered_on_write(struct bufferevent *bev, void *arg) {
>> > }
>> >
>> > /**
>> >  * Called by libevent when there is an error on the underlying socket
>> >  * descriptor.
>> >  */
>> > void server_buffered_on_error(struct bufferevent *bev, short events, void *arg) {
>> >       if (events & BEV_EVENT_ERROR) {
>> >               printf("Server: Got an error: %s\n",
>> >                      evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
>> >       }
>> >       closeClient((client_t *)arg);
>> > }
>> >
>> > static void server_job_function(struct job *job) {
>> >     client_t *client = (client_t *)job->user_data;
>> >
>> >       event_base_dispatch(client->evbase);
>> >       closeAndFreeClient(client);
>> >       free(job);
>> > }
>> >
>> > /*
>> >  * This function will be called by libevent when there is a connection
>> >  * ready to be accepted.
>> >  */
>> > void server_on_accept(evutil_socket_t fd, short ev, void *arg) {
>> >     int client_fd;
>> >     struct sockaddr_in client_addr;
>> >     socklen_t client_len = sizeof(client_addr);
>> >     workqueue_t *workqueue = (workqueue_t *)arg;
>> >     client_t *client;
>> >     job_t *job;
>> >
>> >     client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
>> >     if (client_fd < 0) {
>> >         warn("accept failed");
>> >         return;
>> >     }
>> >
>> >     /* Set the client socket to non-blocking mode. */
>> >     if (evutil_make_socket_nonblocking(client_fd) < 0) {
>> >         warn("failed to set client socket to non-blocking");
>> >         close(client_fd);
>> >         return;
>> >     }
>> >
>> >     /* Create a client object. */
>> >     if ((client = (client_t *)malloc(sizeof(*client))) == NULL) {
>> >         warn("failed to allocate memory for client state");
>> >         close(client_fd);
>> >         return;
>> >     }
>> >     memset(client, 0, sizeof(*client));
>> >     client->fd = client_fd;
>> >
>> >     if ((client->output_buffer = evbuffer_new()) == NULL) {
>> >         warn("client output buffer allocation failed");
>> >         closeAndFreeClient(client);
>> >         return;
>> >     }
>> >
>> >     if ((client->evbase = event_base_new()) == NULL) {
>> >         warn("client event_base creation failed");
>> >         closeAndFreeClient(client);
>> >         return;
>> >     }
>>
>> Are you sure you need this overhead (i.e. base per socket)?
>> AFAICS you don't need this.
>>
>> >
>> >     client->buf_ev = bufferevent_socket_new(client->evbase, client_fd,
>> >                                               BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
>> >     if ((client->buf_ev) == NULL) {
>> >         warn("client bufferevent creation failed");
>> >         closeAndFreeClient(client);
>> >         return;
>> >     }
>> >
>> >       /*
>> >        * We trigger the read callback only when there are at least MESSAGE_SIZE
>> >        * bytes to be read.
>> >        */
>> >       bufferevent_setwatermark(client->buf_ev, EV_READ, MESSAGE_SIZE, 0);
>> >     bufferevent_setcb(client->buf_ev, server_buffered_on_read, server_buffered_on_write,
>> >                       server_buffered_on_error, client);
>> >
>> >     /* We have to enable it before our callbacks will be
>> >      * called. */
>> >     bufferevent_enable(client->buf_ev, EV_READ);
>> >
>> >     /* Create a job object and add it to the work queue. */
>> >     if ((job = (job_t *)malloc(sizeof(*job))) == NULL) {
>> >         warn("failed to allocate memory for job state");
>> >         closeAndFreeClient(client);
>> >         return;
>> >     }
>> >     job->job_function = server_job_function;
>> >     job->user_data = client;
>> >
>> >     workqueue_add_job(workqueue, job);
>> > }
>> >
>> > /*
>> >  * Run the server.  This function blocks, only returning when the server has
>> >  * terminated.
>> >  */
>> > void *runServer(void *arg) {
>> >       struct server_p *server_i = (struct server_p *)arg;
>> >     evutil_socket_t listenfd;
>> >       struct sockaddr_in listen_addr;
>> >       struct event *ev_accept;
>> >     int reuseaddr_on;
>> >       struct sigaction siginfo;
>> >
>> >     /* Set signal handlers */
>> >     sigset_t sigset;
>> >     sigemptyset(&sigset);
>> >     siginfo.sa_handler = sighandler;
>> >     siginfo.sa_mask = sigset;
>> >     siginfo.sa_flags = SA_RESTART;
>> >
>> >     sigaction(SIGINT, &siginfo, NULL);
>> >     sigaction(SIGTERM, &siginfo, NULL);
>> >
>> >       /* Create our listening socket */
>> >       listenfd = socket(AF_INET, SOCK_STREAM, 0);
>> >       if (listenfd < 0) {
>> >               err(1, "socket failed");
>> >               goto exit;
>> >       }
>> >
>> >       memset(&listen_addr, 0, sizeof(listen_addr));
>> >       listen_addr.sin_family = AF_INET;
>> >       listen_addr.sin_port = htons(atoi(server_i->serverport));
>> >       inet_pton(AF_INET, server_i->serverip, &listen_addr.sin_addr);
>> >
>> >     if (bind(listenfd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) {
>> >         err(1, "bind failed");
>> >               goto exit;
>> >     }
>> >
>> >     if (listen(listenfd, CONNECTION_BACKLOG) < 0) {
>> >         err(1, "listen failed");
>> >               goto exit;
>> >     }
>> >       reuseaddr_on = 1;
>> >       setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
>> >               sizeof(reuseaddr_on));
>> >
>> >     /*
>> >        * Set the socket to non-blocking, this is essential in event
>> >      * based programming with libevent.
>> >        */
>> >     if (evutil_make_socket_nonblocking(listenfd) < 0) {
>> >         err(1, "failed to set server socket to non-blocking");
>> >               goto exit;
>> >     }
>> >
>> >     if ((evbase_accept = event_base_new()) == NULL) {
>> >         perror("Unable to create socket accept event base");
>> >         close(listenfd);
>> >         goto exit;
>> >     }
>> >
>> >     /* Initialize work queue. */
>> >     if (workqueue_init(&workqueue, NUM_THREADS)) {
>> >         perror("Failed to create work queue");
>> >         close(listenfd);
>> >         workqueue_shutdown(&workqueue);
>> >         goto exit;
>> >     }
>> >
>> >     /* We now have a listening socket, we create a read event to
>> >      * be notified when a client connects. */
>> >     ev_accept = event_new(evbase_accept, listenfd, EV_READ | EV_PERSIST,
>> >                                       server_on_accept, (void *)&workqueue);
>> >     event_add(ev_accept, NULL);
>> >
>> >     printf("Server running.\n");
>> >
>> >     /* Start the event loop. */
>> >     event_base_dispatch(evbase_accept);
>> >
>> >     event_base_free(evbase_accept);
>> >     evbase_accept = NULL;
>> >
>> >     close(listenfd);
>> >
>> >     printf("Server shutdown.\n");
>> >
>> > exit:
>> >       pthread_exit(NULL);
>> > }
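
A note on socket setup order: SO_REUSEADDR is consulted when the address is
bound, so it only helps if it is set before bind(). A minimal sketch of a
listener in that order follows. make_listener() is a hypothetical helper,
IPv4 only, and it returns an error code rather than exiting the process.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Create a TCP listening socket on ip:port. Returns the fd, or -1 on error. */
int make_listener(const char *ip, uint16_t port, int backlog) {
    struct sockaddr_in addr;
    int on = 1;
    int fd = socket(AF_INET, SOCK_STREAM, 0);

    if (fd < 0)
        return -1;

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1)
        goto fail;      /* not a valid dotted-quad address */

    /* Must come before bind(): the option is checked when the address is bound. */
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
        goto fail;
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
        goto fail;
    if (listen(fd, backlog) < 0)
        goto fail;
    return fd;

fail:
    close(fd);
    return -1;
}
```

Passing port 0 lets the kernel pick a free port, which is handy for tests;
the returned fd can then be made non-blocking and handed to the accept event.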
>> >
>> > /*
>> >  * Kill the server.  This function can be called from another thread to kill
>> >  * the server, causing runServer() to return.
>> >  */
>> > void killServer(void) {
>> >     fprintf(stdout, "Stopping socket listener event loop.\n");
>> >     if (event_base_loopexit(evbase_accept, NULL)) {
>> >         perror("Error shutting down server");
>> >     }
>> >     fprintf(stdout, "Stopping workers.\n");
>> >     workqueue_shutdown(&workqueue);
>> > }
>> >
>> > static void sighandler(int signal) {
>> >     fprintf(stdout, "Received signal %d: %s.  Shutting down.\n", signal,
>> >             strsignal(signal));
>> >     killServer();
>> > }
>> >
>> > /*
>> >  * Taken from http://stackoverflow.com/questions/9210528/split-string-with-delimiters-in-c
>> >  * We modify it to use strtok_r, the MT-safe variant; strtok is not MT-safe.
>> >  */
>> > char** str_split(char* a_str, const char a_delim) {
>> >       char** result    = 0;
>> >       size_t count     = 0;
>> >       char* tmp        = a_str;
>> >       char* last_comma = 0;
>> >       char* save               = 0;
>> >       char delim[2];
>> >       delim[0] = a_delim;
>> >       delim[1] = 0;
>> >
>> >       /* Count how many elements will be extracted. */
>> >       while (*tmp) {
>> >               if (a_delim == *tmp) {
>> >                       count++;
>> >                       last_comma = tmp;
>> >               }
>> >               tmp++;
>> >       }
>> >
>> >       /* Add space for trailing token. */
>> >       count += last_comma < (a_str + strlen(a_str) - 1);
>> >
>> >       /* Add space for terminating null string so caller
>> >       knows where the list of returned strings ends. */
>> >       count++;
>> >
>> >       result = (char **)malloc(sizeof(char*) * count);
>> >
>> >       if (result) {
>> >               size_t idx  = 0;
>> >               //char* token = strtok(a_str, delim);
>> >               char* token = strtok_r(a_str, delim, &save);
>> >
>> >               while (token) {
>> >                       *(result + idx++) = strdup(token);
>> >                       //token = strtok(0, delim);
>> >                       token = strtok_r(0, delim, &save);
>> >               }
>> >               assert(idx == count - 1);
>> >               *(result + idx) = 0;
>> >       }
>> >
>> >       return result;
>> > }
>> >
>> > static void print_key(char *data, int start_pos) {
>> >       int i;
>> >
>> >       for (i = start_pos; i < start_pos + KEY_SIZE; i++)
>> >               printf("%c", data[i]);
>> > }
>> >
>> > static void closeAndFreeServerConn(struct server_conn *client) {
>> >     if (client != NULL) {
>> >         if (client->bev != NULL) {
>> >             bufferevent_free(client->bev);
>> >             client->bev = NULL;
>> >         }
>> >         if (client->evbase != NULL) {
>> >             event_base_free(client->evbase);
>> >             client->evbase = NULL;
>> >         }
>> >         if (client->output_buffer != NULL) {
>> >             evbuffer_free(client->output_buffer);
>> >             client->output_buffer = NULL;
>> >         }
>> >     }
>> > }
>> >
>> > /*
>> >  * Called by libevent when there is data to read.
>> >  */
>> > void peer_buffered_on_read(struct bufferevent *bev, void *arg) {
>> >       struct server_conn *sconn = (struct server_conn *)arg;
>> >     char data[MESSAGE_SIZE] = {0};
>> >
>> >     struct evbuffer *input;
>> >     input = bufferevent_get_input(bev);
>> >
>> >       /*
>> >        * Remove a chunk of data from the input buffer, copying it into our
>> >        * local array (data).
>> >        */
>> >       evbuffer_remove(input, data, MESSAGE_SIZE);
>> >
>> >       /*
>> >        * Reply from server
>> >        */
>> >       if (data[0] == 'C' && data[1] == 'S') {
>> >               switch (data[2]) {
>> >               case CMD_PUT:
>> >                       if (data[3] == CMD_OK) {
>> >                               if (!perf_test_on)
>> >                                       printf("\nPut operation successful\n");
>> >                       } else {
>> >                               if (!perf_test_on)
>> >                                       printf("\nPut operation failed\n");
>> >                       }
>> >
>> >                       if (!perf_test_on) {
>> >                               printf("Key was: ");
>> >                               print_key(data, 4);
>> >                               printf("\n");
>> >                               printf("Value was: %s\n\n", &data[24]);
>> >                       }
>> >                       break;
>> >               case CMD_GET:
>> >                       if (data[3] == CMD_OK) {
>> >                               if (!perf_test_on) {
>> >                                       printf("\nGet operation successful\n");
>> >                                       printf("Key was: ");
>> >                                       print_key(data, 4);
>> >                                       printf("\n");
>> >                                       printf("Value is: %s\n\n", &data[24]);
>> >                               }
>> >                       } else {
>> >                               if (!perf_test_on) {
>> >                                       printf("\nGet operation failed\n");
>> >                                       printf("Key was: ");
>> >                                       print_key(data, 4);
>> >                                       printf("\n\n");
>> >                               }
>> >                       }
>> >                       break;
>> >               case CMD_DEL:
>> >                       if (data[3] == CMD_OK) {
>> >                               if (!perf_test_on)
>> >                                       printf("\nDelete operation successful\n");
>> >                       }
>> >                       else {
>> >                               if (!perf_test_on)
>> >                                       printf("\nDelete operation failed\n");
>> >                       }
>> >
>> >                       if (!perf_test_on) {
>> >                               printf("Key was: ");
>> >                               print_key(data, 4);
>> >                               printf("\n\n");
>> >                       }
>> >                       break;
>> >               default:
>> >                       break;
>> >               }
>> >       }
>> >
>> >       if (sconn->evbase != NULL)
>> >               event_base_loopbreak(sconn->evbase);
>> > }
>> >
>> > /*
>> >  * Called by libevent when there is an error on the underlying socket
>> >  * descriptor.
>> >  */
>> > void peer_buffered_on_error(struct bufferevent *bev, short events, void *arg) {
>> >       if (events & BEV_EVENT_ERROR) {
>> >               printf("Peer: Got an error: %s\n",
>> >                      evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
>> >       }
>> > }
>> >
>> > /**
>> >  * Called by libevent when the write buffer reaches 0.  We only
>> >  * provide this because libevent expects it, but we don't use it.
>> >  */
>> > void peer_buffered_on_write(struct bufferevent *bev, void *arg) {
>> > }
>> >
>> > static int make_server_connection(int lserver_id) {
>> >       struct sockaddr_in sin;
>> >
>> >       memset(&sin, 0, sizeof(sin));
>> >       sin.sin_family = AF_INET;
>> >       sin.sin_port = htons(atoi(servers[lserver_id].serverport));
>> >       inet_pton(AF_INET, servers[lserver_id].serverip, &sin.sin_addr);
>> >
>> >       if ((sconn.output_buffer = evbuffer_new()) == NULL) {
>> >               closeAndFreeServerConn(&sconn);
>> >               return -1;
>> >       }
>> >
>> >       if ((sconn.evbase = event_base_new()) == NULL) {
>> >               closeAndFreeServerConn(&sconn);
>> >               return -1;
>> >       }
>> >
>> >       if ((sconn.bev = bufferevent_socket_new(sconn.evbase,
>> >                       -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS)) == NULL) {
>> >               closeAndFreeServerConn(&sconn);
>> >               return -1;
>> >       }
>> >
>> >       /*
>> >        * We trigger the read callback only when there are at least MESSAGE_SIZE
>> >        * bytes to be read.
>> >        */
>> >       bufferevent_setwatermark(sconn.bev, EV_READ | EV_WRITE, MESSAGE_SIZE, 0);
>> >       bufferevent_setcb(sconn.bev, peer_buffered_on_read,
>> >                                         peer_buffered_on_write, peer_buffered_on_error, &sconn);
>> >       bufferevent_enable(sconn.bev, EV_READ);
>> >
>> >       if (bufferevent_socket_connect(sconn.bev,
>> >                       (struct sockaddr *)&sin, sizeof(sin)) < 0) {
>> >               closeAndFreeServerConn(&sconn);
>> >               return -1;
>> >       }
>> >
>> >       return 0;
>> > }
>> >
>> > void put_at_server(const char *key, const char *value) {
>> >       char data[MESSAGE_SIZE];
>> >       int lserver_id;
>> >
>> >       memset(data, 0x30, MESSAGE_SIZE);
>> >       data[0] = 'C';
>> >       data[1] = 'S';
>> >       data[2] = CMD_PUT;
>> >
>> >       strncpy(&data[4], key, KEY_SIZE);
>> >       strncpy(&data[24], value, VALUE_SIZE);
>> >
>> >       lserver_id = hash_peer(&data[4]);
>> >       if (!perf_test_on)
>> >               printf("PUT Server Id: %d\n", lserver_id);
>> >
>> >       if (make_server_connection(lserver_id) != 0) {
>> >               printf("Connecting to Server %d failed.\n", lserver_id);
>> >               return;
>> >       }
>> >
>> >       evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
>> >       bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>> >       event_base_loopexit(sconn.evbase, &peer_timeout);
>> >       event_base_loop(sconn.evbase, 0);
>> >
>> >       /*
>> >        * Though our functional specifications are to maintain the connections
>> >        * in an open state once we connect, but for some reason if the connections
>> >        * are not closed, the buffers do not seem to flush and we never exit the
>> >        * event loop above OR the server does not exit the event loop connection
>> >        * even though there was only one event from the event base to handle and
>> >        * the other end does not get the data or the callbacks get called. So we
>> >        * explicitly close the connection here. Did try a few different mechanisms
>> >        * for handling this scenario both at server and peer end but none seems to
>> >        * work. So we explicitly deviate from our design recommendation of keeping
>> >        * connections open unfortunately. Needs some further in-depth investigation.
>> >        */
>> >       if (sconn.bev != NULL) {
>> >               bufferevent_free(sconn.bev);
>> >               sconn.bev = NULL;
>> >       }
>> >       if (sconn.evbase != NULL) {
>> >               event_base_free(sconn.evbase);
>> >               sconn.evbase = NULL;
>> >       }
>> >       if (sconn.output_buffer != NULL) {
>> >               evbuffer_free(sconn.output_buffer);
>> >               sconn.output_buffer = NULL;
>> >       }
>> > }
>> >
>> > void get_from_server(const char *key) {
>> >       char data[MESSAGE_SIZE];
>> >       int lserver_id;
>> >
>> >       memset(data, 0x30, MESSAGE_SIZE);
>> >       data[0] = 'C';
>> >       data[1] = 'S';
>> >       data[2] = CMD_GET;
>> >
>> >       strncpy(&data[4], key, KEY_SIZE);
>> >
>> >       lserver_id = hash_peer(&data[4]);
>> >       if (!perf_test_on)
>> >               printf("GET Server Id: %d\n", lserver_id);
>> >
>> >       if (make_server_connection(lserver_id) != 0) {
>> >               printf("Connecting to Server %d failed.\n", lserver_id);
>> >               return;
>> >       }
>> >
>> >       evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
>> >       bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>> >       event_base_loopexit(sconn.evbase, &peer_timeout);
>> >       event_base_loop(sconn.evbase, 0);
>> >
>> >       /*
>> >        * Our functional specification is to keep connections open once
>> >        * we connect. However, if the connection is not closed, the
>> >        * buffers do not seem to flush: either we never exit the event
>> >        * loop above, or the server never exits its event loop for the
>> >        * connection, even though the event base had only one event to
>> >        * handle, and the other end never receives the data or has its
>> >        * callbacks called. So we explicitly close the connection here.
>> >        * We tried a few different mechanisms for handling this at both
>> >        * the server and the peer end, but none seemed to work, so we
>> >        * unfortunately deviate from the design recommendation of keeping
>> >        * connections open. Needs further in-depth investigation.
>> >        */
>> >       if (sconn.bev != NULL) {
>> >               bufferevent_free(sconn.bev);
>> >               sconn.bev = NULL;
>> >       }
>> >       if (sconn.evbase != NULL) {
>> >               event_base_free(sconn.evbase);
>> >               sconn.evbase = NULL;
>> >       }
>> >       if (sconn.output_buffer != NULL) {
>> >               evbuffer_free(sconn.output_buffer);
>> >               sconn.output_buffer = NULL;
>> >       }
>> > }
>> >
>> > void delete_from_server(const char *key) {
>> >       char data[MESSAGE_SIZE];
>> >       int lserver_id;
>> >
>> >       memset(data, 0x30, MESSAGE_SIZE);
>> >       data[0] = 'C';
>> >       data[1] = 'S';
>> >       data[2] = CMD_DEL;
>> >
>> >       strncpy(&data[4], key, KEY_SIZE);
>> >
>> >       lserver_id = hash_peer(&data[4]);
>> >       if (!perf_test_on)
>> >               printf("DEL Server Id: %d\n", lserver_id);
>> >
>> >       if (make_server_connection(lserver_id) != 0) {
>> >               printf("Connecting to Server %d failed.\n", lserver_id);
>> >               return;
>> >       }
>> >
>> >       evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
>> >       bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>> >       event_base_loopexit(sconn.evbase, &peer_timeout);
>> >       event_base_loop(sconn.evbase, 0);
>> >
>> >       /*
>> >        * Our functional specification is to keep connections open once
>> >        * we connect. However, if the connection is not closed, the
>> >        * buffers do not seem to flush: either we never exit the event
>> >        * loop above, or the server never exits its event loop for the
>> >        * connection, even though the event base had only one event to
>> >        * handle, and the other end never receives the data or has its
>> >        * callbacks called. So we explicitly close the connection here.
>> >        * We tried a few different mechanisms for handling this at both
>> >        * the server and the peer end, but none seemed to work, so we
>> >        * unfortunately deviate from the design recommendation of keeping
>> >        * connections open. Needs further in-depth investigation.
>> >        */
>> >       if (sconn.bev != NULL) {
>> >               bufferevent_free(sconn.bev);
>> >               sconn.bev = NULL;
>> >       }
>> >       if (sconn.evbase != NULL) {
>> >               event_base_free(sconn.evbase);
>> >               sconn.evbase = NULL;
>> >       }
>> >       if (sconn.output_buffer != NULL) {
>> >               evbuffer_free(sconn.output_buffer);
>> >               sconn.output_buffer = NULL;
>> >       }
>> > }
>> >
>> > static void discard_logs(int severity, const char *msg) {
>> >
>> > }
>> >
>> > void set_libevent_logging(void) {
>> >       event_enable_debug_logging(EVENT_DBG_ALL);
>> >       //event_set_log_callback(discard_logs);
>> > }
>> >
>> > void rand_str(char *dest, size_t length) {
>> >       char charset[] = "0123456789"
>> >                        "abcdefghijklmnopqrstuvwxyz"
>> >                        "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
>> >
>> >       while (length-- > 0) {
>> >               /* Divide by RAND_MAX + 1 so index never lands on the NUL */
>> >               size_t index = (double) rand() / ((double) RAND_MAX + 1)
>> >                               * (sizeof charset - 1);
>> >               *dest++ = charset[index];
>> >       }
>> >
>> >       *dest = '\0';
>> > }
>> >
>> > void run_perf_tests(void) {
>> >       int i;
>> >       char key[KEY_SIZE];
>> >       char value[VALUE_SIZE];
>> >       struct timeval t1, t2;
>> >       double elapsedtime, totalelapsedtime = 0.0;
>> >
>> >       perf_test_on = true;
>> >
>> >       /* Run PUT tests */
>> >       for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>> >               memset(key, 0x30, KEY_SIZE);
>> >               memset(value, 0x30, VALUE_SIZE);
>> >
>> >               key[0] = server_id + 0x31;
>> >               /* Generate a key depending on loop iteration */
>> >               snprintf(&key[1], KEY_SIZE - 1, "%d", i);
>> >               /* Generate a random string value; leave room for the NUL */
>> >               rand_str(value, VALUE_SIZE - 1);
>> >               gettimeofday(&t1, NULL);
>> >               put_at_server(key, value);
>> >               gettimeofday(&t2, NULL);
>> >               // compute and print the elapsed time in millisec
>> >               elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
>> >               elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
>> >               printf("Elapsed time: %f\n", elapsedtime);
>> >               totalelapsedtime += elapsedtime;
>> >       }
>> >       printf("Average Response time for PUT requests: %f ms\n",
>> >               totalelapsedtime / NO_OF_TEST_ITERATIONS);
>> >
>> >       printf("Press any key to continue\n");
>> >       getchar();
>> >
>> >       /* Run GET tests; reset the total so PUT time is not included */
>> >       totalelapsedtime = 0.0;
>> >       for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>> >               memset(key, 0x30, KEY_SIZE);
>> >
>> >               key[0] = server_id + 0x31;
>> >               /* Generate a key depending on loop iteration */
>> >               snprintf(&key[1], KEY_SIZE - 1, "%d", i);
>> >               gettimeofday(&t1, NULL);
>> >               get_from_server(key);
>> >               gettimeofday(&t2, NULL);
>> >               // compute and print the elapsed time in millisec
>> >               elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
>> >               elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
>> >               printf("Elapsed time: %f\n", elapsedtime);
>> >               totalelapsedtime += elapsedtime;
>> >       }
>> >       printf("Average Response time for GET requests: %f ms\n",
>> >               totalelapsedtime / NO_OF_TEST_ITERATIONS);
>> >
>> >       printf("Press any key to continue\n");
>> >       getchar();
>> >
>> >       /* Run DEL tests; reset the total so earlier phases are excluded */
>> >       totalelapsedtime = 0.0;
>> >       for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>> >               memset(key, 0x30, KEY_SIZE);
>> >
>> >               key[0] = server_id + 0x31;
>> >               /* Generate a key depending on loop iteration */
>> >               snprintf(&key[1], KEY_SIZE - 1, "%d", i);
>> >               gettimeofday(&t1, NULL);
>> >               delete_from_server(key);
>> >               gettimeofday(&t2, NULL);
>> >               // compute and print the elapsed time in millisec
>> >               elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
>> >               elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
>> >               printf("Elapsed time: %f\n", elapsedtime);
>> >               totalelapsedtime += elapsedtime;
>> >       }
>> >       printf("Average Response time for DEL requests: %f ms\n",
>> >               totalelapsedtime / NO_OF_TEST_ITERATIONS);
>> >
>> >       perf_test_on = false;
>> > }
>> >
>> > void input_process(void) {
>> >       char key[KEY_SIZE];
>> >       char value[VALUE_SIZE];
>> >       bool exitloop = false;
>> >       int input;
>> >
>> >       /*
>> >        * We run the peer functionality in this main thread
>> >        */
>> >       while (!exitloop) {
>> >               printf("\nSelect Operation\n");
>> >               printf("(1) Put (2) Get (3) Delete (4) Run tests\n");
>> >               printf("(5) Exit");
>> >               printf("\nPlease enter your selection (1-5):\t");
>> >
>> >               scanf("%d", &input);
>> >               getchar();
>> >
>> >               switch (input) {
>> >               case 1:
>> >                       printf("Enter Key: \t");
>> >                       scanf("%s", key);
>> >                       printf("\n");
>> >                       printf("Enter Value: \t");
>> >                       scanf("%s", value);
>> >                       printf("\n");
>> >                       put_at_server(key, value);
>> >                       break;
>> >               case 2:
>> >                       printf("Enter Key: \t");
>> >                       scanf("%s", key);
>> >                       printf("\n");
>> >                       get_from_server(key);
>> >                       break;
>> >               case 3:
>> >                       printf("Enter Key: \t");
>> >                       scanf("%s", key);
>> >                       printf("\n");
>> >                       delete_from_server(key);
>> >                       break;
>> >               case 4:
>> >                       run_perf_tests();
>> >                       break;
>> >               case 5:
>> >                       killServer();
>> >                       exitloop = true;
>> >                       break;
>> >               default:
>> >                       printf("\n\nWrong value: %d\n", input);
>> >                       break;
>> >               }
>> >
>> >               /* Reset buffers for next iteration */
>> >               memset(key, 0x30, KEY_SIZE);
>> >               memset(value, 0x30, VALUE_SIZE);
>> >       }
>> > }
>> >
>> > int main(int argc, char *argv[]) {
>> >       FILE *fp;
>> >       int i;
>> >       int error;
>> >       int count_of_servers;
>> >       ssize_t read;
>> >       size_t len = 0;
>> >       char *line = NULL;
>> >       char **tokens = NULL;
>> >
>> >       if (argc != 3) {
>> >               /*
>> >                * We do not validate or error check any of the arguments
>> >                * Please enter correct arguments
>> >                */
>> >               printf("Usage: ./server <serverid#> </path/to/server/conf/file>\n");
>> >               exit(1);
>> >       }
>> >
>> >       server_id = atoi(argv[1]) - 1;
>> >       if ((server_id < 0) || (server_id >= MAX_NO_OF_SERVERS)) {
>> >               printf("Incorrect server id provided\n");
>> >               exit(1);
>> >       }
>> >
>> >       fp = fopen(argv[2], "r");
>> >       if (fp == NULL) {
>> >               perror("Could not open server configuration file");
>> >               exit(1);
>> >       }
>> >
>> >       /*
>> >        * We now extract the IP and port information of 8 servers
>> >        * which will be involved in this setup.
>> >        */
>> >       count_of_servers = 0;
>> >       while ((read = getline(&line, &len, fp)) != -1) {
>> >
>> >               if (count_of_servers == MAX_NO_OF_SERVERS)
>> >                       break;
>> >
>> >               tokens = str_split(line, ' ');
>> >               if (tokens) {
>> >                       servers[count_of_servers].serverip = *(tokens);
>> >                       servers[count_of_servers].serverport = *(tokens + 1);
>> >               }
>> >               free(line);
>> >               line = NULL;
>> >
>> >               count_of_servers++;
>> >       }
>> >
>> >       fclose(fp);
>> >
>> >       if (pthread_rwlock_init(&ht_lock, NULL) != 0) {
>> >               perror("Lock init failed");
>> >               goto free_tokens;
>> >       }
>> >
>> >       /*
>> >        * Start the server. We start the server in another thread so
>> >        * the libevent event loop for dispatching events on connections
>> >        * can work separately which would otherwise block.
>> >        */
>> >       error = pthread_create(&server_thread, NULL, &runServer,
>> >                       &servers[server_id]);
>> >       if (error != 0) {
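>> >               /* pthread_create returns the error code; errno is not set */
>> >               fprintf(stderr, "Error in server thread creation: %s\n",
>> >                       strerror(error));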
>> >               goto free_tokens;
>> >       }
>> >
>> >       peer_timeout.tv_sec = 0;
>> >       peer_timeout.tv_usec = PEER_CONN_TIMEOUT;
>> >
>> >       input_process();
>> >
>> >       for (i = count_of_servers - 1; i >= 0; --i) {
>> >               free(servers[i].serverip);
>> >               free(servers[i].serverport);
>> >       }
>> >
>> >       pthread_rwlock_destroy(&ht_lock);
>> >
>> >       return 0;
>> >
>> > free_tokens:
>> >       for (i = count_of_servers - 1; i >= 0; --i) {
>> >               free(servers[i].serverip);
>> >               free(servers[i].serverport);
>> >       }
>> >
>> >       return -1;
>> > }
>> >
>>
>>
>> --
>> Respectfully
>> Azat Khuzhin
>
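
The three timing loops in run_perf_tests() quoted above repeat the same
timeval arithmetic and share one running total. As a rough sketch only (the
helper names elapsed_ms, average_ms and sample_op are my own assumptions, not
part of the posted code), the measurement could be factored out so that each
phase starts from a zeroed total:

```c
#include <stdio.h>
#include <sys/time.h>

/* Hypothetical helper: milliseconds between two gettimeofday() samples. */
static double elapsed_ms(const struct timeval *start, const struct timeval *end)
{
    return (end->tv_sec - start->tv_sec) * 1000.0 +    /* sec to ms */
           (end->tv_usec - start->tv_usec) / 1000.0;   /* us to ms */
}

/*
 * Hypothetical helper: call op(i) `iterations` times and return the mean
 * wall-clock time per call in milliseconds. The total is local, so it
 * always starts at zero for each phase.
 */
static double average_ms(void (*op)(int), int iterations)
{
    struct timeval t1, t2;
    double total = 0.0;
    int i;

    for (i = 0; i < iterations; i++) {
        gettimeofday(&t1, NULL);
        op(i);
        gettimeofday(&t2, NULL);
        total += elapsed_ms(&t1, &t2);
    }
    return total / iterations;
}

/* Placeholder operation standing in for a PUT/GET/DEL request. */
static void sample_op(int i)
{
    (void) i;
}
```

A call such as average_ms(sample_op, 100) would then replace one whole loop;
in the real program the callback would build the key and call
put_at_server(), get_from_server() or delete_from_server().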
