Attached is the tcpserver patch that moves the broadcast call into its own
thread and parses the outgoing message only once, thus ostensibly offering
improved performance and xrun-free operation.

Please note that I have not tested this with broadcasting a file (i.e. a
symbol argument), but it should work.

AFAIK everything in tcpserver/client now works on Linux, and does so
stably.

Many thanks to all for their help in facilitating this patch!

Best wishes,

Ico

--- tcpserver.c.orig	2010-05-04 12:54:35.000000000 -0400
+++ tcpserver.c	2010-05-06 23:19:40.000000000 -0400
@@ -110,6 +110,14 @@
     char                        x_msginbuf[MAX_UDP_RECEIVE];
 } t_tcpserver;
 
+typedef struct _tcpserver_broadcast_params /* argument block handed to tcpserver_broadcast_thread */
+{
+	t_tcpserver 	*x; /* tcpserver instance whose clients receive the data */
+	//t_symbol 		*s;
+	int 			argc; /* number of atoms stored in argv */
+	t_atom 			argv[MAX_UDP_RECEIVE]; /* by-value copy of the atoms to send */
+} t_tcpserver_broadcast_params;
+
 static t_tcpserver_socketreceiver *tcpserver_socketreceiver_new(void *owner, t_tcpserver_socketnotifier notifier,
     t_tcpserver_socketreceivefn socketreceivefn);
 static int tcpserver_socketreceiver_doread(t_tcpserver_socketreceiver *x);
@@ -121,6 +129,7 @@
 static int tcpserver_send_buffer_avaliable_for_client(t_tcpserver *x, int client);
 #endif
 static void *tcpserver_send_buf_thread(void *arg);
+static void *tcpserver_broadcast_thread(void *arg);
 static void tcpserver_client_send(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv);
 static void tcpserver_output_client_state(t_tcpserver *x, int client);
 static int tcpserver_get_socket_send_buf_size(int sockfd);
@@ -778,18 +787,242 @@
     return;
 }
 
-/* broadcasts a message to all connected clients */
-static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
+/* broadcasts messages to all clients */
+/* Ivica Ico Bukvic <[email protected]> 5/6/10 rewrote broadcast to be xrun-free */
+/* Thread entry point started by tcpserver_broadcast. arg is a heap-allocated
+ * t_tcpserver_broadcast_params; it is released with freebytes() on every exit
+ * path and the return value is always NULL. Float atoms must be whole numbers
+ * in [0..255] and become one byte each; symbol atoms are file names whose
+ * contents are appended. The bytes are sent to each client with a valid sr_fd
+ * whenever the buffer fills and once more at the end. Any error (bad atom,
+ * unopenable file, too-small client buffer, failed send) abandons the rest. */
+static void *tcpserver_broadcast_thread(void *arg)
 {
-    int     client;
-    /* enumerate through the clients and send each the message */
-    for(client = 0; client < x->x_nconnections; client++)	/* check if connection exists */
-    {
-        if(x->x_sr[client]->sr_fd >= 0)
-        { /* socket exists for this client */
-            tcpserver_send_bytes(client, x, argc, argv);
+    t_tcpserver_broadcast_params *ttbp = (t_tcpserver_broadcast_params *)arg;
+
+    int                     client;
+    /* automatic, not static: a static buffer would be shared by overlapping broadcast threads */
+    /* NOTE(review): assumes MAX_UDP_RECEIVE bytes fit on this thread's stack -- confirm */
+    char                    byte_buf[MAX_UDP_RECEIVE];
+    int                     i, j, d;
+    unsigned char           c;
+    float                   f, e;
+    int                     length;
+    size_t                  flen = 0; /* bytes read from the file currently being sent */
+    int                     sockfd = 0;
+    char                    fpath[FILENAME_MAX];
+    FILE                    *fptr;
+    int                     result;
+
+    for (i = j = 0; i < ttbp->argc; ++i)
+    {
+        if (ttbp->argv[i].a_type == A_FLOAT)
+        { /* load floats into buffer as long as they are integers on [0..255]*/
+            f = ttbp->argv[i].a_w.w_float;
+            d = (int)f;
+            e = f - d;
+#ifdef DEBUG
+            post("%s: argv[%d]: float:%f int:%d delta:%f", objName, i, f, d, e);
+#endif
+            if (e != 0)
+            {
+                error("%s: item %d (%f) is not an integer", objName, i, f);
+                freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                return NULL;
+            }
+            if ((d < 0) || (d > 255))
+            {
+                error("%s: item %d (%f) is not between 0 and 255", objName, i, f);
+                freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                return NULL;
+            }
+            c = (unsigned char)d; /* make sure it doesn't become negative; this only matters for post() */
+#ifdef DEBUG
+            post("%s: argv[%d]: %d", objName, i, c);
+#endif
+            byte_buf[j++] = c;
+            if (j >= MAX_UDP_RECEIVE)
+            { /* if the argument list is longer than our buffer, send the buffer whenever it's full */
+//LOOP
+                /* enumerate through the clients and send each the message */
+                for(client = 0; client < ttbp->x->x_nconnections; client++) /* check if connection exists */
+                {
+#ifdef SIOCOUTQ
+                    if (tcpserver_send_buffer_avaliable_for_client(ttbp->x, client) < j)
+                    {
+                        error("%s: buffer too small for client(%d)", objName, client);
+                        freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                        return NULL;
+                    }
+#endif // SIOCOUTQ
+                    if(ttbp->x->x_sr[client]->sr_fd >= 0)
+                    { /* socket exists for this client */
+                        sockfd = ttbp->x->x_sr[client]->sr_fd;
+                        result = send(sockfd, byte_buf, j, 0);
+                        if (result <= 0)
+                        {
+                            sys_sockerror("tcpserver: send");
+                            post("%s_send_buf: could not send data to client %d", objName, client+1);
+                            freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                            return NULL;
+                        }
+                    }
+                }
+
+                j = 0;
+            }
         }
+        else if (ttbp->argv[i].a_type == A_SYMBOL)
+        { /* symbols are interpreted to be file names; attempt to load the file and send it */
+
+            atom_string(&ttbp->argv[i], fpath, FILENAME_MAX);
+#ifdef DEBUG
+            post ("%s: fname: %s", objName, fpath);
+#endif
+            fptr = fopen(fpath, "rb");
+            if (fptr == NULL)
+            {
+                error("%s: unable to open \"%s\"", objName, fpath);
+                freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                return NULL;
+            }
+            rewind(fptr);
+            flen = 0; /* count only this file's bytes, not bytes already waiting in byte_buf */
+            while ((d = fgetc(fptr)) != EOF)
+            {
+                byte_buf[j++] = (char)(d & 0x0FF);
+                ++flen;
+#ifdef DEBUG
+                post("%s: byte_buf[%d] = %d", objName, j-1, byte_buf[j-1]);
+#endif
+                if (j >= MAX_UDP_RECEIVE)
+                { /* if the file is longer than our buffer, send the buffer whenever it's full */
+//LOOP
+                    /* enumerate through the clients and send each the message */
+                    for(client = 0; client < ttbp->x->x_nconnections; client++) /* check if connection exists */
+                    {
+                        /* this might be better than allocating huge amounts of memory */
+#ifdef SIOCOUTQ
+                        if (tcpserver_send_buffer_avaliable_for_client(ttbp->x, client) < j)
+                        {
+                            error("%s: buffer too small for client(%d)", objName, client);
+                            fclose(fptr); /* do not leak the open file on early exit */
+                            freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                            return NULL;
+                        }
+#endif // SIOCOUTQ
+                        if(ttbp->x->x_sr[client]->sr_fd >= 0)
+                        { /* socket exists for this client */
+                            sockfd = ttbp->x->x_sr[client]->sr_fd;
+                            result = send(sockfd, byte_buf, j, 0);
+                            if (result <= 0)
+                            {
+                                sys_sockerror("tcpserver: send");
+                                post("%s_send_buf: could not send data to client %d", objName, client+1);
+                                fclose(fptr); /* do not leak the open file on early exit */
+                                freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                                return NULL;
+                            }
+                        }
+                    }
+                    j = 0;
+                }
+            }
+            fclose(fptr);
+            fptr = NULL;
+            post("%s: read \"%s\" length %lu byte%s", objName, fpath, (unsigned long)flen, ((flen==1)?"":"s"));
+        }
+        else
+        { /* arg was neither a float nor a valid file name */
+            error("%s: item %d is not a float or a file name", objName, i);
+            freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+            return NULL;
+        }
+    }
+    length = j;
+    if (length > 0)
+    { /* send whatever remains in our buffer */
+
+        /* enumerate through the clients and send each the message */
+        for(client = 0; client < ttbp->x->x_nconnections; client++) /* check if connection exists */
+        {
+#ifdef SIOCOUTQ
+            if (tcpserver_send_buffer_avaliable_for_client(ttbp->x, client) < length)
+            {
+                error("%s: buffer too small for client(%d)", objName, client);
+                freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                return NULL;
+            }
+#endif // SIOCOUTQ
+            if(ttbp->x->x_sr[client]->sr_fd >= 0)
+            { /* socket exists for this client */
+                sockfd = ttbp->x->x_sr[client]->sr_fd;
+                result = send(sockfd, byte_buf, length, 0);
+                if (result <= 0)
+                {
+                    sys_sockerror("tcpserver: send");
+                    post("%s_send_buf: could not send data to client %d", objName, client+1);
+                    freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+                    return NULL;
+                }
+            }
+        }
     }
+    /* length == 0 is not an error: either the list was empty or everything was
+     * already flushed in full-buffer chunks, so nothing is reported here */
+
+    freebytes (arg, sizeof (t_tcpserver_broadcast_params));
+    return NULL;
+}
+
+/* initializes separate thread that broadcasts to all clients */
+/* Ivica Ico Bukvic <[email protected]> 5/6/10 rewrote broadcast to be xrun-free */
+/* Copies argc atoms from argv into a heap-allocated t_tcpserver_broadcast_params
+ * and starts a detached tcpserver_broadcast_thread on it. Once pthread_create
+ * succeeds the thread frees the copy; if it fails the copy is freed here.
+ * Does nothing when x->x_nconnections is 0. s is not used. */
+static void tcpserver_broadcast(t_tcpserver *x, t_symbol *s, int argc, t_atom *argv)
+{
+    int                             i;
+    t_tcpserver_broadcast_params    *ttbp;
+    pthread_t                       broadcast_thread;
+    pthread_attr_t                  broadcast_attr;
+    int                             broadcast_thread_result;
+
+    if (x->x_nconnections == 0) return;
+    if (argc > MAX_UDP_RECEIVE)
+    { /* ttbp->argv has room for MAX_UDP_RECEIVE atoms; copying more would overrun it */
+        error("%s: broadcast list of %d items exceeds the maximum of %d", objName, argc, (int)MAX_UDP_RECEIVE);
+        return;
+    }
+    /* sender thread should start out detached so its resources will be freed when it is done */
+    if (0 != (broadcast_thread_result = pthread_attr_init(&broadcast_attr)))
+    {
+        error("%s: pthread_attr_init failed: %d", objName, broadcast_thread_result);
+        return;
+    }
+    if (0 != (broadcast_thread_result = pthread_attr_setdetachstate(&broadcast_attr, PTHREAD_CREATE_DETACHED)))
+    {
+        error("%s: pthread_attr_setdetachstate failed: %d", objName, broadcast_thread_result);
+        pthread_attr_destroy(&broadcast_attr);
+        return;
+    }
+    ttbp = (t_tcpserver_broadcast_params *)getbytes(sizeof(t_tcpserver_broadcast_params));
+    if (ttbp == NULL)
+    {
+        error("%s: unable to allocate %lu bytes for t_tcpserver_broadcast_params", objName, (unsigned long)sizeof(t_tcpserver_broadcast_params));
+        pthread_attr_destroy(&broadcast_attr);
+        return;
+    }
+    ttbp->x = x;
+    ttbp->argc = argc;
+    for (i = 0; i < argc; i++) ttbp->argv[i] = argv[i];
+    if (0 != (broadcast_thread_result = pthread_create(&broadcast_thread, &broadcast_attr, tcpserver_broadcast_thread, (void *)ttbp)))
+    { /* the thread never started, so the copy is still ours to free */
+        error("%s: couldn't create broadcast thread (%d)", objName, broadcast_thread_result);
+        freebytes (ttbp, sizeof (t_tcpserver_broadcast_params));
+    }
+    pthread_attr_destroy(&broadcast_attr); /* the attribute object is not needed once pthread_create returns */
 }
 
 /* ---------------- main tcpserver (receive) stuff --------------------- */
@@ -989,7 +1222,7 @@
 {
     int     i;
 
-    post("tcp_server_free...");
+    //post("tcp_server_free...");
     for(i = 0; i < MAX_CONNECT; i++)
     {
         if (x->x_sr[i] != NULL) 
@@ -1007,7 +1240,7 @@
         sys_rmpollfn(x->x_connectsocket);
         sys_closesocket(x->x_connectsocket);
     }
-    post("...tcp_server_free");
+    //post("...tcp_server_free");
 }
 
 void tcpserver_setup(void)
_______________________________________________
Pd-dev mailing list
[email protected]
http://lists.puredata.info/listinfo/pd-dev

Reply via email to