Also, a DHT, a real DHT, is never simple; but I figure you are talking about a simple consistent hash?
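To make the distinction concrete: the quoted code below picks a server with hash % PEER_HASH_TABLE_SIZE, and with that kind of placement most keys typically land on a different server as soon as the number of servers changes. A consistent hash puts the servers on a ring so that only the keys owned by the server you add or remove have to move. Here is a minimal sketch, not taken from the attached code: struct ring, ring_add, ring_remove, ring_lookup, RING_VNODES, RING_MAX_SERVERS and the "srv-N#V" labels are all names I made up for illustration. The hash is the same Jenkins one-at-a-time function the attachment already uses.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define RING_VNODES      64  /* virtual points per server (assumed value) */
#define RING_MAX_SERVERS 16  /* capacity of this toy ring (assumed value) */

struct ring_point { uint32_t hash; int server; };

struct ring {
    struct ring_point pts[RING_MAX_SERVERS * RING_VNODES];
    size_t n;                /* number of points currently on the ring */
};

/* Jenkins one-at-a-time hash, as in the quoted source. */
static uint32_t oaat_hash(const char *key, size_t len) {
    uint32_t h = 0;
    for (size_t i = 0; i < len; ++i) {
        h += (unsigned char)key[i];
        h += h << 10;
        h ^= h >> 6;
    }
    h += h << 3;
    h ^= h >> 11;
    h += h << 15;
    return h;
}

/* Order points by hash, breaking ties by server id. */
static int cmp_point(const void *a, const void *b) {
    const struct ring_point *x = a, *y = b;
    if (x->hash != y->hash)
        return x->hash < y->hash ? -1 : 1;
    return x->server - y->server;
}

/* Place RING_VNODES points for `server`. Returns 0, or -1 if the ring is full. */
static int ring_add(struct ring *r, int server) {
    if (r->n + RING_VNODES > sizeof(r->pts) / sizeof(r->pts[0]))
        return -1;
    for (int v = 0; v < RING_VNODES; ++v) {
        char label[32];
        int len = snprintf(label, sizeof(label), "srv-%d#%d", server, v);
        assert(len > 0 && (size_t)len < sizeof(label));
        r->pts[r->n].hash = oaat_hash(label, (size_t)len);
        r->pts[r->n].server = server;
        r->n++;
    }
    qsort(r->pts, r->n, sizeof(r->pts[0]), cmp_point);
    return 0;
}

/* Drop every point of `server`; the remaining points stay sorted. */
static void ring_remove(struct ring *r, int server) {
    size_t out = 0;
    for (size_t i = 0; i < r->n; ++i)
        if (r->pts[i].server != server)
            r->pts[out++] = r->pts[i];
    r->n = out;
}

/* Owner of `key`: the first point clockwise from the key's hash
 * (binary search, wrapping to the first point). Returns -1 on an empty ring. */
static int ring_lookup(const struct ring *r, const char *key) {
    if (r->n == 0)
        return -1;
    uint32_t h = oaat_hash(key, strlen(key));
    size_t lo = 0, hi = r->n;
    while (lo < hi) {
        size_t mid = lo + (hi - lo) / 2;
        if (r->pts[mid].hash < h)
            lo = mid + 1;
        else
            hi = mid;
    }
    return r->pts[lo == r->n ? 0 : lo].server;
}
```

With eight servers you would call ring_add() for ids 0 to 7 and then use ring_lookup(&ring, key) in place of the modulo. After ring_remove(&ring, 3), a key that was not on server 3 still maps to the same server as before. That is still a long way from a real DHT (no routing, no replication, no membership protocol), which is my point above.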
On Tue, Oct 6, 2015 at 2:42 PM Mark Ellzey <[email protected]> wrote:

> A good resource for the "proper way" to use libevent is to read the
> libevent book. http://www.wangafu.net/~nickm/libevent-book/
>
> On Tue, Oct 6, 2015 at 2:51 AM Azat Khuzhin <[email protected]> wrote:
>
>> On Mon, Oct 05, 2015 at 10:43:19PM +0530, Sanchayan Maity wrote:
>> > Hello,
>> >
>> > I am writing a simple DHT application and using libevent in my
>> > application. The libevent version is 2.0 which is installed by
>> > default on my Arch Linux setup.
>> >
>> > After some searching I found an example to use libevent in a
>> > multi-threaded setup.
>> >
>> > http://roncemer.com/software-development/multi-threaded-libevent-server-example/#comment-3249
>> >
>> > The above code runs one event base in its own separate thread and
>> > uses a threaded workqueue model to service requests on incoming
>> > connections.
>>
>> Hi,
>>
>> Personally I prefer to use pipe/socketpair for scheduling jobs to
>> another threads, in a nutshell it works like this:
>> - acceptcb:
>>   write(notify_fd_write_side, user-struct, sizeof(user-struct))
>> - per thread event base readcb for notify_fd read side:
>>   read(notify_fd_read_size, buf, sizeof(user-struct))
>>
>> It is much simpler then using pthread_cond_*, but both of them must
>> works at first glance.
>>
>> Here is a few implementations using this mechanism:
>> - https://github.com/ellzey/libevhtp/blob/libevhtp2/src/evhtp2/evhtp_thr.c#L60
>> - https://github.com/azat/boostcache/blob/libevent-aio-v2/src/kernel/net/commandserver.cpp#L206
>>
>> Other comments are inlined in.
>>
>> > I have taken that sample code and modified it for my own purposes.
>> > While my application is functional when I deploy eight of them in
>> > my setup and then check for the basic functionality, while trying
>> > to run multiple operations from multiple application nodes I get
>> > some messages which I do not fully comprehend at the moment.
>> >
>> > They are as follows:
>> > 1. Epoll %s(%d) on fd %d failed. Old events were %d; read change was %d (%s); write change was %d (%s); close change was %d (%s)
>> >
>> > Which seems to be printed from epoll.c by event warn mechanism
>>
>> Yep.
>>
>> >
>> > 2. Too many open files
>>
>> This can be because of too much files (and indeed you have overhead --
>> since you have own base for every socket, and it means that you will
>> have epollfd for every socket which is not how this must works -- see
>> commends to you sample).
>>
>> And I guess that this is the main problem that you have, other (with
>> epoll failed message) will be fixed when you fix EMFILE problem.
>>
>> But if you still sure that all fds that you have are needful you can
>> increase limits:
>>
>> $ ulimit -n $((1<<20))
>> Or permanently in /etc/security/limits.conf.
>>
>> But that limits are limited with fs.file-max sysctl, so if you need more
>> increase it.
>>
>> > 3. One more which I failed to note and was related to EPOLL MOD
>> >
>> > I know perhaps this is too much to ask, but can someone have a
>> > look at the code and tell me things I am doing wrong. I have gone
>> > through the libevent documentation online but I am not sure if I
>> > am doing things the right way at the moment.
>> >
>> > The peer part of dht I try to make synchronous by trying to break
>> > the event base loop with event loopbreak call in the read cb however
>> > sometimes if the callback never gets called which happens, the peer
>> > part of the DHT is just stuck, so to circumvent this I use a timeout.
>> >
>> > The code is attached. I have tried to keep it clean. I am no networking
>> > or socket expert, appreciate any comments or feedback.
>> >
>> > Thanks & Regards,
>> > Sanchayan Maity.
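On the pipe/socketpair scheduling Azat describes above: the whole mechanism is "write a fixed-size struct on one end, read it on the other". A minimal sketch of just that part, with no libevent in it so it can be tried on its own; struct job_msg, notify_open, notify_send and notify_recv are hypothetical names of mine, not from libevent or from the attached code. In a real server I would assume the read side is made non-blocking and registered on the worker thread's event_base as a persistent EV_READ event, with notify_recv() called from that callback.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical job descriptor handed from the accept thread to a worker. */
struct job_msg {
    int client_fd;
};

/* Create the notification channel.
 * fds[0] is the worker's read side, fds[1] the acceptor's write side.
 * Returns 0 on success, -1 on failure (errno set by socketpair). */
static int notify_open(int fds[2]) {
    assert(fds != NULL);
    return socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
}

/* Accept-callback side: push one job to the worker.
 * Loops so that a short write or EINTR does not truncate the message.
 * Returns 0 on success, -1 on error. */
static int notify_send(int wr_fd, const struct job_msg *msg) {
    const char *p = (const char *)msg;
    size_t left = sizeof(*msg);
    while (left > 0) {
        ssize_t n = write(wr_fd, p, left);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        p += n;
        left -= (size_t)n;
    }
    return 0;
}

/* Worker side: pull one job.
 * Returns 1 if a full message was read, 0 on clean EOF (write side closed),
 * -1 on error or on EOF in the middle of a message. */
static int notify_recv(int rd_fd, struct job_msg *msg) {
    char *p = (char *)msg;
    size_t got = 0;
    while (got < sizeof(*msg)) {
        ssize_t n = read(rd_fd, p + got, sizeof(*msg) - got);
        if (n == 0)
            return got == 0 ? 0 : -1;
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        got += (size_t)n;
    }
    return 1;
}
```

The design point is that each worker owns one event_base for all of its connections and only the small job_msg crosses threads, so you end up with one epoll fd per worker rather than one per client socket.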
>> >
>> >
>> >
>>
>> > /*
>> >  * Copyright (C) 2015 Sanchayan Maity <[email protected]>
>> >  *
>> >  * Author: Sanchayan Maity <[email protected]>
>> >  *         <[email protected]>
>> >  *
>> >  * This program is free software; you can redistribute it and/or modify
>> >  * it under the terms of the GNU General Public License version 2 and
>> >  * only version 2 as published by the Free Software Foundation.
>> >  *
>> >  * This program is distributed in the hope that it will be useful,
>> >  * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> >  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
>> >  * GNU General Public License for more details.
>> >  */
>> > #include "common.h"
>> >
>> > /*
>> >  * Struct to carry around connection (client)-specific data.
>> >  */
>> > typedef struct client {
>> >     /* The client's socket. */
>> >     int fd;
>> >
>> >     /* The event_base for this client. */
>> >     struct event_base *evbase;
>> >
>> >     /* The bufferedevent for this client. */
>> >     struct bufferevent *buf_ev;
>> >
>> >     /* The output buffer for this client. */
>> >     struct evbuffer *output_buffer;
>> >
>> >     /* Here you can add your own application-specific attributes which
>> >      * are connection-specific. */
>> > } client_t;
>> >
>> > /* Table entry */
>> > struct node_t {
>> >     /* Key */
>> >     char *key;
>> >     /* Value of the key */
>> >     char *value;
>> >     /* Next entry in chain */
>> >     struct node_t *next;
>> > };
>> >
>> > static struct node_t *hashtable[SERVER_HASH_TABLE_SIZE];
>> >
>> > /*
>> >  * We use a read write lock to protect against concurrent
>> >  * write to the hash table. It is ok to have concurrent
>> >  * readers. We do not use a mutex as that will reduce
>> >  * reader concurrency to a single thread at a time.
>> >  */
>> > pthread_rwlock_t ht_lock;
>> >
>> > static struct event_base *evbase_accept;
>> > /* Workqueue for the server */
>> > static workqueue_t workqueue;
>> >
>> > /*
>> >  * Id of server.
>> >  * We use this to pick up appropriate
>> >  * IP/port parameters from the file.
>> >  */
>> > static int server_id;
>> >
>> > struct server_p {
>> >     /* IP to which server will bind to */
>> >     char *serverip;
>> >     /* Port on which server will listen */
>> >     char *serverport;
>> > };
>> >
>> > /*
>> >  * We use this to store server parameters of the eight
>> >  * servers information read from file
>> >  */
>> > static struct server_p servers[MAX_NO_OF_SERVERS];
>> >
>> > pthread_t server_thread;
>> > static volatile bool perf_test_on = false;
>> >
>> > /* Signal handler function (defined below). */
>> > static void sighandler(int signal);
>> >
>> > /*
>> >  * Struct to carry around server connection specific data
>> >  */
>> > struct server_conn {
>> >     /* Event base for the peer server connection */
>> >     struct event_base *evbase;
>> >     /* Buffer event for the peer server connection */
>> >     struct bufferevent *bev;
>> >     /* Output buffer for the peer server connection */
>> >     struct evbuffer *output_buffer;
>> > };
>> >
>> > static struct server_conn sconn;
>> >
>> > struct timeval peer_timeout;
>> >
>> > static void closeClient(client_t *client) {
>> >     if (client != NULL) {
>> >         if (client->fd >= 0) {
>> >             close(client->fd);
>> >             client->fd = -1;
>> >         }
>> >     }
>> > }
>> >
>> > static void closeAndFreeClient(client_t *client) {
>> >     if (client != NULL) {
>> >         closeClient(client);
>> >         if (client->buf_ev != NULL) {
>> >             bufferevent_free(client->buf_ev);
>> >             client->buf_ev = NULL;
>> >         }
>> >         if (client->evbase != NULL) {
>> >             event_base_free(client->evbase);
>> >             client->evbase = NULL;
>> >         }
>> >         if (client->output_buffer != NULL) {
>> >             evbuffer_free(client->output_buffer);
>> >             client->output_buffer = NULL;
>> >         }
>> >         free(client);
>> >     }
>> > }
>> >
>> > /*
>> >  * https://en.wikipedia.org/wiki/Jenkins_hash_function
>> >  */
>> > unsigned int jenkins_one_at_a_time_hash(const char *key, size_t len) {
>> >     unsigned int hash, i;
>> >
>> >     for(hash = i = 0; i < len; ++i)
>> >     {
>> >         hash += key[i];
>> >         hash += (hash << 10);
>> >         hash ^= (hash >> 6);
>> >     }
>> >
>> >     hash += (hash << 3);
>> >     hash ^= (hash >> 11);
>> >     hash += (hash << 15);
>> >
>> >     return hash;
>> > }
>> >
>> > /*
>> >  * Hash function. Use the above and restrict result as per the table
>> >  * size. We use a non power of 2 to get good hashing. Refer CLRS.
>> >  */
>> > unsigned int hash_server(const char *key) {
>> >     return jenkins_one_at_a_time_hash(key, KEY_SIZE) % SERVER_HASH_TABLE_SIZE;
>> > }
>> >
>> > /*
>> >  * Hash function. Use the above and restrict result as per the table
>> >  * size. We use a non power of 2 to get good hashing. Refer CLRS.
>> >  */
>> > unsigned int hash_peer(const char *key) {
>> >     return jenkins_one_at_a_time_hash(key, KEY_SIZE) % PEER_HASH_TABLE_SIZE;
>> > }
>> >
>> > /*
>> >  * Get the node data pointer as per the key
>> >  */
>> > struct node_t *hash_table_get(const char *key) {
>> >     struct node_t *np;
>> >
>> >     pthread_rwlock_rdlock(&ht_lock);
>> >     for(np = hashtable[hash_server(key)]; np != NULL; np = np->next) {
>> >         if (strncmp(key, np->key, KEY_SIZE) == 0) {
>> >             pthread_rwlock_unlock(&ht_lock);
>> >             /* We found the key */
>> >             return np;
>> >         }
>> >     }
>> >
>> >     pthread_rwlock_unlock(&ht_lock);
>> >     return NULL;
>> > }
>> >
>> > /*
>> >  * We determine if the key being added exists. If it does, the
>> >  * new value supersedes the old one, else we create a new entry
>> >  * and add the key/value pair. Return NULL on any error.
>> >  */
>> > struct node_t *hash_table_put(const char *key, const char *value) {
>> >     unsigned int hashval;
>> >     struct node_t *np;
>> >
>> >     pthread_rwlock_wrlock(&ht_lock);
>> >     if ((np = hash_table_get(key)) == NULL) { /* Not found */
>> >         np = (struct node_t *)malloc(sizeof(*np));
>> >         if (np == NULL || (np->key = strndup(key, KEY_SIZE)) == NULL)
>> >             goto error;
>> >
>> >         /* Find the bucket position and add at 'head' location */
>> >         hashval = hash_server(key);
>> >         np->next = hashtable[hashval];
>> >         hashtable[hashval] = np;
>> >     } else /* Already there */
>> >         free((void *) np->value); /* Free previous value */
>> >     if ((np->value = strndup(value, VALUE_SIZE)) == NULL)
>> >         goto error;
>> >
>> >     return np;
>> >
>> > error:
>> >     pthread_rwlock_unlock(&ht_lock);
>> >     return NULL;
>> > }
>> >
>> > /*
>> >  * Return 0 on success and 1 on failure
>> >  */
>> > unsigned int hash_table_delete(const char *key) {
>> >     struct node_t *np1, *np2;
>> >     unsigned int hashval;
>> >
>> >     hashval = hash_server(key);
>> >
>> >     pthread_rwlock_wrlock(&ht_lock);
>> >     for (np1 = hashtable[hashval], np2 = NULL; np1 != NULL; np2 = np1, np1 = np1->next)
>> >         if (strncmp(key, np1->key, KEY_SIZE) == 0) {
>> >             /* Found a match */
>> >             free(np1->key);
>> >             free(np1->value);
>> >             if (np2 == NULL)
>> >                 /* At the beginning? */
>> >                 hashtable[hashval] = np1->next;
>> >             else
>> >                 /* In the middle or at the end? */
>> >                 np2->next = np1->next;
>> >             free(np1);
>> >
>> >             pthread_rwlock_unlock(&ht_lock);
>> >             return 0;
>> >         }
>> >
>> >     pthread_rwlock_unlock(&ht_lock);
>> >     return 1;
>> > }
>> >
>> > /*
>> >  * Called by libevent when there is data to read.
>> >  */
>> > void server_buffered_on_read(struct bufferevent *bev, void *arg) {
>> >     client_t *client = (client_t *)arg;
>> >     char data[MESSAGE_SIZE] = {0};
>> >     struct node_t *np;
>> >
>> >     struct evbuffer *input;
>> >     input = bufferevent_get_input(bev);
>> >     /*
>> >      * Remove a chunk of data from the input buffer, copying it into our
>> >      * local array (data).
>> >      */
>> >     evbuffer_remove(input, data, MESSAGE_SIZE);
>> >
>> >     /*
>> >      * Check if the message is meant for us and the command. While sending
>> >      * data back, we use the same buffer keeping header and key information
>> >      * but changing/appending the value of the key and setting the OK or ERROR command
>> >      */
>> >     if (data[0] == 'C' && data[1] == 'S') {
>> >         switch (data[2]) {
>> >         case CMD_PUT:
>> >             if ((hash_table_put(&data[4], &data[24])) == NULL)
>> >                 data[3] = CMD_ERR;
>> >             else
>> >                 data[3] = CMD_OK;
>> >             evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>> >             break;
>> >         case CMD_GET:
>> >             np = hash_table_get(&data[4]);
>> >             if (np == NULL) {
>> >                 data[3] = CMD_ERR;
>> >                 evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>> >             } else {
>> >                 data[3] = CMD_OK;
>> >                 strncpy(&data[24], np->value, VALUE_SIZE);
>> >                 evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>> >             }
>> >             break;
>> >         case CMD_DEL:
>> >             if (hash_table_delete(&data[4]))
>> >                 data[3] = CMD_ERR;
>> >             else
>> >                 data[3] = CMD_OK;
>> >             evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>> >             break;
>> >         default:
>> >             break;
>> >         }
>> >     }
>> >
>> >     /* Send the results to the peer. This actually only queues the results
>> >      * for sending. Sending will occur asynchronously, handled by libevent. */
>> >     if (bufferevent_write_buffer(bev, client->output_buffer)) {
>> >         errorOut("Error sending data to client on fd %d\n", client->fd);
>> >         closeClient(client);
>> >     }
>> > }
>> >
>> > /**
>> >  * Called by libevent when the write buffer reaches 0.
>> >  * We only provide this because libevent expects it, but we don't use it.
>> >  */
>> > void server_buffered_on_write(struct bufferevent *bev, void *arg) {
>> > }
>> >
>> > /**
>> >  * Called by libevent when there is an error on underlying the socket
>> >  * descriptor.
>> >  */
>> > void server_buffered_on_error(struct bufferevent *bev, short events, void *arg) {
>> >     if (events & BEV_EVENT_ERROR) {
>> >         printf("Server: Got an error: %s\n",
>> >             evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
>> >     }
>> >     closeClient((client_t *)arg);
>> > }
>> >
>> > static void server_job_function(struct job *job) {
>> >     client_t *client = (client_t *)job->user_data;
>> >
>> >     event_base_dispatch(client->evbase);
>> >     closeAndFreeClient(client);
>> >     free(job);
>> > }
>> >
>> > /*
>> >  * This function will be called by libevent when there is a connection
>> >  * ready to be accepted.
>> >  */
>> > void server_on_accept(evutil_socket_t fd, short ev, void *arg) {
>> >     int client_fd;
>> >     struct sockaddr_in client_addr;
>> >     socklen_t client_len = sizeof(client_addr);
>> >     workqueue_t *workqueue = (workqueue_t *)arg;
>> >     client_t *client;
>> >     job_t *job;
>> >
>> >     client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
>> >     if (client_fd < 0) {
>> >         warn("accept failed");
>> >         return;
>> >     }
>> >
>> >     /* Set the client socket to non-blocking mode. */
>> >     if (evutil_make_socket_nonblocking(client_fd) < 0) {
>> >         warn("failed to set client socket to non-blocking");
>> >         close(client_fd);
>> >         return;
>> >     }
>> >
>> >     /* Create a client object.
>> >      */
>> >     if ((client = (client_t *)malloc(sizeof(*client))) == NULL) {
>> >         warn("failed to allocate memory for client state");
>> >         close(client_fd);
>> >         return;
>> >     }
>> >     memset(client, 0, sizeof(*client));
>> >     client->fd = client_fd;
>> >
>> >     if ((client->output_buffer = evbuffer_new()) == NULL) {
>> >         warn("client output buffer allocation failed");
>> >         closeAndFreeClient(client);
>> >         return;
>> >     }
>> >
>> >     if ((client->evbase = event_base_new()) == NULL) {
>> >         warn("client event_base creation failed");
>> >         closeAndFreeClient(client);
>> >         return;
>> >     }
>>
>> Are you sure you need this overhead (i.e. base per socket)?
>> AFAICS you don't need this.
>>
>> >
>> >     client->buf_ev = bufferevent_socket_new(client->evbase, client_fd,
>> >             BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
>> >     if ((client->buf_ev) == NULL) {
>> >         warn("client bufferevent creation failed");
>> >         closeAndFreeClient(client);
>> >         return;
>> >     }
>> >
>> >     /*
>> >      * We trigger the read callback only when there atleast MESSAGE_SIZE
>> >      * bytes to be read.
>> >      */
>> >     bufferevent_setwatermark(client->buf_ev, EV_READ, MESSAGE_SIZE, 0);
>> >     bufferevent_setcb(client->buf_ev, server_buffered_on_read, server_buffered_on_write,
>> >             server_buffered_on_error, client);
>> >
>> >     /* We have to enable it before our callbacks will be
>> >      * called. */
>> >     bufferevent_enable(client->buf_ev, EV_READ);
>> >
>> >     /* Create a job object and add it to the work queue. */
>> >     if ((job = (job_t *)malloc(sizeof(*job))) == NULL) {
>> >         warn("failed to allocate memory for job state");
>> >         closeAndFreeClient(client);
>> >         return;
>> >     }
>> >     job->job_function = server_job_function;
>> >     job->user_data = client;
>> >
>> >     workqueue_add_job(workqueue, job);
>> > }
>> >
>> > /*
>> >  * Run the server. This function blocks, only returning when the server has
>> >  * terminated.
>> >  */
>> > void *runServer(void *arg) {
>> >     struct server_p *server_i = (struct server_p *)arg;
>> >     evutil_socket_t listenfd;
>> >     struct sockaddr_in listen_addr;
>> >     struct event *ev_accept;
>> >     int reuseaddr_on;
>> >     struct sigaction siginfo;
>> >
>> >     /* Set signal handlers */
>> >     sigset_t sigset;
>> >     sigemptyset(&sigset);
>> >     siginfo.sa_handler = sighandler;
>> >     siginfo.sa_mask = sigset;
>> >     siginfo.sa_flags = SA_RESTART;
>> >
>> >     sigaction(SIGINT, &siginfo, NULL);
>> >     sigaction(SIGTERM, &siginfo, NULL);
>> >
>> >     /* Create our listening socket */
>> >     listenfd = socket(AF_INET, SOCK_STREAM, 0);
>> >     if (listenfd < 0) {
>> >         err(1, "listen failed");
>> >         goto exit;
>> >     }
>> >
>> >     memset(&listen_addr, 0, sizeof(listen_addr));
>> >     listen_addr.sin_family = AF_INET;
>> >     listen_addr.sin_port = htons(atoi(server_i->serverport));
>> >     inet_pton(AF_INET, server_i->serverip, &listen_addr.sin_addr);
>> >
>> >     if (bind(listenfd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) {
>> >         err(1, "bind failed");
>> >         goto exit;
>> >     }
>> >
>> >     if (listen(listenfd, CONNECTION_BACKLOG) < 0) {
>> >         err(1, "listen failed");
>> >         goto exit;
>> >     }
>> >     reuseaddr_on = 1;
>> >     setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
>> >             sizeof(reuseaddr_on));
>> >
>> >     /*
>> >      * Set the socket to non-blocking, this is essential in event
>> >      * based programming with libevent.
>> >      */
>> >     if (evutil_make_socket_nonblocking(listenfd) < 0) {
>> >         err(1, "failed to set server socket to non-blocking");
>> >         goto exit;
>> >     }
>> >
>> >     if ((evbase_accept = event_base_new()) == NULL) {
>> >         perror("Unable to create socket accept event base");
>> >         close(listenfd);
>> >         goto exit;
>> >     }
>> >
>> >     /* Initialize work queue.
>> >      */
>> >     if (workqueue_init(&workqueue, NUM_THREADS)) {
>> >         perror("Failed to create work queue");
>> >         close(listenfd);
>> >         workqueue_shutdown(&workqueue);
>> >         goto exit;
>> >     }
>> >
>> >     /* We now have a listening socket, we create a read event to
>> >      * be notified when a client connects. */
>> >     ev_accept = event_new(evbase_accept, listenfd, EV_READ | EV_PERSIST,
>> >             server_on_accept, (void *)&workqueue);
>> >     event_add(ev_accept, NULL);
>> >
>> >     printf("Server running.\n");
>> >
>> >     /* Start the event loop. */
>> >     event_base_dispatch(evbase_accept);
>> >
>> >     event_base_free(evbase_accept);
>> >     evbase_accept = NULL;
>> >
>> >     close(listenfd);
>> >
>> >     printf("Server shutdown.\n");
>> >
>> > exit:
>> >     pthread_exit(NULL);
>> > }
>> >
>> > /*
>> >  * Kill the server. This function can be called from another thread to kill
>> >  * the server, causing runServer() to return.
>> >  */
>> > void killServer(void) {
>> >     fprintf(stdout, "Stopping socket listener event loop.\n");
>> >     if (event_base_loopexit(evbase_accept, NULL)) {
>> >         perror("Error shutting down server");
>> >     }
>> >     fprintf(stdout, "Stopping workers.\n");
>> >     workqueue_shutdown(&workqueue);
>> > }
>> >
>> > static void sighandler(int signal) {
>> >     fprintf(stdout, "Received signal %d: %s. Shutting down.\n", signal,
>> >             strsignal(signal));
>> >     killServer();
>> > }
>> >
>> > /*
>> >  * Taken from http://stackoverflow.com/questions/9210528/split-string-with-delimiters-in-c
>> >  * We modify it to use strtok_r the MT safe variant. strtok is not MT safe.
>> >  */
>> > char** str_split(char* a_str, const char a_delim) {
>> >     char** result = 0;
>> >     size_t count = 0;
>> >     char* tmp = a_str;
>> >     char* last_comma = 0;
>> >     char* save = 0;
>> >     char delim[2];
>> >     delim[0] = a_delim;
>> >     delim[1] = 0;
>> >
>> >     /* Count how many elements will be extracted.
>> >      */
>> >     while (*tmp) {
>> >         if (a_delim == *tmp) {
>> >             count++;
>> >             last_comma = tmp;
>> >         }
>> >         tmp++;
>> >     }
>> >
>> >     /* Add space for trailing token. */
>> >     count += last_comma < (a_str + strlen(a_str) - 1);
>> >
>> >     /* Add space for terminating null string so caller
>> >        knows where the list of returned strings ends. */
>> >     count++;
>> >
>> >     result = (char **)malloc(sizeof(char*) * count);
>> >
>> >     if (result) {
>> >         size_t idx = 0;
>> >         //char* token = strtok(a_str, delim);
>> >         char* token = strtok_r(a_str, delim, &save);
>> >
>> >         while (token) {
>> >             *(result + idx++) = strdup(token);
>> >             //token = strtok(0, delim);
>> >             token = strtok_r(0, delim, &save);
>> >         }
>> >         assert(idx == count - 1);
>> >         *(result + idx) = 0;
>> >     }
>> >
>> >     return result;
>> > }
>> >
>> > static void print_key(char *data, int start_pos) {
>> >     int i;
>> >
>> >     for (i = start_pos; i < start_pos + KEY_SIZE; i++)
>> >         printf("%c", data[i]);
>> > }
>> >
>> > static void closeAndFreeServerConn(struct server_conn *client) {
>> >     if (client != NULL) {
>> >         if (client->bev != NULL) {
>> >             bufferevent_free(client->bev);
>> >             client->bev = NULL;
>> >         }
>> >         if (client->evbase != NULL) {
>> >             event_base_free(client->evbase);
>> >             client->evbase = NULL;
>> >         }
>> >         if (client->output_buffer != NULL) {
>> >             evbuffer_free(client->output_buffer);
>> >             client->output_buffer = NULL;
>> >         }
>> >     }
>> > }
>> >
>> > /*
>> >  * Called by libevent when there is data to read.
>> >  */
>> > void peer_buffered_on_read(struct bufferevent *bev, void *arg) {
>> >     struct server_conn *sconn = (struct server_conn *)arg;
>> >     char data[MESSAGE_SIZE] = {0};
>> >
>> >     struct evbuffer *input;
>> >     input = bufferevent_get_input(bev);
>> >
>> >     /*
>> >      * Remove a chunk of data from the input buffer, copying it into our
>> >      * local array (data).
>> >      */
>> >     evbuffer_remove(input, data, MESSAGE_SIZE);
>> >
>> >     /*
>> >      * Reply from server
>> >      */
>> >     if (data[0] == 'C' && data[1] == 'S') {
>> >         switch (data[2]) {
>> >         case CMD_PUT:
>> >             if (data[3] == CMD_OK) {
>> >                 if (!perf_test_on)
>> >                     printf("\nPut operation successful\n");
>> >             } else {
>> >                 if (!perf_test_on)
>> >                     printf("\nPut operation failed\n");
>> >             }
>> >
>> >             if (!perf_test_on) {
>> >                 printf("Key was: ");
>> >                 print_key(data, 4);
>> >                 printf("\n");
>> >                 printf("Value was: %s\n\n", &data[24]);
>> >             }
>> >             break;
>> >         case CMD_GET:
>> >             if (data[3] == CMD_OK) {
>> >                 if (!perf_test_on) {
>> >                     printf("\nGet operation successful\n");
>> >                     printf("Key was: ");
>> >                     print_key(data, 4);
>> >                     printf("\n");
>> >                     printf("Value is: %s\n\n", &data[24]);
>> >                 }
>> >             } else {
>> >                 if (!perf_test_on) {
>> >                     printf("\nGet operation failed\n");
>> >                     printf("Key was: ");
>> >                     print_key(data, 4);
>> >                     printf("\n\n");
>> >                 }
>> >             }
>> >             break;
>> >         case CMD_DEL:
>> >             if (data[3] == CMD_OK) {
>> >                 if (!perf_test_on)
>> >                     printf("\nDelete operation successful\n");
>> >             }
>> >             else {
>> >                 if (!perf_test_on)
>> >                     printf("\nDelete operation failed\n");
>> >             }
>> >
>> >             if (!perf_test_on) {
>> >                 printf("Key was: ");
>> >                 print_key(data, 4);
>> >                 printf("\n\n");
>> >             }
>> >             break;
>> >         default:
>> >             break;
>> >         }
>> >     }
>> >
>> >     if (sconn->evbase != NULL)
>> >         event_base_loopbreak(sconn->evbase);
>> > }
>> >
>> > /*
>> >  * Called by libevent when there is an error on underlying the socket
>> >  * descriptor.
>> >  */
>> > void peer_buffered_on_error(struct bufferevent *bev, short events, void *arg) {
>> >     if (events & BEV_EVENT_ERROR) {
>> >         printf("Peer: Got an error: %s\n",
>> >             evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
>> >     }
>> > }
>> >
>> > /**
>> >  * Called by libevent when the write buffer reaches 0. We only
>> >  * provide this because libevent expects it, but we don't use it.
>> >  */
>> > void peer_buffered_on_write(struct bufferevent *bev, void *arg) {
>> > }
>> >
>> > static int make_server_connection(int lserver_id) {
>> >     struct sockaddr_in sin;
>> >
>> >     memset(&sin, 0, sizeof(sin));
>> >     sin.sin_family = AF_INET;
>> >     sin.sin_port = htons(atoi(servers[lserver_id].serverport));
>> >     inet_pton(AF_INET, servers[lserver_id].serverip, &sin.sin_addr);
>> >
>> >     if ((sconn.output_buffer = evbuffer_new()) == NULL) {
>> >         closeAndFreeServerConn(&sconn);
>> >         return -1;
>> >     }
>> >
>> >     if ((sconn.evbase = event_base_new()) == NULL) {
>> >         closeAndFreeServerConn(&sconn);
>> >         return -1;
>> >     }
>> >
>> >     if ((sconn.bev = bufferevent_socket_new(sconn.evbase,
>> >             -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS)) == NULL) {
>> >         closeAndFreeServerConn(&sconn);
>> >         return -1;
>> >     }
>> >
>> >     /*
>> >      * We trigger the read callback only when there atleast MESSAGE_SIZE
>> >      * bytes to be read.
>> >      */
>> >     bufferevent_setwatermark(sconn.bev, EV_READ | EV_WRITE, MESSAGE_SIZE, 0);
>> >     bufferevent_setcb(sconn.bev, peer_buffered_on_read,
>> >             peer_buffered_on_write, peer_buffered_on_error, &sconn);
>> >     bufferevent_enable(sconn.bev, EV_READ);
>> >
>> >     if (bufferevent_socket_connect(sconn.bev,
>> >             (struct sockaddr *)&sin, sizeof(sin)) < 0) {
>> >         closeAndFreeServerConn(&sconn);
>> >         return -1;
>> >     }
>> >
>> >     return 0;
>> > }
>> >
>> > void put_at_server(const char *key, const char *value) {
>> >     char data[MESSAGE_SIZE];
>> >     int lserver_id;
>> >
>> >     memset(data, 0x30, MESSAGE_SIZE);
>> >     data[0] = 'C';
>> >     data[1] = 'S';
>> >     data[2] = CMD_PUT;
>> >
>> >     strncpy(&data[4], key, KEY_SIZE);
>> >     strncpy(&data[24], value, VALUE_SIZE);
>> >
>> >     lserver_id = hash_peer(&data[4]);
>> >     if (!perf_test_on)
>> >         printf("PUT Server Id: %d\n", lserver_id);
>> >
>> >     if (make_server_connection(lserver_id) != 0) {
>> >         printf("Connecting to Server %d failed.\n", lserver_id);
>> >         return;
>> >     }
>> >
>> >     evbuffer_add(sconn.output_buffer,
>> >             data, MESSAGE_SIZE);
>> >     bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>> >     event_base_loopexit(sconn.evbase, &peer_timeout);
>> >     event_base_loop(sconn.evbase, 0);
>> >
>> >     /*
>> >      * Though our functional specifications are to maintain the connections
>> >      * in an open state once we conenct, but for some reason if the connections
>> >      * are not closed, the buffers do not seem to flush and we never exit the
>> >      * event loop above OR the server does not exit the event loop connection
>> >      * even though there was only one event from the event base to handle and
>> >      * the other end does not get the data or the callbacks get called. So we
>> >      * explicitly close the connection here. Did try a few different mechanisms
>> >      * for handling this scenario both at server and peer end but none seems to
>> >      * work. So we explicitly deviate from our design recommendation of keeping
>> >      * connections open unfortunately. Needs some further indepth investigation.
>> >      */
>> >     if (sconn.bev != NULL) {
>> >         bufferevent_free(sconn.bev);
>> >         sconn.bev = NULL;
>> >     }
>> >     if (sconn.evbase != NULL) {
>> >         event_base_free(sconn.evbase);
>> >         sconn.evbase = NULL;
>> >     }
>> >     if (sconn.output_buffer != NULL) {
>> >         evbuffer_free(sconn.output_buffer);
>> >         sconn.output_buffer = NULL;
>> >     }
>> > }
>> >
>> > void get_from_server(const char *key) {
>> >     char data[MESSAGE_SIZE];
>> >     int lserver_id;
>> >
>> >     memset(data, 0x30, MESSAGE_SIZE);
>> >     data[0] = 'C';
>> >     data[1] = 'S';
>> >     data[2] = CMD_GET;
>> >
>> >     strncpy(&data[4], key, KEY_SIZE);
>> >
>> >     lserver_id = hash_peer(&data[4]);
>> >     if (!perf_test_on)
>> >         printf("GET Server Id: %d\n", lserver_id);
>> >
>> >     if (make_server_connection(lserver_id) != 0) {
>> >         printf("Connecting to Server %d failed.\n", lserver_id);
>> >         return;
>> >     }
>> >
>> >     evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
>> >     bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>> >     event_base_loopexit(sconn.evbase, &peer_timeout);
>> >     event_base_loop(sconn.evbase, 0);
>> >
>> >     /*
>> >      * Though our functional specifications are to maintain the connections
>> >      * in an open state once we conenct, but for some reason if the connections
>> >      * are not closed, the buffers do not seem to flush and we never exit the
>> >      * event loop above OR the server does not exit the event loop connection
>> >      * even though there was only one event from the event base to handle and
>> >      * the other end does not get the data or the callbacks get called. So we
>> >      * explicitly close the connection here. Did try a few different mechanisms
>> >      * for handling this scenario both at server and peer end but none seems to
>> >      * work. So we explicitly deviate from our design recommendation of keeping
>> >      * connections open unfortunately. Needs some further indepth investigation.
>> >      */
>> >     if (sconn.bev != NULL) {
>> >         bufferevent_free(sconn.bev);
>> >         sconn.bev = NULL;
>> >     }
>> >     if (sconn.evbase != NULL) {
>> >         event_base_free(sconn.evbase);
>> >         sconn.evbase = NULL;
>> >     }
>> >     if (sconn.output_buffer != NULL) {
>> >         evbuffer_free(sconn.output_buffer);
>> >         sconn.output_buffer = NULL;
>> >     }
>> > }
>> >
>> > void delete_from_server(const char *key) {
>> >     char data[MESSAGE_SIZE];
>> >     int lserver_id;
>> >
>> >     memset(data, 0x30, MESSAGE_SIZE);
>> >     data[0] = 'C';
>> >     data[1] = 'S';
>> >     data[2] = CMD_DEL;
>> >
>> >     strncpy(&data[4], key, KEY_SIZE);
>> >
>> >     lserver_id = hash_peer(&data[4]);
>> >     if (!perf_test_on)
>> >         printf("DEL Server Id: %d\n", lserver_id);
>> >
>> >     if (make_server_connection(lserver_id) != 0) {
>> >         printf("Connecting to Server %d failed.\n", lserver_id);
>> >         return;
>> >     }
>> >
>> >     evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
>> >     bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>> >     event_base_loopexit(sconn.evbase, &peer_timeout);
>> >     event_base_loop(sconn.evbase, 0);
>> >
>> >     /*
>> >      * Though our functional specifications are to maintain the connections
>> >      * in an open state once we conenct, but for some reason if the connections
>> >      * are not closed, the buffers do not seem to flush and we never exit the
>> >      * event loop above OR the server does not exit the event loop connection
>> >      * even though there was only one event from the event base to handle and
>> >      * the other end does not get the data or the callbacks get called. So we
>> >      * explicitly close the connection here. Did try a few different mechanisms
>> >      * for handling this scenario both at server and peer end but none seems to
>> >      * work. So we explicitly deviate from our design recommendation of keeping
>> >      * connections open unfortunately. Needs some further indepth investigation.
>> >      */
>> >     if (sconn.bev != NULL) {
>> >         bufferevent_free(sconn.bev);
>> >         sconn.bev = NULL;
>> >     }
>> >     if (sconn.evbase != NULL) {
>> >         event_base_free(sconn.evbase);
>> >         sconn.evbase = NULL;
>> >     }
>> >     if (sconn.output_buffer != NULL) {
>> >         evbuffer_free(sconn.output_buffer);
>> >         sconn.output_buffer = NULL;
>> >     }
>> > }
>> >
>> > static void discard_logs(int severity, const char *msg) {
>> >
>> > }
>> >
>> > void set_libevent_logging(void) {
>> >     event_enable_debug_logging(EVENT_DBG_ALL);
>> >     //event_set_log_callback(discard_logs);
>> > }
>> >
>> > void rand_str(char *dest, size_t length) {
>> >     char charset[] = "0123456789"
>> >                      "abcdefghijklmnopqrstuvwxyz"
>> >                      "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
>> >
>> >     while (length-- > 0) {
>> >         size_t index = (double) rand() / RAND_MAX * (sizeof charset - 1);
>> >         *dest++ = charset[index];
>> >     }
>> >
>> >     *dest = '\0';
>> > }
>> >
>> > void run_perf_tests(void) {
>> >     int i;
>> >     char key[KEY_SIZE];
>> >     char value[VALUE_SIZE];
>> >     struct timeval t1, t2;
>> >     double elapsedtime, totalelapsedtime;
>> >
>> >     perf_test_on = true;
>> >
>> >     /* Run PUT tests */
>> >     for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>> >         memset(key, 0x30, KEY_SIZE);
>> >         memset(value, 0x30, VALUE_SIZE);
>> >
>> >         key[0] = server_id + 0x31;
>> >         /* Generate a key depending on loop iteration */
>> >         snprintf(&key[1], KEY_SIZE - 1, "%d", i);
>> >         /* Generate a random string value */
>> >         rand_str(value, VALUE_SIZE);
>> >         gettimeofday(&t1, NULL);
>> >         put_at_server(key, value);
>> >         gettimeofday(&t2, NULL);
>> >         // compute and print the elapsed time in millisec
>> >         elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
>> >         elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
>> >         printf("Elapsed time: %f\n", elapsedtime);
>> >         totalelapsedtime += elapsedtime;
>> >     }
>> >     printf("Average Response time for PUT requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);
>> >
>> >     printf("Press any key to continue\n");
>> >     getchar();
>> >
>> >     /* Run GET tests */
>> >     for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>> >         memset(key, 0x30, KEY_SIZE);
>> >
>> >         key[0] = server_id + 0x31;
>> >         /* Generate a key depending on loop iteration */
>> >         snprintf(&key[1], KEY_SIZE - 1, "%d", i);
>> >         gettimeofday(&t1, NULL);
>> >         get_from_server(key);
>> >         gettimeofday(&t2, NULL);
>> >         // compute and print the elapsed time in millisec
>> >         elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
>> >         elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
>> >         printf("Elapsed time: %f\n", elapsedtime);
>> >         totalelapsedtime += elapsedtime;
>> >     }
>> >     printf("Average Response time for GET requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);
>> >
>> >     printf("Press any key to continue\n");
>> >     getchar();
>> >
>> >     /* Run DEL tests */
>> >     for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>> >         memset(key, 0x30, KEY_SIZE);
>> >
>> >         key[0] = server_id + 0x31;
>> >         /* Generate a key depending on loop iteration */
>> >         snprintf(&key[1], KEY_SIZE - 1, "%d", i);
>> >         gettimeofday(&t1, NULL);
>> >         delete_from_server(key);
>> >         gettimeofday(&t2, NULL);
>> >         // compute and print the elapsed time in millisec
>> >         elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
>> >         elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
>> >         printf("Elapsed time: %f\n", elapsedtime);
>> >         totalelapsedtime += elapsedtime;
>> >     }
>> >     printf("Average Response time for DEL requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);
>> >
>> >     perf_test_on = false;
>> > }
>> >
>> > void input_process(void) {
>> >     char key[KEY_SIZE];
>> >     char value[VALUE_SIZE];
>> >     bool exitloop = false;
>> >     int input;
>> >
>> >     /*
>> >      * We run the peer functionality in this main thread
>> >      */
>> >     while (!exitloop) {
>> >         printf("\nSelect Operation\n");
>> >         printf("(1) Put (2) Get (3) Delete (4) Run tests\n");
>> >         printf("(5) Exit");
>> >         printf("\nPlease enter your selection (1-5):\t");
>> >
>> >         scanf("%d", &input);
>> >         getchar();
>> >
>> >         switch (input) {
>> >         case 1:
>> >             printf("Enter Key: \t");
>> >             scanf("%s", key);
>> >             printf("\n");
>> >             printf("Enter Value: \t");
>> >             scanf("%s", value);
>> >             printf("\n");
>> >             put_at_server(key, value);
>> >             break;
>> >         case 2:
>> >             printf("Enter Key: \t");
>> >             scanf("%s", key);
>> >             printf("\n");
>> >             get_from_server(key);
>> >             break;
>> >         case 3:
>> >             printf("Enter Key: \t");
>> >             scanf("%s", key);
>> >             printf("\n");
>> >             delete_from_server(key);
>> >             break;
>> >         case 4:
>> >             run_perf_tests();
>> >             break;
>> >         case 5:
>> >             killServer();
>> >             exitloop = true;
>> >             break;
>> >         default:
>> >             printf("\n\nWrong value: %d\n", input);
>> >             break;
>> >         }
>> >
>> >         /* Reset buffers for next iteration */
>> >         memset(key, 0x30, KEY_SIZE);
>> >         memset(value, 0x30, VALUE_SIZE);
>> >     }
>> > }
>> >
>> > int main(int argc, char *argv[]) {
>> >     FILE *fp;
>> >     int i;
>> >     int error;
>> >     int count_of_servers;
>> >     ssize_t read;
>> >     size_t len = 0;
>> >     char *line = NULL;
>> >     char **tokens = NULL;
>> >
>> >     if (argc != 3) {
>> >         /*
>> >          * We do not validate or error check any of the arguments
>> >          * Please enter correct arguments
>> >          */
>> >         printf("Usage: ./server <serverid#> </path/to/server/conf/file>");
>> >         exit(1);
>> >     }
>> >
>> >     server_id = atoi(argv[1]) - 1;
>> >     if ((server_id < 0) || (server_id > MAX_NO_OF_SERVERS)) {
>> >         printf("Incorrect server id provided\n");
>> >         exit(1);
>> >     }
>> >
>> >     fp = fopen(argv[2], "r");
>> >     if (fp == NULL) {
>> >         perror("Could not open server configuration file");
>> >         exit(1);
>> >     }
>> >
>> >     /*
>> >      * We now extract the IP and port information of 8 servers
>> >      * which will be involved in this setup.
>> >      */
>> >     count_of_servers = 0;
>> >     while ((read = getline(&line, &len, fp)) != -1) {
>> >
>> >         if (count_of_servers == MAX_NO_OF_SERVERS)
>> >             break;
>> >
>> >         tokens = str_split(line, ' ');
>> >         if (tokens) {
>> >             servers[count_of_servers].serverip = *(tokens);
>> >             servers[count_of_servers].serverport = *(tokens + 1);
>> >         }
>> >         free(line);
>> >         line = NULL;
>> >
>> >         count_of_servers++;
>> >     }
>> >
>> >     fclose(fp);
>> >
>> >     if (pthread_rwlock_init(&ht_lock, NULL) != 0) {
>> >         perror("Lock init failed");
>> >         goto free_tokens;
>> >     }
>> >
>> >     /*
>> >      * Start the server. We start the server in another thread so
>> >      * the libevent event loop for dispatching events on connections
>> >      * can work separately which would otherwise block.
>> >      */
>> >     error = pthread_create(&server_thread, NULL, &runServer, &servers[server_id]);
>> >     if (error != 0) {
>> >         perror("Error in server thread creation");
>> >         goto free_tokens;
>> >     }
>> >
>> >     peer_timeout.tv_sec = 0;
>> >     peer_timeout.tv_usec = PEER_CONN_TIMEOUT;
>> >
>> >     input_process();
>> >
>> >     for (i = count_of_servers - 1; i >= 0; --i) {
>> >         free(servers[i].serverip);
>> >         free(servers[i].serverport);
>> >     }
>> >
>> >     pthread_rwlock_destroy(&ht_lock);
>> >
>> >     return 0;
>> >
>> > free_tokens:
>> >     for (i = count_of_servers - 1; i >= 0; --i) {
>> >         free(servers[i].serverip);
>> >         free(servers[i].serverport);
>> >     }
>> >
>> >     return -1;
>> > }
>> >
>>
>>
>> --
>> Respectfully
>> Azat Khuzhin
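
A side note on the timing code in run_perf_tests() quoted above: totalelapsedtime is read before it is ever assigned, and it is not reset between the PUT, GET and DEL phases, so the printed averages cannot be trusted as they stand. Below is a minimal sketch of one way to restructure it. The helper names elapsed_ms(), average_ms() and dummy_op() are hypothetical and not part of the original code; dummy_op() only stands in for put_at_server(), get_from_server() or delete_from_server().

```c
#include <assert.h>
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical helper: milliseconds between two gettimeofday() samples. */
static double elapsed_ms(const struct timeval *start, const struct timeval *end)
{
    double ms = (end->tv_sec - start->tv_sec) * 1000.0;   /* sec to ms */
    ms += (end->tv_usec - start->tv_usec) / 1000.0;       /* us to ms */
    return ms;
}

/*
 * Hypothetical helper: call op(i) `iterations` times and return the mean
 * wall-clock time per call in milliseconds. The accumulator is local and
 * starts at zero, so each test phase gets its own independent average.
 */
static double average_ms(void (*op)(int), int iterations)
{
    struct timeval t1, t2;
    double total = 0.0;
    int i;

    assert(iterations > 0);

    for (i = 0; i < iterations; i++) {
        gettimeofday(&t1, NULL);
        op(i);
        gettimeofday(&t2, NULL);
        total += elapsed_ms(&t1, &t2);
    }
    return total / iterations;
}

/* Placeholder operation; a real test would issue a PUT/GET/DEL here. */
static void dummy_op(int i)
{
    (void)i;
}
```

Each phase would then reduce to a single call such as average_ms(dummy_op, NO_OF_TEST_ITERATIONS), with a small wrapper per operation that builds the key from the iteration index. Note that gettimeofday() follows the wall clock, so a clock adjustment during a run can skew an individual sample.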
