Author: jmcd
Date: 2006-09-06 14:50:52 +0000 (Wed, 06 Sep 2006)
New Revision: 18180

WebSVN: 
http://websvn.samba.org/cgi-bin/viewcvs.cgi?view=rev&root=samba&rev=18180

Log:
From Aleksey Fedoseev:

The patch consists of several modifications:

1) Preallocate the incoming stream buffer, which saves one malloc/free
per incoming message.
2) Clean up properly on program exit: free the communication buffer and
close the dispatcher's sockets.
3) Add a message size test (messages of increasing size) to msgtest.


Modified:
   branches/tmp/vl-messaging/source/include/messages.h
   branches/tmp/vl-messaging/source/lib/messages_socket.c
   branches/tmp/vl-messaging/source/lib/messages_stream.c
   branches/tmp/vl-messaging/source/torture/msgtest.c


Changeset:
Modified: branches/tmp/vl-messaging/source/include/messages.h
===================================================================
--- branches/tmp/vl-messaging/source/include/messages.h 2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/include/messages.h 2006-09-06 14:50:52 UTC (rev 18180)
@@ -125,6 +125,7 @@
        struct message_list *prev, *next;
        struct message_rec *msg;
        size_t processed; /* number of read/written bytes */
+       size_t allocated; /* number of allocated bytes */
 };
 
 #endif

Modified: branches/tmp/vl-messaging/source/lib/messages_socket.c
===================================================================
--- branches/tmp/vl-messaging/source/lib/messages_socket.c      2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/lib/messages_socket.c      2006-09-06 14:50:52 UTC (rev 18180)
@@ -101,9 +101,7 @@
        cleanup_messages(wait_send);
 
        if (socket_fd >= 0) {
-               if (mtype == MESSAGING_TYPE_STREAM) {
-                       shutdown_stream_sockets();
-               }
+               shutdown_stream_sockets();
                close(socket_fd);
                socket_fd = -1;
        }

Modified: branches/tmp/vl-messaging/source/lib/messages_stream.c
===================================================================
--- branches/tmp/vl-messaging/source/lib/messages_stream.c      2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/lib/messages_stream.c      2006-09-06 14:50:52 UTC (rev 18180)
@@ -70,7 +70,7 @@
 };
 static struct messaging_client *clients_cache = NULL;
 
-static struct message_list *tcp_incoming = NULL;
+static struct message_list *disp_incoming = NULL;
 static int dispatcher_pipe = -1;
 
 /* approximate maximum number of connected clients in the list */
@@ -80,6 +80,8 @@
 #define MESSAGING_DISPATCHER_SOCKET    "dispatcher"
 #define MESSAGING_DISPATCHER_LOCKFILE "dispatcher.pid"
 
+#define INITIAL_CONTAINER_SIZE         64
+
 static const char *dispatch_path(void)
 {
        static char *name = NULL;
@@ -97,6 +99,56 @@
 }
 
 /****************************************************************************
+ Allocate a message container (cnt == NULL) or grow an existing one so that its buffer holds at least needsize bytes. Returns the container, or NULL on failure.
+****************************************************************************/
+
+struct message_list *allocate_container(TALLOC_CTX *mem_ctx,
+                                       struct message_list *cnt,
+                                       size_t needsize) 
+{
+       uint8_t *buffer;
+       size_t size = INITIAL_CONTAINER_SIZE;
+       
+       while(size < needsize) size *= 2; /* double until needsize fits; NOTE(review): no overflow guard here - confirm callers bound needsize */
+
+       if(cnt == NULL) {               /* no container yet: create the list node under mem_ctx */
+               cnt = TALLOC_ZERO_P(mem_ctx, struct message_list);
+               if(cnt == NULL) {
+                       DEBUG(0, ("talloc failed\n"));
+                       return NULL;
+               }
+               buffer = TALLOC_ARRAY(cnt, uint8_t, size); /* buffer uses cnt as its talloc context */
+               if(buffer == NULL) {
+                       DEBUG(0, ("talloc failed\n"));
+                       TALLOC_FREE(cnt);
+                       return NULL;
+               }
+       } else {
+
+               SMB_ASSERT(size > cnt->allocated); /* an existing container may only be asked to grow */
+
+               buffer = TALLOC_REALLOC_ARRAY(cnt, cnt->msg, uint8_t, size);
+               if(buffer == NULL) {
+                       DEBUG(0, ("realloc failed\n"));
+                       TALLOC_FREE(cnt->msg); /* give up the old buffer before retrying with a fresh one */
+                       /* try to allocate with talloc */
+                       buffer = TALLOC_ARRAY(cnt, uint8_t, size);
+                       if(buffer == NULL) {
+                               DEBUG(0, ("talloc failed\n"));
+                               TALLOC_FREE(cnt); /* NOTE(review): frees the container passed in; confirm callers drop their own pointer to it */
+                               return NULL;
+                       }
+               }
+       }
+
+       cnt->msg = (struct message_rec*)buffer;
+       cnt->processed = (size_t)-1; /* (size_t)-1 is the value the stream reader checks before reading a new message length */
+       cnt->allocated = size; /* record the buffer capacity in bytes */
+
+       return cnt;
+}
+
+/****************************************************************************
  Client's queue helper functions
 ****************************************************************************/
 
@@ -145,7 +197,8 @@
        
        for (client = clients_cache; client != NULL; client = client->next) {
                clients_count++;
-               if(client->outgoing == NULL && client->incoming == NULL) {
+               if(client->outgoing == NULL &&
+                  client->incoming->processed == (size_t)-1) {
                        last_free = client;
                }
        }
@@ -194,15 +247,24 @@
                drop_old_client();
        }
 
+       c->next = c->prev = NULL;
+
        /* set invalid pid cause we don't know yet who is the peer */
        c->pid = procid_self();
        c->pid.pid = -1; 
 
        c->fd = fd;
-       c->incoming = NULL;
+
+       /* be sure that we'll close socket */
+       talloc_set_destructor(c, messaging_client_destr); 
+
+       c->incoming = allocate_container(c, NULL, 0);
+       if (c->incoming == NULL) {
+               TALLOC_FREE(c);
+               return ; 
+       }
        c->outgoing = NULL;
        c->connected = True;
-       talloc_set_destructor(c, messaging_client_destr);
 
        DEBUG(10, ("Adding new client\n"));
 
@@ -291,12 +353,18 @@
                return;
        }
 
+       c->next = c->prev = NULL;
+
        /* set foreign dispatcher pid */
        c->pid.ip = addr.sin_addr;
        c->pid.pid = MESSAGING_DISPATCHER_PID; 
 
        c->fd = fd;
-       c->incoming = NULL;
+       c->incoming = allocate_container(c, NULL, 0);
+       if(c->incoming == NULL) {
+               TALLOC_FREE(c);
+               return ;
+       }
        c->outgoing = NULL;
        c->connected = True;
        talloc_set_destructor(c, messaging_client_destr);
@@ -378,9 +446,17 @@
                return NULL;
        }
 
+       result->next = result->prev = NULL;
+
+       result->fd = -1;
+
        result->pid = *pid;
        result->connected = False;
-       result->incoming = NULL;
+       result->incoming = allocate_container(result, NULL, 0);
+       if(result->incoming == NULL) {
+               TALLOC_FREE(result);
+               return NULL;
+       }
        result->outgoing = NULL;
 
        if(mtype == MESSAGING_TYPE_DISPATCHER) {
@@ -489,7 +565,7 @@
 
 static BOOL container_full(const struct message_list *li)
 {
-       return ((li != NULL) && (li->processed != 0) &&
+       return ((li != NULL) && (li->processed != (size_t)-1) &&
                (li->processed == li->msg->len));
 }
 
@@ -505,19 +581,9 @@
        ssize_t nread;
        uint8_t *target;
 
-       if ((cnt != NULL) && container_full(cnt)) {
-               TALLOC_FREE(cnt);
-       }
-
-       if (cnt == NULL) {
-               cnt = TALLOC_ZERO_P(mem_ctx, struct message_list);
-               if (cnt == NULL) {
-                       DEBUG(0, ("talloc failed\n"));
-                       return NULL;
-               }
-       }
-
-       if (cnt->msg == NULL) {
+       SMB_ASSERT(cnt != NULL);
+       
+       if (cnt->processed == (size_t)-1) {
                /* First, read message length */
                target = (uint8_t *)(&len);
                to_read = sizeof(size_t);
@@ -529,38 +595,44 @@
        nread = sys_read(fd, target, to_read);
        if(nread == 0) {
                DEBUG(10, ("Messaging peer closed\n"));
-               TALLOC_FREE(cnt);
+               cnt->processed = (size_t)-1;
                return NULL;
        }
        if (nread < 0) {
                DEBUG(5, ("Error while reading from socket (errno = %d)\n",
                          errno));
-               TALLOC_FREE(cnt);
+               cnt->processed = (size_t)-1;            
                return NULL;
        }
 
-       cnt->processed += nread;
+       if(cnt->processed == (size_t)-1) {
 
-       if (cnt->msg == NULL) {
-               if(cnt->processed < sizeof(size_t)) {
+               if(nread < sizeof(size_t)) {
                        DEBUG(0, ("Received less that %d bytes!\n",
                                  sizeof(size_t)));
-                       TALLOC_FREE(cnt);
+                       cnt->processed = (size_t)-1;
                        return NULL;
                }
 
                DEBUG(10, ("Receiving msg of length %d\n",
                           len));
-               cnt->msg = (struct message_rec*)TALLOC_ZERO_ARRAY(cnt, char,
-                                                                 len);
-               if (cnt->msg == NULL) {
-                       DEBUG(0, ("talloc failed\n"));
-                       TALLOC_FREE(cnt);
-                       return NULL;
+
+               if(len > cnt->allocated) {
+                       DEBUG(10, ("Reallocating incoming container"
+                                  " to fit size %d\n", len));
+                       cnt = allocate_container(mem_ctx, cnt, len);
+                       if(cnt == NULL) {
+                               DEBUG(0, ("Reallocating failed!\n"));
+                               return NULL;            
+                       }
                }
-               cnt->msg->len = len;
+
+               cnt->processed = 0;
+               cnt->msg->len = len;            
        }
 
+       cnt->processed += nread;
+
        return cnt;
 }
 
@@ -570,23 +642,23 @@
 
 static BOOL receive_one_message(int fd,
                                TALLOC_CTX *mem_ctx,
-                               struct message_list **msg,
+                               struct message_list *msg,
                                BOOL* received)
 {
        *received = False;
-       *msg = read_from_stream_socket(fd, mem_ctx, *msg);
-       if (*msg == NULL) {
+
+       if (read_from_stream_socket(fd, mem_ctx, msg) == NULL) {
                DEBUG(5, ("failed to read\n"));
                return False;
        }
 
-       if (!container_full(*msg)) {
+       if (!container_full(msg)) {
                return True;
        }
 
-       SMB_ASSERT((*msg)->msg != NULL);
+       SMB_ASSERT(msg->msg != NULL);
 
-       if ((*msg)->msg->len < sizeof(struct message_rec)) {
+       if (msg->msg->len < sizeof(struct message_rec)) {
                DEBUG(5, ("Message too short\n"));
                return False;
        }
@@ -605,7 +677,7 @@
        struct message_rec *msg_rec;
        BOOL received = False;
 
-       if(!receive_one_message(c->fd, c, &c->incoming, &received)) {
+       if(!receive_one_message(c->fd, c, c->incoming, &received)) {
                DEBUG(5, ("killing client %s\n", procid_str_static(&c->pid)));
                return False;
        }
@@ -621,7 +693,8 @@
                        DEBUG(10, ("Got hello from %s\n",
                                   procid_str_static(&msg_rec->src)));
                        c->pid = msg_rec->src;
-                       TALLOC_FREE((c->incoming));
+
+                       c->incoming->processed = (size_t)-1;
                }
        }
 
@@ -784,11 +857,7 @@
                                
                                DLIST_ADD_END((*received), li, tmp);
 
-                               /* 
-                                  We don't free client->incoming, because it
-                                  will be freed in a next
-                                  read_from_stream_socket call.
-                               */
+                               client->incoming->processed = (size_t)-1;
                        }
                }
                
@@ -834,6 +903,8 @@
                close(dispatcher_pipe);
                dispatcher_pipe = -1;
        }
+       
+       TALLOC_FREE(disp_incoming);
 }
 
 /****************************************************************************
@@ -1003,6 +1074,11 @@
 
 #ifdef CLUSTER_SUPPORT
        tcp_fd = open_remote_listener();
+       if (tcp_fd < 0) {
+               DEBUG(0, ("Can't create tcp socket\n"));
+               close(fd);
+               exit(1);
+       }
 #endif /* CLUSTER_SUPPORT */
 
        ok = True;
@@ -1027,6 +1103,11 @@
 
        shutdown_stream_sockets();
 
+       close(fd);
+#ifdef CLUSTER_SUPPORT
+       close(tcp_fd);
+#endif
+
        /* 
           Presently we don't remove socket & lock file. Should be fixed later
        */
@@ -1193,6 +1274,15 @@
        return -1;
 
 good:
+
+       SMB_ASSERT(disp_incoming == NULL);
+       disp_incoming = allocate_container(NULL, NULL, 0);
+       if(disp_incoming == NULL) {
+               DEBUG(0, ("Can't initialize tcp incoming buffer\n"));
+               close(sock);
+               return -1;
+       }
+
        DEBUG(10, ("Dispatcher connected, say hello.\n"));
        /* say HELLO to the dispatcher to identify oneself */
        message_send_pid_socket(pid_to_procid(MESSAGING_DISPATCHER_PID),
@@ -1218,15 +1308,35 @@
 void receive_on_socket_dispatcher(int socket_fd, struct message_list **queue)
 {
        BOOL received = False;
-       if(!receive_one_message(socket_fd, NULL, &tcp_incoming, &received)) {
+       if(!receive_one_message(socket_fd, NULL, disp_incoming, &received)) {
                DEBUG(0, ("Can't receive a message through the daemon\n"));
                return ;
        }       
 
        if(received) {
-               struct message_list *tmp;
-               DLIST_ADD_END((*queue), tcp_incoming, tmp);
-               /* give memory control to a higher level code */
-               tcp_incoming = NULL; 
+               struct message_list *tmp, *li;
+               
+               SMB_ASSERT(disp_incoming != NULL && disp_incoming->msg != NULL);
+               
+               li = TALLOC_ZERO_P(NULL, struct message_list);
+               if(li == NULL) {
+                       DEBUG(0, ("talloc failed\n"));
+                       return ;
+               }
+               li->msg = (struct message_rec*)TALLOC_ARRAY(
+                       li, uint8_t,
+                       disp_incoming->msg->len);
+               if(li->msg == NULL) {
+                       DEBUG(0, ("talloc failed\n"));
+                       TALLOC_FREE(li);
+                       return ;
+               }
+               
+               memcpy(li->msg, disp_incoming->msg,
+                      disp_incoming->msg->len);
+
+               DLIST_ADD_END((*queue), li, tmp);
+
+               disp_incoming->processed = (size_t)-1; 
        }
 }

Modified: branches/tmp/vl-messaging/source/torture/msgtest.c
===================================================================
--- branches/tmp/vl-messaging/source/torture/msgtest.c  2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/torture/msgtest.c  2006-09-06 14:50:52 UTC (rev 18180)
@@ -56,7 +56,10 @@
 {
        pid_t pid;
        int i, n;
-       char buf[12], *p, *addr;
+       char buf[12], *large_buf;
+#ifdef CLUSTER_SUPPORT
+       char *p, *addr;
+#endif
        struct timeval dispatch_timeout = timeval_set(0, DISPATCH_TIMEOUT);
        struct process_id receiver;
 
@@ -181,6 +184,36 @@
                       (ping_count+pong_count)/timeval_elapsed(&tv));
        }       
 
+       /* large messages test */
+       pong_count = 0;
+
+       {
+               struct timeval tv = timeval_current();
+               size_t timelimit = n;
+               size_t ping_count = 0;
+
+               printf("Starting large messages test.\n");
+
+               for(i = 1; i <= (1 << 16) && timeval_elapsed(&tv) < timelimit; i *= 2) {
+                       large_buf = TALLOC_ZERO(NULL, i);
+                       if(large_buf == NULL) {
+                               printf("talloc failed!\n");
+                               break;
+                       }
+                       
+                       printf("size %d... ", i);
+                       
+                       if(message_send_pid(receiver, MSG_PING,
+                                           large_buf, i, False)) ping_count++;
+                       
+                       while (ping_count > pong_count && timeval_elapsed(&tv) < timelimit) {
+                               message_select_and_dispatch(&dispatch_timeout);
+                       }
+
+                       printf("passed\n");
+               }               
+       }
+
        message_end();
 
        return (0);

Reply via email to