Author: jmcd Date: 2006-09-06 14:50:52 +0000 (Wed, 06 Sep 2006) New Revision: 18180
WebSVN: http://websvn.samba.org/cgi-bin/viewcvs.cgi?view=rev&root=samba&rev=18180 Log: >From Aleksey Fedoseev, The patch consists of several modifications: 1) added preallocation of incoming stream buffer (minus 1 malloc/free per incoming message) 2) corrected program exit (cleaning up communication buffer / closing dispatcher's sockets) 3) added message size test Modified: branches/tmp/vl-messaging/source/include/messages.h branches/tmp/vl-messaging/source/lib/messages_socket.c branches/tmp/vl-messaging/source/lib/messages_stream.c branches/tmp/vl-messaging/source/torture/msgtest.c Changeset: Modified: branches/tmp/vl-messaging/source/include/messages.h =================================================================== --- branches/tmp/vl-messaging/source/include/messages.h 2006-09-06 14:29:57 UTC (rev 18179) +++ branches/tmp/vl-messaging/source/include/messages.h 2006-09-06 14:50:52 UTC (rev 18180) @@ -125,6 +125,7 @@ struct message_list *prev, *next; struct message_rec *msg; size_t processed; /* number of read/written bytes */ + size_t allocated; /* number of allocated bytes */ }; #endif Modified: branches/tmp/vl-messaging/source/lib/messages_socket.c =================================================================== --- branches/tmp/vl-messaging/source/lib/messages_socket.c 2006-09-06 14:29:57 UTC (rev 18179) +++ branches/tmp/vl-messaging/source/lib/messages_socket.c 2006-09-06 14:50:52 UTC (rev 18180) @@ -101,9 +101,7 @@ cleanup_messages(wait_send); if (socket_fd >= 0) { - if (mtype == MESSAGING_TYPE_STREAM) { - shutdown_stream_sockets(); - } + shutdown_stream_sockets(); close(socket_fd); socket_fd = -1; } Modified: branches/tmp/vl-messaging/source/lib/messages_stream.c =================================================================== --- branches/tmp/vl-messaging/source/lib/messages_stream.c 2006-09-06 14:29:57 UTC (rev 18179) +++ branches/tmp/vl-messaging/source/lib/messages_stream.c 2006-09-06 14:50:52 UTC (rev 18180) @@ -70,7 +70,7 @@ }; static struct messaging_client *clients_cache = NULL; -static struct message_list *tcp_incoming = NULL; +static struct message_list *disp_incoming = NULL; static int dispatcher_pipe = -1; /* approximate maximum number of connected clients in the list */ @@ -80,6 +80,8 @@ #define MESSAGING_DISPATCHER_SOCKET "dispatcher" #define MESSAGING_DISPATCHER_LOCKFILE "dispatcher.pid" +#define INITIAL_CONTAINER_SIZE 64 + static const char *dispatch_path(void) { static char *name = NULL; @@ -97,6 +99,56 @@ } /**************************************************************************** + Allocate/reallocate message container +****************************************************************************/ + +struct message_list *allocate_container(TALLOC_CTX *mem_ctx, + struct message_list *cnt, + size_t needsize) +{ + uint8_t *buffer; + size_t size = INITIAL_CONTAINER_SIZE; + + while(size < needsize) size *= 2; + + if(cnt == NULL) { + cnt = TALLOC_ZERO_P(mem_ctx, struct message_list); + if(cnt == NULL) { + DEBUG(0, ("talloc failed\n")); + return NULL; + } + buffer = TALLOC_ARRAY(cnt, uint8_t, size); + if(buffer == NULL) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(cnt); + return NULL; + } + } else { + + SMB_ASSERT(size > cnt->allocated); + + buffer = TALLOC_REALLOC_ARRAY(cnt, cnt->msg, uint8_t, size); + if(buffer == NULL) { + DEBUG(0, ("realloc failed\n")); + TALLOC_FREE(cnt->msg); + /* try to allocate with talloc */ + buffer = TALLOC_ARRAY(cnt, uint8_t, size); + if(buffer == NULL) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(cnt); + return NULL; + } + } + } + + cnt->msg = (struct message_rec*)buffer; + cnt->processed = (size_t)-1; + cnt->allocated = size; + + return cnt; +} + +/**************************************************************************** Client's queue helper functions ****************************************************************************/ @@ -145,7 +197,8 @@ for (client = clients_cache; client != NULL; client = client->next) { clients_count++; - if(client->outgoing == NULL && client->incoming == NULL) { + if(client->outgoing == NULL && + client->incoming->processed == (size_t)-1) { last_free = client; } } @@ -194,15 +247,24 @@ drop_old_client(); } + c->next = c->prev = NULL; + /* set invalid pid cause we don't know yet who is the peer */ c->pid = procid_self(); c->pid.pid = -1; c->fd = fd; - c->incoming = NULL; + + /* be sure that we'll close socket */ + talloc_set_destructor(c, messaging_client_destr); + + c->incoming = allocate_container(c, NULL, 0); + if (c->incoming == NULL) { + TALLOC_FREE(c); + return ; + } c->outgoing = NULL; c->connected = True; - talloc_set_destructor(c, messaging_client_destr); DEBUG(10, ("Adding new client\n")); @@ -291,12 +353,18 @@ return; } + c->next = c->prev = NULL; + /* set foreign dispatcher pid */ c->pid.ip = addr.sin_addr; c->pid.pid = MESSAGING_DISPATCHER_PID; c->fd = fd; - c->incoming = NULL; + c->incoming = allocate_container(c, NULL, 0); + if(c->incoming == NULL) { + TALLOC_FREE(c); + return ; + } c->outgoing = NULL; c->connected = True; talloc_set_destructor(c, messaging_client_destr); @@ -378,9 +446,17 @@ return NULL; } + result->next = result->prev = NULL; + + result->fd = -1; + result->pid = *pid; result->connected = False; - result->incoming = NULL; + result->incoming = allocate_container(result, NULL, 0); + if(result->incoming == NULL) { + TALLOC_FREE(result); + return NULL; + } result->outgoing = NULL; if(mtype == MESSAGING_TYPE_DISPATCHER) { @@ -489,7 +565,7 @@ static BOOL container_full(const struct message_list *li) { - return ((li != NULL) && (li->processed != 0) && + return ((li != NULL) && (li->processed != (size_t)-1) && (li->processed == li->msg->len)); } @@ -505,19 +581,9 @@ ssize_t nread; uint8_t *target; - if ((cnt != NULL) && container_full(cnt)) { - TALLOC_FREE(cnt); - } - - if (cnt == NULL) { - cnt = TALLOC_ZERO_P(mem_ctx, struct message_list); - if (cnt == NULL) { - DEBUG(0, ("talloc failed\n")); - return NULL; - } - } - - if (cnt->msg == NULL) { + SMB_ASSERT(cnt != NULL); + + if (cnt->processed == (size_t)-1) { /* First, read message length */ target = (uint8_t *)(&len); to_read = sizeof(size_t); @@ -529,38 +595,44 @@ nread = sys_read(fd, target, to_read); if(nread == 0) { DEBUG(10, ("Messaging peer closed\n")); - TALLOC_FREE(cnt); + cnt->processed = (size_t)-1; return NULL; } if (nread < 0) { DEBUG(5, ("Error while reading from socket (errno = %d)\n", errno)); - TALLOC_FREE(cnt); + cnt->processed = (size_t)-1; return NULL; } - cnt->processed += nread; + if(cnt->processed == (size_t)-1) { - if (cnt->msg == NULL) { - if(cnt->processed < sizeof(size_t)) { + if(nread < sizeof(size_t)) { DEBUG(0, ("Received less that %d bytes!\n", sizeof(size_t))); - TALLOC_FREE(cnt); + cnt->processed = (size_t)-1; return NULL; } DEBUG(10, ("Receiving msg of length %d\n", len)); - cnt->msg = (struct message_rec*)TALLOC_ZERO_ARRAY(cnt, char, - len); - if (cnt->msg == NULL) { - DEBUG(0, ("talloc failed\n")); - TALLOC_FREE(cnt); - return NULL; + + if(len > cnt->allocated) { + DEBUG(10, ("Reallocating incoming container" + " to fit size %d\n", len)); + cnt = allocate_container(mem_ctx, cnt, len); + if(cnt == NULL) { + DEBUG(0, ("Reallocating failed!\n")); + return NULL; + } } - cnt->msg->len = len; + + cnt->processed = 0; + cnt->msg->len = len; } + cnt->processed += nread; + return cnt; } @@ -570,23 +642,23 @@ static BOOL receive_one_message(int fd, TALLOC_CTX *mem_ctx, - struct message_list **msg, + struct message_list *msg, BOOL* received) { *received = False; - *msg = read_from_stream_socket(fd, mem_ctx, *msg); - if (*msg == NULL) { + + if (read_from_stream_socket(fd, mem_ctx, msg) == NULL) { DEBUG(5, ("failed to read\n")); return False; } - if (!container_full(*msg)) { + if (!container_full(msg)) { return True; } - SMB_ASSERT((*msg)->msg != NULL); + SMB_ASSERT(msg->msg != NULL); - if ((*msg)->msg->len < sizeof(struct message_rec)) { + if (msg->msg->len < sizeof(struct message_rec)) { DEBUG(5, ("Message too short\n")); return False; } @@ -605,7 +677,7 @@ struct message_rec *msg_rec; BOOL received = False; - if(!receive_one_message(c->fd, c, &c->incoming, &received)) { + if(!receive_one_message(c->fd, c, c->incoming, &received)) { DEBUG(5, ("killing client %s\n", procid_str_static(&c->pid))); return False; } @@ -621,7 +693,8 @@ DEBUG(10, ("Got hello from %s\n", procid_str_static(&msg_rec->src))); c->pid = msg_rec->src; - TALLOC_FREE((c->incoming)); + + c->incoming->processed = (size_t)-1; } } @@ -784,11 +857,7 @@ DLIST_ADD_END((*received), li, tmp); - /* - We don't free client->incoming, because it - will be freed in a next - read_from_stream_socket call. - */ + client->incoming->processed = (size_t)-1; } } @@ -834,6 +903,8 @@ close(dispatcher_pipe); dispatcher_pipe = -1; } + + TALLOC_FREE(disp_incoming); } /**************************************************************************** @@ -1003,6 +1074,11 @@ #ifdef CLUSTER_SUPPORT tcp_fd = open_remote_listener(); + if (tcp_fd < 0) { + DEBUG(0, ("Can't create tcp socket\n")); + close(fd); + exit(1); + } #endif /* CLUSTER_SUPPORT */ ok = True; @@ -1027,6 +1103,11 @@ shutdown_stream_sockets(); + close(fd); +#ifdef CLUSTER_SUPPORT + close(tcp_fd); +#endif + /* Presently we don't remove socket & lock file. Should be fixed later */ @@ -1193,6 +1274,15 @@ return -1; good: + + SMB_ASSERT(disp_incoming == NULL); + disp_incoming = allocate_container(NULL, NULL, 0); + if(disp_incoming == NULL) { + DEBUG(0, ("Can't initialize tcp incoming buffer\n")); + close(sock); + return -1; + } + DEBUG(10, ("Dispatcher connected, say hello.\n")); /* say HELLO to the dispatcher to identify oneself */ message_send_pid_socket(pid_to_procid(MESSAGING_DISPATCHER_PID), @@ -1218,15 +1308,35 @@ void receive_on_socket_dispatcher(int socket_fd, struct message_list **queue) { BOOL received = False; - if(!receive_one_message(socket_fd, NULL, &tcp_incoming, &received)) { + if(!receive_one_message(socket_fd, NULL, disp_incoming, &received)) { DEBUG(0, ("Can't receive a message through the daemon\n")); return ; } if(received) { - struct message_list *tmp; - DLIST_ADD_END((*queue), tcp_incoming, tmp); - /* give memory control to a higher level code */ - tcp_incoming = NULL; + struct message_list *tmp, *li; + + SMB_ASSERT(disp_incoming != NULL && disp_incoming->msg != NULL); + + li = TALLOC_ZERO_P(NULL, struct message_list); + if(li == NULL) { + DEBUG(0, ("talloc failed\n")); + return ; + } + li->msg = (struct message_rec*)TALLOC_ARRAY( + li, uint8_t, + disp_incoming->msg->len); + if(li->msg == NULL) { + DEBUG(0, ("talloc failed\n")); + TALLOC_FREE(li); + return ; + } + + memcpy(li->msg, disp_incoming->msg, + disp_incoming->msg->len); + + DLIST_ADD_END((*queue), li, tmp); + + disp_incoming->processed = (size_t)-1; } } Modified: branches/tmp/vl-messaging/source/torture/msgtest.c =================================================================== --- branches/tmp/vl-messaging/source/torture/msgtest.c 2006-09-06 14:29:57 UTC (rev 18179) +++ branches/tmp/vl-messaging/source/torture/msgtest.c 2006-09-06 14:50:52 UTC (rev 18180) @@ -56,7 +56,10 @@ { pid_t pid; int i, n; - char buf[12], *p, *addr; + char buf[12], *large_buf; +#ifdef CLUSTER_SUPPORT + char *p, *addr; +#endif struct timeval dispatch_timeout = timeval_set(0, DISPATCH_TIMEOUT); struct process_id receiver; @@ -181,6 +184,36 @@ (ping_count+pong_count)/timeval_elapsed(&tv)); } + /* large messages test */ + pong_count = 0; + + { + struct timeval tv = timeval_current(); + size_t timelimit = n; + size_t ping_count = 0; + + printf("Starting large messages test.\n"); + + for(i = 1; i <= (1 << 16) && timeval_elapsed(&tv) < timelimit; i *= 2) { + large_buf = TALLOC_ZERO(NULL, i); + if(large_buf == NULL) { + printf("talloc failed!\n"); + break; + } + + printf("size %d... ", i); + + if(message_send_pid(receiver, MSG_PING, + large_buf, i, False)) ping_count++; + + while (ping_count > pong_count && timeval_elapsed(&tv) < timelimit) { + message_select_and_dispatch(&dispatch_timeout); + } + + printf("passed\n"); + } + } + message_end(); return (0);
