A good resource for the "proper way" to use libevent is to read the libevent book. http://www.wangafu.net/~nickm/libevent-book/
On Tue, Oct 6, 2015 at 2:51 AM Azat Khuzhin <[email protected]> wrote: > On Mon, Oct 05, 2015 at 10:43:19PM +0530, Sanchayan Maity wrote: > > Hello, > > > > I am writing a simple DHT application and using libevent in my > > application. The libevent version is 2.0 which is installed by > > default on my Arch Linux setup. > > > > After some searching I found an example to use libevent in a > > multi-threaded setup. > > > http://roncemer.com/software-development/multi-threaded-libevent-server-example/#comment-3249 > > > > The above code runs one event base in its own separate thread and > > uses a threaded workqueue model to service requests on incoming > > connections. > > Hi, > > Personally I prefer to use pipe/socketpair for scheduling jobs to > another threads, in a nutshell it works like this: > - acceptcb: > write(notify_fd_write_side, user-struct, sizeof(user-struct)) > - per thread event base readcb for notify_fd read side: > read(notify_fd_read_size, buf, sizeof(user-struct)) > > It is much simpler then using pthread_cond_*, but both of them must > works at first glance. > > Here is a few implementations using this mechanism: > - > https://github.com/ellzey/libevhtp/blob/libevhtp2/src/evhtp2/evhtp_thr.c#L60 > - > https://github.com/azat/boostcache/blob/libevent-aio-v2/src/kernel/net/commandserver.cpp#L206 > > Other comments are inlined in. > > > I have taken that sample code and modified it for my own purposes. > > While my application is functional when I deploy eight of them in > > my setup and then check for the basic functionality, while trying > > to run multiple operations from multiple application nodes I get > > some messages which I do not fully comprehend at the moment. > > > > They are as follows: > > 1. Epoll %s(%d) on fd %d failed. Old events were %d; read change was %d > (%s); write change was %d (%s); close change was %d (%s) > > > > Which seems to be printed from epoll.c by event warn mechanism > > Yep. > > > > > 2. Too many open files > > This can be because of too much files (and indeed you have overhead -- > since you have own base for every socket, and it means that you will > have epollfd for every socket which is not how this must works -- see > commends to you sample). > > And I guess that this is the main problem that you have, other (with > epoll failed message) will be fixed when you fix EMFILE problem. > > But if you still sure that all fds that you have are needful you can > increase limits: > > $ ulimit -n $((1<<20)) > Or permanently in /etc/security/limits.conf. > > But that limits are limited with fs.file-max sysctl, so if you need more > increase it. > > > 3. One more which I failed to note and was related to EPOLL MOD > > > > I know perhaps this is too much to ask, but can someone have a > > look at the code and tell me things I am doing wrong. I have gone > > through the libevent documentation online but I am not sure if I > > am doing things the right way at the moment. > > > > The peer part of dht I try to make synchronous by trying to break > > the event base loop with event loopbreak call in the read cb however > > sometimes if the callback never gets called which happens, the peer > > part of the DHT is just stuck, so to circumvent this I use a timeout. > > > > The code is attached. I have tried to keep it clean. I am no networking > > or socket expert, appreciate any comments or feedback. > > > > Thanks & Regards, > > Sanchayan Maity. > > > > > > > > > /* > > * Copyright (C) 2015 Sanchayan Maity <[email protected]> > > * > > * Author: Sanchayan Maity <[email protected]> > > * <[email protected]> > > * > > * This program is free software; you can redistribute it and/or modify > > * it under the terms of the GNU General Public License version 2 and > > * only version 2 as published by the Free Software Foundation. > > * > > * This program is distributed in the hope that it will be useful, > > * but WITHOUT ANY WARRANTY; without even the implied warranty of > > * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > > * GNU General Public License for more details. > > */ > > #include "common.h" > > > > /* > > * Struct to carry around connection (client)-specific data. > > */ > > typedef struct client { > > /* The client's socket. */ > > int fd; > > > > /* The event_base for this client. */ > > struct event_base *evbase; > > > > /* The bufferedevent for this client. */ > > struct bufferevent *buf_ev; > > > > /* The output buffer for this client. */ > > struct evbuffer *output_buffer; > > > > /* Here you can add your own application-specific attributes which > > * are connection-specific. */ > > } client_t; > > > > /* Table entry */ > > struct node_t { > > /* Key */ > > char *key; > > /* Value of the key */ > > char *value; > > /* Next entry in chain */ > > struct node_t *next; > > }; > > > > static struct node_t *hashtable[SERVER_HASH_TABLE_SIZE]; > > > > /* > > * We use a read write lock to protect against concurrent > > * write to the hash table. It is ok to have concurrent > > * readers. We do not use a mutex as that will reduce > > * reader concurrency to a single thread at a time. > > */ > > pthread_rwlock_t ht_lock; > > > > static struct event_base *evbase_accept; > > /* Workqueue for the server */ > > static workqueue_t workqueue; > > > > /* > > * Id of server. We use this to pick up appropriate > > * IP/port parameters from the file. > > */ > > static int server_id; > > > > struct server_p { > > /* IP to which server will bind to */ > > char *serverip; > > /* Port on which server will listen */ > > char *serverport; > > }; > > > > /* > > * We use this to store server parameters of the eight > > * servers information read from file > > */ > > static struct server_p servers[MAX_NO_OF_SERVERS]; > > > > pthread_t server_thread; > > static volatile bool perf_test_on = false; > > > > /* Signal handler function (defined below). */ > > static void sighandler(int signal); > > > > /* > > * Struct to carry around server connection specific data > > */ > > struct server_conn { > > /* Event base for the peer server connection */ > > struct event_base *evbase; > > /* Buffer event for the peer server connection */ > > struct bufferevent *bev; > > /* Output buffer for the peer server connection */ > > struct evbuffer *output_buffer; > > }; > > > > static struct server_conn sconn; > > > > struct timeval peer_timeout; > > > > static void closeClient(client_t *client) { > > if (client != NULL) { > > if (client->fd >= 0) { > > close(client->fd); > > client->fd = -1; > > } > > } > > } > > > > static void closeAndFreeClient(client_t *client) { > > if (client != NULL) { > > closeClient(client); > > if (client->buf_ev != NULL) { > > bufferevent_free(client->buf_ev); > > client->buf_ev = NULL; > > } > > if (client->evbase != NULL) { > > event_base_free(client->evbase); > > client->evbase = NULL; > > } > > if (client->output_buffer != NULL) { > > evbuffer_free(client->output_buffer); > > client->output_buffer = NULL; > > } > > free(client); > > } > > } > > > > /* > > * https://en.wikipedia.org/wiki/Jenkins_hash_function > > */ > > unsigned int jenkins_one_at_a_time_hash(const char *key, size_t len) { > > unsigned int hash, i; > > > > for(hash = i = 0; i < len; ++i) > > { > > hash += key[i]; > > hash += (hash << 10); > > hash ^= (hash >> 6); > > } > > > > hash += (hash << 3); > > hash ^= (hash >> 11); > > hash += (hash << 15); > > > > return hash; > > } > > > > /* > > * Hash function. Use the above and restrict result as per the table > > * size. We use a non power of 2 to get good hashing. Refer CLRS. > > */ > > unsigned int hash_server(const char *key) { > > return jenkins_one_at_a_time_hash(key, KEY_SIZE) % > SERVER_HASH_TABLE_SIZE; > > } > > > > /* > > * Hash function. Use the above and restrict result as per the table > > * size. We use a non power of 2 to get good hashing. Refer CLRS. > > */ > > unsigned int hash_peer(const char *key) { > > return jenkins_one_at_a_time_hash(key, KEY_SIZE) % > PEER_HASH_TABLE_SIZE; > > } > > > > /* > > * Get the node data pointer as per the key > > */ > > struct node_t *hash_table_get(const char *key) { > > struct node_t *np; > > > > pthread_rwlock_rdlock(&ht_lock); > > for(np = hashtable[hash_server(key)]; np != NULL; np = np->next) { > > if (strncmp(key, np->key, KEY_SIZE) == 0) { > > pthread_rwlock_unlock(&ht_lock); > > /* We found the key */ > > return np; > > } > > } > > > > pthread_rwlock_unlock(&ht_lock); > > return NULL; > > } > > > > /* > > * We determine if the key being added exists. If it does, the > > * new value supersedes the old one, else we create a new entry > > * and add the key/value pair. Return NULL on any error. > > */ > > struct node_t *hash_table_put(const char *key, const char *value) { > > unsigned int hashval; > > struct node_t *np; > > > > pthread_rwlock_wrlock(&ht_lock); > > if ((np = hash_table_get(key)) == NULL) { /* Not found */ > > np = (struct node_t *)malloc(sizeof(*np)); > > if (np == NULL || (np->key = strndup(key, KEY_SIZE)) == > NULL) > > goto error; > > > > /* Find the bucket position and add at 'head' location */ > > hashval = hash_server(key); > > np->next = hashtable[hashval]; > > hashtable[hashval] = np; > > } else /* Already there */ > > free((void *) np->value); /* Free previous value */ > > if ((np->value = strndup(value, VALUE_SIZE)) == NULL) > > goto error; > > > > return np; > > > > error: > > pthread_rwlock_unlock(&ht_lock); > > return NULL; > > } > > > > /* > > * Return 0 on success and 1 on failure > > */ > > unsigned int hash_table_delete(const char *key) { > > struct node_t *np1, *np2; > > unsigned int hashval; > > > > hashval = hash_server(key); > > > > pthread_rwlock_wrlock(&ht_lock); > > for (np1 = hashtable[hashval], np2 = NULL; np1 != NULL; np2 = np1, > np1 = np1->next) > > if (strncmp(key, np1->key, KEY_SIZE) == 0) { > > /* Found a match */ > > free(np1->key); > > free(np1->value); > > if (np2 == NULL) > > /* At the beginning? */ > > hashtable[hashval] = np1->next; > > else > > /* In the middle or at the end? */ > > np2->next = np1->next; > > free(np1); > > > > pthread_rwlock_unlock(&ht_lock); > > return 0; > > } > > > > pthread_rwlock_unlock(&ht_lock); > > return 1; > > } > > > > /* > > * Called by libevent when there is data to read. > > */ > > void server_buffered_on_read(struct bufferevent *bev, void *arg) { > > client_t *client = (client_t *)arg; > > char data[MESSAGE_SIZE] = {0}; > > struct node_t *np; > > > > struct evbuffer *input; > > input = bufferevent_get_input(bev); > > /* > > * Remove a chunk of data from the input buffer, copying it into > our > > * local array (data). > > */ > > evbuffer_remove(input, data, MESSAGE_SIZE); > > > > /* > > * Check if the message is meant for us and the command. While > sending > > * data back, we use the same buffer keeping header and key > information > > * but changing/appending the value of the key and setting the OK > or ERROR command > > */ > > if (data[0] == 'C' && data[1] == 'S') { > > switch (data[2]) { > > case CMD_PUT: > > if ((hash_table_put(&data[4], &data[24])) == NULL) > > data[3] = CMD_ERR; > > else > > data[3] = CMD_OK; > > evbuffer_add(client->output_buffer, data, > MESSAGE_SIZE); > > break; > > case CMD_GET: > > np = hash_table_get(&data[4]); > > if (np == NULL) { > > data[3] = CMD_ERR; > > evbuffer_add(client->output_buffer, data, > MESSAGE_SIZE); > > } else { > > data[3] = CMD_OK; > > strncpy(&data[24], np->value, VALUE_SIZE); > > evbuffer_add(client->output_buffer, data, > MESSAGE_SIZE); > > } > > break; > > case CMD_DEL: > > if (hash_table_delete(&data[4])) > > data[3] = CMD_ERR; > > else > > data[3] = CMD_OK; > > evbuffer_add(client->output_buffer, data, > MESSAGE_SIZE); > > break; > > default: > > break; > > } > > } > > > > /* Send the results to the peer. This actually only queues the > results > > * for sending. Sending will occur asynchronously, handled by > libevent. */ > > if (bufferevent_write_buffer(bev, client->output_buffer)) { > > errorOut("Error sending data to client on fd %d\n", client->fd); > > closeClient(client); > > } > > } > > > > /** > > * Called by libevent when the write buffer reaches 0. We only > > * provide this because libevent expects it, but we don't use it. > > */ > > void server_buffered_on_write(struct bufferevent *bev, void *arg) { > > } > > > > /** > > * Called by libevent when there is an error on underlying the socket > > * descriptor. > > */ > > void server_buffered_on_error(struct bufferevent *bev, short events, > void *arg) { > > if (events & BEV_EVENT_ERROR) { > > printf("Server: Got an error: %s\n", > > > evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); > > } > > closeClient((client_t *)arg); > > } > > > > static void server_job_function(struct job *job) { > > client_t *client = (client_t *)job->user_data; > > > > event_base_dispatch(client->evbase); > > closeAndFreeClient(client); > > free(job); > > } > > > > /* > > * This function will be called by libevent when there is a connection > > * ready to be accepted. > > */ > > void server_on_accept(evutil_socket_t fd, short ev, void *arg) { > > int client_fd; > > struct sockaddr_in client_addr; > > socklen_t client_len = sizeof(client_addr); > > workqueue_t *workqueue = (workqueue_t *)arg; > > client_t *client; > > job_t *job; > > > > client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); > > if (client_fd < 0) { > > warn("accept failed"); > > return; > > } > > > > /* Set the client socket to non-blocking mode. */ > > if (evutil_make_socket_nonblocking(client_fd) < 0) { > > warn("failed to set client socket to non-blocking"); > > close(client_fd); > > return; > > } > > > > /* Create a client object. */ > > if ((client = (client_t *)malloc(sizeof(*client))) == NULL) { > > warn("failed to allocate memory for client state"); > > close(client_fd); > > return; > > } > > memset(client, 0, sizeof(*client)); > > client->fd = client_fd; > > > > if ((client->output_buffer = evbuffer_new()) == NULL) { > > warn("client output buffer allocation failed"); > > closeAndFreeClient(client); > > return; > > } > > > > if ((client->evbase = event_base_new()) == NULL) { > > warn("client event_base creation failed"); > > closeAndFreeClient(client); > > return; > > } > > Are you sure you need this overhead (i.e. base per socket)? > AFAICS you don't need this. > > > > > client->buf_ev = bufferevent_socket_new(client->evbase, client_fd, > > BEV_OPT_CLOSE_ON_FREE | > BEV_OPT_DEFER_CALLBACKS); > > if ((client->buf_ev) == NULL) { > > warn("client bufferevent creation failed"); > > closeAndFreeClient(client); > > return; > > } > > > > /* > > * We trigger the read callback only when there atleast > MESSAGE_SIZE > > * bytes to be read. > > */ > > bufferevent_setwatermark(client->buf_ev, EV_READ, MESSAGE_SIZE, 0); > > bufferevent_setcb(client->buf_ev, server_buffered_on_read, > server_buffered_on_write, > > server_buffered_on_error, client); > > > > /* We have to enable it before our callbacks will be > > * called. */ > > bufferevent_enable(client->buf_ev, EV_READ); > > > > /* Create a job object and add it to the work queue. */ > > if ((job = (job_t *)malloc(sizeof(*job))) == NULL) { > > warn("failed to allocate memory for job state"); > > closeAndFreeClient(client); > > return; > > } > > job->job_function = server_job_function; > > job->user_data = client; > > > > workqueue_add_job(workqueue, job); > > } > > > > /* > > * Run the server. This function blocks, only returning when the server > has > > * terminated. > > */ > > void *runServer(void *arg) { > > struct server_p *server_i = (struct server_p *)arg; > > evutil_socket_t listenfd; > > struct sockaddr_in listen_addr; > > struct event *ev_accept; > > int reuseaddr_on; > > struct sigaction siginfo; > > > > /* Set signal handlers */ > > sigset_t sigset; > > sigemptyset(&sigset); > > siginfo.sa_handler = sighandler; > > siginfo.sa_mask = sigset; > > siginfo.sa_flags = SA_RESTART; > > > > sigaction(SIGINT, &siginfo, NULL); > > sigaction(SIGTERM, &siginfo, NULL); > > > > /* Create our listening socket */ > > listenfd = socket(AF_INET, SOCK_STREAM, 0); > > if (listenfd < 0) { > > err(1, "listen failed"); > > goto exit; > > } > > > > memset(&listen_addr, 0, sizeof(listen_addr)); > > listen_addr.sin_family = AF_INET; > > listen_addr.sin_port = htons(atoi(server_i->serverport)); > > inet_pton(AF_INET, server_i->serverip, &listen_addr.sin_addr); > > > > if (bind(listenfd, (struct sockaddr *)&listen_addr, > sizeof(listen_addr)) < 0) { > > err(1, "bind failed"); > > goto exit; > > } > > > > if (listen(listenfd, CONNECTION_BACKLOG) < 0) { > > err(1, "listen failed"); > > goto exit; > > } > > reuseaddr_on = 1; > > setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, > > sizeof(reuseaddr_on)); > > > > /* > > * Set the socket to non-blocking, this is essential in event > > * based programming with libevent. > > */ > > if (evutil_make_socket_nonblocking(listenfd) < 0) { > > err(1, "failed to set server socket to non-blocking"); > > goto exit; > > } > > > > if ((evbase_accept = event_base_new()) == NULL) { > > perror("Unable to create socket accept event base"); > > close(listenfd); > > goto exit; > > } > > > > /* Initialize work queue. */ > > if (workqueue_init(&workqueue, NUM_THREADS)) { > > perror("Failed to create work queue"); > > close(listenfd); > > workqueue_shutdown(&workqueue); > > goto exit; > > } > > > > /* We now have a listening socket, we create a read event to > > * be notified when a client connects. */ > > ev_accept = event_new(evbase_accept, listenfd, EV_READ | EV_PERSIST, > > server_on_accept, (void > *)&workqueue); > > event_add(ev_accept, NULL); > > > > printf("Server running.\n"); > > > > /* Start the event loop. */ > > event_base_dispatch(evbase_accept); > > > > event_base_free(evbase_accept); > > evbase_accept = NULL; > > > > close(listenfd); > > > > printf("Server shutdown.\n"); > > > > exit: > > pthread_exit(NULL); > > } > > > > /* > > * Kill the server. This function can be called from another thread to > kill > > * the server, causing runServer() to return. > > */ > > void killServer(void) { > > fprintf(stdout, "Stopping socket listener event loop.\n"); > > if (event_base_loopexit(evbase_accept, NULL)) { > > perror("Error shutting down server"); > > } > > fprintf(stdout, "Stopping workers.\n"); > > workqueue_shutdown(&workqueue); > > } > > > > static void sighandler(int signal) { > > fprintf(stdout, "Received signal %d: %s. Shutting down.\n", signal, > > strsignal(signal)); > > killServer(); > > } > > > > /* > > * Taken from > http://stackoverflow.com/questions/9210528/split-string-with-delimiters-in-c > > * We modify it to use strtok_r the MT safe variant. strtok is not MT > safe. > > */ > > char** str_split(char* a_str, const char a_delim) { > > char** result = 0; > > size_t count = 0; > > char* tmp = a_str; > > char* last_comma = 0; > > char* save = 0; > > char delim[2]; > > delim[0] = a_delim; > > delim[1] = 0; > > > > /* Count how many elements will be extracted. */ > > while (*tmp) { > > if (a_delim == *tmp) { > > count++; > > last_comma = tmp; > > } > > tmp++; > > } > > > > /* Add space for trailing token. */ > > count += last_comma < (a_str + strlen(a_str) - 1); > > > > /* Add space for terminating null string so caller > > knows where the list of returned strings ends. */ > > count++; > > > > result = (char **)malloc(sizeof(char*) * count); > > > > if (result) { > > size_t idx = 0; > > //char* token = strtok(a_str, delim); > > char* token = strtok_r(a_str, delim, &save); > > > > while (token) { > > *(result + idx++) = strdup(token); > > //token = strtok(0, delim); > > token = strtok_r(0, delim, &save); > > } > > assert(idx == count - 1); > > *(result + idx) = 0; > > } > > > > return result; > > } > > > > static void print_key(char *data, int start_pos) { > > int i; > > > > for (i = start_pos; i < start_pos + KEY_SIZE; i++) > > printf("%c", data[i]); > > } > > > > static void closeAndFreeServerConn(struct server_conn *client) { > > if (client != NULL) { > > if (client->bev != NULL) { > > bufferevent_free(client->bev); > > client->bev = NULL; > > } > > if (client->evbase != NULL) { > > event_base_free(client->evbase); > > client->evbase = NULL; > > } > > if (client->output_buffer != NULL) { > > evbuffer_free(client->output_buffer); > > client->output_buffer = NULL; > > } > > } > > } > > > > /* > > * Called by libevent when there is data to read. > > */ > > void peer_buffered_on_read(struct bufferevent *bev, void *arg) { > > struct server_conn *sconn = (struct server_conn *)arg; > > char data[MESSAGE_SIZE] = {0}; > > > > struct evbuffer *input; > > input = bufferevent_get_input(bev); > > > > /* > > * Remove a chunk of data from the input buffer, copying it into > our > > * local array (data). > > */ > > evbuffer_remove(input, data, MESSAGE_SIZE); > > > > /* > > * Reply from server > > */ > > if (data[0] == 'C' && data[1] == 'S') { > > switch (data[2]) { > > case CMD_PUT: > > if (data[3] == CMD_OK) { > > if (!perf_test_on) > > printf("\nPut operation > successful\n"); > > } else { > > if (!perf_test_on) > > printf("\nPut operation failed\n"); > > } > > > > if (!perf_test_on) { > > printf("Key was: "); > > print_key(data, 4); > > printf("\n"); > > printf("Value was: %s\n\n", &data[24]); > > } > > break; > > case CMD_GET: > > if (data[3] == CMD_OK) { > > if (!perf_test_on) { > > printf("\nGet operation > successful\n"); > > printf("Key was: "); > > print_key(data, 4); > > printf("\n"); > > printf("Value is: %s\n\n", > &data[24]); > > } > > } else { > > if (!perf_test_on) { > > printf("\nGet operation failed\n"); > > printf("Key was: "); > > print_key(data, 4); > > printf("\n\n"); > > } > > } > > break; > > case CMD_DEL: > > if (data[3] == CMD_OK) { > > if (!perf_test_on) > > printf("\nDelete operation > successful\n"); > > } > > else { > > if (!perf_test_on) > > printf("\nDelete operation > failed\n"); > > } > > > > if (!perf_test_on) { > > printf("Key was: "); > > print_key(data, 4); > > printf("\n\n"); > > } > > break; > > default: > > break; > > } > > } > > > > if (sconn->evbase != NULL) > > event_base_loopbreak(sconn->evbase); > > } > > > > /* > > * Called by libevent when there is an error on underlying the socket > > * descriptor. > > */ > > void peer_buffered_on_error(struct bufferevent *bev, short events, void > *arg) { > > if (events & BEV_EVENT_ERROR) { > > printf("Peer: Got an error: %s\n", > > > evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); > > } > > } > > > > /** > > * Called by libevent when the write buffer reaches 0. We only > > * provide this because libevent expects it, but we don't use it. > > */ > > void peer_buffered_on_write(struct bufferevent *bev, void *arg) { > > } > > > > static int make_server_connection(int lserver_id) { > > struct sockaddr_in sin; > > > > memset(&sin, 0, sizeof(sin)); > > sin.sin_family = AF_INET; > > sin.sin_port = htons(atoi(servers[lserver_id].serverport)); > > inet_pton(AF_INET, servers[lserver_id].serverip, &sin.sin_addr); > > > > if ((sconn.output_buffer = evbuffer_new()) == NULL) { > > closeAndFreeServerConn(&sconn); > > return -1; > > } > > > > if ((sconn.evbase = event_base_new()) == NULL) { > > closeAndFreeServerConn(&sconn); > > return -1; > > } > > > > if ((sconn.bev = bufferevent_socket_new(sconn.evbase, > > -1, BEV_OPT_CLOSE_ON_FREE | > BEV_OPT_DEFER_CALLBACKS)) == NULL) { > > closeAndFreeServerConn(&sconn); > > return -1; > > } > > > > /* > > * We trigger the read callback only when there atleast > MESSAGE_SIZE > > * bytes to be read. > > */ > > bufferevent_setwatermark(sconn.bev, EV_READ | EV_WRITE, > MESSAGE_SIZE, 0); > > bufferevent_setcb(sconn.bev, peer_buffered_on_read, > > peer_buffered_on_write, > peer_buffered_on_error, &sconn); > > bufferevent_enable(sconn.bev, EV_READ); > > > > if (bufferevent_socket_connect(sconn.bev, > > (struct sockaddr *)&sin, sizeof(sin)) < 0) { > > closeAndFreeServerConn(&sconn); > > return -1; > > } > > > > return 0; > > } > > > > void put_at_server(const char *key, const char *value) { > > char data[MESSAGE_SIZE]; > > int lserver_id; > > > > memset(data, 0x30, MESSAGE_SIZE); > > data[0] = 'C'; > > data[1] = 'S'; > > data[2] = CMD_PUT; > > > > strncpy(&data[4], key, KEY_SIZE); > > strncpy(&data[24], value, VALUE_SIZE); > > > > lserver_id = hash_peer(&data[4]); > > if (!perf_test_on) > > printf("PUT Server Id: %d\n", lserver_id); > > > > if (make_server_connection(lserver_id) != 0) { > > printf("Connecting to Server %d failed.\n", lserver_id); > > return; > > } > > > > evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE); > > bufferevent_write_buffer(sconn.bev, sconn.output_buffer); > > event_base_loopexit(sconn.evbase, &peer_timeout); > > event_base_loop(sconn.evbase, 0); > > > > /* > > * Though our functional specifications are to maintain the > connections > > * in an open state once we conenct, but for some reason if the > connections > > * are not closed, the buffers do not seem to flush and we never > exit the > > * event loop above OR the server does not exit the event loop > connection > > * even though there was only one event from the event base to > handle and > > * the other end does not get the data or the callbacks get > called. So we > > * explicitly close the connection here. Did try a few different > mechanisms > > * for handling this scenario both at server and peer end but none > seems to > > * work. So we explicitly deviate from our design recommendation > of keeping > > * connections open unfortunately. Needs some further indepth > investigation. > > */ > > if (sconn.bev != NULL) { > > bufferevent_free(sconn.bev); > > sconn.bev = NULL; > > } > > if (sconn.evbase != NULL) { > > event_base_free(sconn.evbase); > > sconn.evbase = NULL; > > } > > if (sconn.output_buffer != NULL) { > > evbuffer_free(sconn.output_buffer); > > sconn.output_buffer = NULL; > > } > > } > > > > void get_from_server(const char *key) { > > char data[MESSAGE_SIZE]; > > int lserver_id; > > > > memset(data, 0x30, MESSAGE_SIZE); > > data[0] = 'C'; > > data[1] = 'S'; > > data[2] = CMD_GET; > > > > strncpy(&data[4], key, KEY_SIZE); > > > > lserver_id = hash_peer(&data[4]); > > if (!perf_test_on) > > printf("GET Server Id: %d\n", lserver_id); > > > > if (make_server_connection(lserver_id) != 0) { > > printf("Connecting to Server %d failed.\n", lserver_id); > > return; > > } > > > > evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE); > > bufferevent_write_buffer(sconn.bev, sconn.output_buffer); > > event_base_loopexit(sconn.evbase, &peer_timeout); > > event_base_loop(sconn.evbase, 0); > > > > /* > > * Though our functional specifications are to maintain the > connections > > * in an open state once we conenct, but for some reason if the > connections > > * are not closed, the buffers do not seem to flush and we never > exit the > > * event loop above OR the server does not exit the event loop > connection > > * even though there was only one event from the event base to > handle and > > * the other end does not get the data or the callbacks get > called. So we > > * explicitly close the connection here. Did try a few different > mechanisms > > * for handling this scenario both at server and peer end but none > seems to > > * work. So we explicitly deviate from our design recommendation > of keeping > > * connections open unfortunately. Needs some further indepth > investigation. > > */ > > if (sconn.bev != NULL) { > > bufferevent_free(sconn.bev); > > sconn.bev = NULL; > > } > > if (sconn.evbase != NULL) { > > event_base_free(sconn.evbase); > > sconn.evbase = NULL; > > } > > if (sconn.output_buffer != NULL) { > > evbuffer_free(sconn.output_buffer); > > sconn.output_buffer = NULL; > > } > > } > > > > void delete_from_server(const char *key) { > > char data[MESSAGE_SIZE]; > > int lserver_id; > > > > memset(data, 0x30, MESSAGE_SIZE); > > data[0] = 'C'; > > data[1] = 'S'; > > data[2] = CMD_DEL; > > > > strncpy(&data[4], key, KEY_SIZE); > > > > lserver_id = hash_peer(&data[4]); > > if (!perf_test_on) > > printf("DEL Server Id: %d\n", lserver_id); > > > > if (make_server_connection(lserver_id) != 0) { > > printf("Connecting to Server %d failed.\n", lserver_id); > > return; > > } > > > > evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE); > > bufferevent_write_buffer(sconn.bev, sconn.output_buffer); > > event_base_loopexit(sconn.evbase, &peer_timeout); > > event_base_loop(sconn.evbase, 0); > > > > /* > > * Though our functional specifications are to maintain the > connections > > * in an open state once we conenct, but for some reason if the > connections > > * are not closed, the buffers do not seem to flush and we never > exit the > > * event loop above OR the server does not exit the event loop > connection > > * even though there was only one event from the event base to > handle and > > * the other end does not get the data or the callbacks get > called. So we > > * explicitly close the connection here. Did try a few different > mechanisms > > * for handling this scenario both at server and peer end but none > seems to > > * work. So we explicitly deviate from our design recommendation > of keeping > > * connections open unfortunately. Needs some further indepth > investigation. > > */ > > if (sconn.bev != NULL) { > > bufferevent_free(sconn.bev); > > sconn.bev = NULL; > > } > > if (sconn.evbase != NULL) { > > event_base_free(sconn.evbase); > > sconn.evbase = NULL; > > } > > if (sconn.output_buffer != NULL) { > > evbuffer_free(sconn.output_buffer); > > sconn.output_buffer = NULL; > > } > > } > > > > static void discard_logs(int severity, const char *msg) { > > > > } > > > > void set_libevent_logging(void) { > > event_enable_debug_logging(EVENT_DBG_ALL); > > //event_set_log_callback(discard_logs); > > } > > > > void rand_str(char *dest, size_t length) { > > char charset[] = "0123456789" > > > "abcdefghijklmnopqrstuvwxyz" > > > "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; > > > > while (length-- > 0) { > > size_t index = (double) rand() / RAND_MAX * (sizeof > charset - 1); > > *dest++ = charset[index]; > > } > > > > *dest = '\0'; > > } > > > > void run_perf_tests(void) { > > int i; > > char key[KEY_SIZE]; > > char value[VALUE_SIZE]; > > struct timeval t1, t2; > > double elapsedtime, totalelapsedtime; > > > > perf_test_on = true; > > > > /* Run PUT tests */ > > for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) { > > memset(key, 0x30, KEY_SIZE); > > memset(value, 0x30, VALUE_SIZE); > > > > key[0] = server_id + 0x31; > > /* Generate a key depending on loop iteration */ > > snprintf(&key[1], KEY_SIZE - 1, "%d", i); > > /* Generate a random string value */ > > rand_str(value, VALUE_SIZE); > > gettimeofday(&t1, NULL); > > put_at_server(key, value); > > gettimeofday(&t2, NULL); > > // compute and print the elapsed time in millisec > > elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0; // > sec to ms > > elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0; // us > to ms > > printf("Elapsed time: %f\n", elapsedtime); > > totalelapsedtime += elapsedtime; > > } > > printf("Average Response time for PUT requests: %f ms\n", > totalelapsedtime / NO_OF_TEST_ITERATIONS); > > > > printf("Press any key to continue\n"); > > getchar(); > > > > /* Run GET tests */ > > for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) { > > memset(key, 0x30, KEY_SIZE); > > > > key[0] = server_id + 0x31; > > /* Generate a key depending on loop iteration */ > > snprintf(&key[1], KEY_SIZE - 1, "%d", i); > > gettimeofday(&t1, NULL); > > get_from_server(key); > > gettimeofday(&t2, NULL); > > // compute and print the elapsed time in millisec > > elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0; // > sec to ms > > elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0; // us > to ms > > printf("Elapsed time: %f\n", elapsedtime); > > totalelapsedtime += elapsedtime; > > } > > printf("Average Response time for GET requests: %f ms\n", > totalelapsedtime / NO_OF_TEST_ITERATIONS); > > > > printf("Press any key to continue\n"); > > getchar(); > > > > /* Run DEL tests */ > > for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) { > > memset(key, 0x30, KEY_SIZE); > > > > key[0] = server_id + 0x31; > > /* Generate a key depending on loop iteration */ > > snprintf(&key[1], KEY_SIZE - 1, "%d", i); > > gettimeofday(&t1, NULL); > > delete_from_server(key); > > gettimeofday(&t2, NULL); > > // compute and print the elapsed time in millisec > > elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0; // > sec to ms > > elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0; // us > to ms > > printf("Elapsed time: %f\n", elapsedtime); > > totalelapsedtime += elapsedtime; > > } > > printf("Average Response time for DEL requests: %f ms\n", > totalelapsedtime / NO_OF_TEST_ITERATIONS); > > > > perf_test_on = false; > > } > > > > void input_process(void) { > > char key[KEY_SIZE]; > > char value[VALUE_SIZE]; > > bool exitloop = false; > > int input; > > > > /* > > * We run the peer functionality in this main thread > > */ > > while (!exitloop) { > > printf("\nSelect Operation\n"); > > printf("(1) Put (2) Get (3) Delete (4) Run tests\n"); > > printf("(5) Exit"); > > printf("\nPlease enter your selection (1-5):\t"); > > > > scanf("%d", &input); > > getchar(); > > > > switch (input) { > > case 1: > > printf("Enter Key: \t"); > > scanf("%s", key); > > printf("\n"); > > printf("Enter Value: \t"); > > scanf("%s", value); > > printf("\n"); > > put_at_server(key, value); > > break; > > case 2: > > printf("Enter Key: \t"); > > scanf("%s", key); > > printf("\n"); > > get_from_server(key); > > break; > > case 3: > > printf("Enter Key: \t"); > > scanf("%s", key); > > printf("\n"); > > delete_from_server(key); > > break; > > case 4: > > run_perf_tests(); > > break; > > case 5: > > killServer(); > > exitloop = true; > > break; > > default: > > printf("\n\nWrong value: %d\n", input); > > break; > > } > > > > /* Reset buffers for next iteration */ > > memset(key, 0x30, KEY_SIZE); > > memset(value, 0x30, VALUE_SIZE); > > } > > } > > > > int main(int argc, char *argv[]) { > > FILE *fp; > > int i; > > int error; > > int count_of_servers; > > ssize_t read; > > size_t len = 0; > > char *line = NULL; > > char **tokens = NULL; > > > > if (argc != 3) { > > /* > > * We do not validate or error check any of the arguments > > * Please enter correct arguments > > */ > > printf("Usage: ./server <serverid#> > </path/to/server/conf/file>"); > > exit(1); > > } > > > > server_id = atoi(argv[1]) - 1; > > if ((server_id < 0) || (server_id > MAX_NO_OF_SERVERS)) { > > printf("Incorrect server id provided\n"); > > exit(1); > > } > > > > fp = fopen(argv[2], "r"); > > if (fp == NULL) { > > perror("Could not open server configuration file"); > > exit(1); > > } > > > > /* > > * We now extract the IP and port information of 8 servers > > * which will be involved in this setup. > > */ > > count_of_servers = 0; > > while ((read = getline(&line, &len, fp)) != -1) { > > > > if (count_of_servers == MAX_NO_OF_SERVERS) > > break; > > > > tokens = str_split(line, ' '); > > if (tokens) { > > servers[count_of_servers].serverip = *(tokens); > > servers[count_of_servers].serverport = *(tokens + > 1); > > } > > free(line); > > line = NULL; > > > > count_of_servers++; > > } > > > > fclose(fp); > > > > if (pthread_rwlock_init(&ht_lock, NULL) != 0) { > > perror("Lock init failed"); > > goto free_tokens; > > } > > > > /* > > * Start the server. We start the server in another thread so > > * the libevent event loop for dispatching events on connections > > * can work separately which would otherwise block. > > */ > > error = pthread_create(&server_thread, NULL, &runServer, > &servers[server_id]); > > if (error != 0) { > > perror("Error in server thread creation"); > > goto free_tokens; > > } > > > > peer_timeout.tv_sec = 0; > > peer_timeout.tv_usec = PEER_CONN_TIMEOUT; > > > > input_process(); > > > > for (i = count_of_servers - 1; i >= 0; --i) { > > free(servers[i].serverip); > > free(servers[i].serverport); > > } > > > > pthread_rwlock_destroy(&ht_lock); > > > > return 0; > > > > free_tokens: > > for (i = count_of_servers - 1; i >= 0; --i) { > > free(servers[i].serverip); > > free(servers[i].serverport); > > } > > > > return -1; > > } > > > > > -- > Respectfully > Azat Khuzhin > *********************************************************************** > To unsubscribe, send an e-mail to [email protected] with > unsubscribe libevent-users in the body. >
