Well, I did know about that function, but I've read that "libuv may also combine 
multiple calls to uv_async_send and invoke your callback only once. The 
only guarantee that libuv makes is – The callback function is called *at 
least once* after the call to uv_async_send." at this link: 
http://nikhilm.github.io/uvbook/threads.html#inter-thread-communication

Now, the example supplied with the book at that link has a global 
uv_async_t object, and uv_async_send is called repeatedly on this 
global object. I want to know whether the coalescing of calls described 
above goes away if the object is declared locally in the user thread. 
Book example code: 
https://github.com/nikhilm/uvbook/blob/master/code/progress/main.c 

Regards,
Sarav
On Tuesday, November 26, 2013 1:52:21 PM UTC+5:30, Saúl Ibarra Corretgé 
wrote:
>
> Hi, 
>
> In your case 2 example you are calling uv_write in the recv_from_gw 
> function, which runs in a different thread. The only thread-safe 
> function in libuv is uv_async_send; calling uv_write from a different 
> thread is a no go. 
>
>
> Cheers, 
>
> On 11/26/13 8:11 AM, Sarav Sandhu wrote: 
> > Hi, 
> > 
> > I'm trying to make an HTTP server and I'm seeing some strange behaviour 
> > while running the binary: 
> > 
> > *_Case 1 :_* 
> > _Code : 
> > _ 
> > 
> >         #include <stdio.h> 
> > 
> >         #include <stdlib.h> 
> > 
> >         #include <uv.h> 
> > 
> >         #include <errno.h> 
> > 
> >         #include <sys/msg.h> 
> > 
> >         #include <stdlib.h> 
> > 
> >         #include <string.h> 
> > 
> >         #include <unistd.h> 
> > 
> >         #include <pthread.h> 
> > 
> > 
> >         typedef struct 
> > 
> >         { 
> > 
> >         int mtype; 
> > 
> >         void *client; 
> > 
> >         void *buffer; 
> > 
> >         }stIPC; 
> > 
> >         int qid; 
> > 
> > 
> >         uv_stream_t *temp_client; 
> > 
> > 
> >         void on_new_connection(uv_stream_t *server, int status); 
> > 
> >         uv_buf_t alloc_buffer(uv_handle_t *handle, size_t 
> suggested_size); 
> > 
> >         void on_client_read(uv_stream_t *client, ssize_t nread, uv_buf_t 
> >         buf); 
> > 
> >         void on_client_write(uv_write_t *req, int status); 
> > 
> > 
> >         void on_new_connection(uv_stream_t *server, int status) { 
> > 
> >         if (status == -1) { 
> > 
> >         fprintf(stderr, "error on_new_connection"); 
> > 
> >         return; 
> > 
> >         } 
> > 
> > 
> >         uv_tcp_t *client; 
> > 
> >         client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); 
> > 
> >         uv_tcp_init(uv_default_loop(), client); 
> > 
> > 
> >         int result = uv_accept(server, (uv_stream_t*) client); 
> > 
> > 
> >         if (result == 0) { // success 
> > 
> >         uv_read_start((uv_stream_t*) client, alloc_buffer, 
> on_client_read); 
> > 
> >         } else { 
> > 
> >         printf( "Issue in accept, closing client\n"); 
> > 
> >         uv_close((uv_handle_t*) client, NULL); 
> > 
> >         } 
> > 
> >         } 
> > 
> > 
> >         uv_buf_t alloc_buffer(uv_handle_t *handle, size_t 
> suggested_size) { 
> > 
> >         return uv_buf_init((char*) malloc( 200 ), 200 ); 
> > 
> >         } 
> > 
> > 
> >         void on_client_read(uv_stream_t *_client, ssize_t nread, 
> >         uv_buf_t buf) { 
> > 
> >         if (nread == -1) { 
> > 
> >         fprintf(stderr, "error on_client_read"); 
> > 
> >         uv_close((uv_handle_t*) _client, NULL); 
> > 
> >         return; 
> > 
> >         } 
> > 
> >         else 
> > 
> >         { 
> > 
> >         //printf("3rd party sent data : [%s]\n", (char*) buf.base); 
> > 
> > 
> >         uv_read_stop( _client ); 
> > 
> > 
> >         ///This is test code 
> > 
> > 
> >         char *msg; 
> > 
> >         msg = (char*) malloc( 200 ); 
> > 
> >         bzero( msg, sizeof(msg) ); 
> > 
> >         strcpy( msg, "This is alien world\n" ); 
> > 
> > 
> >         uv_buf_t buf_1 = uv_buf_init( msg, sizeof(msg)); 
> > 
> >         buf_1.len = strlen(msg); 
> > 
> >         buf_1.base = msg; 
> > 
> > 
> >         temp_client = _client; 
> > 
> >         uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t)); 
> > 
> >         req->data = (void*) buf_1.base; 
> > 
> >         //sleep(1); 
> > 
> >         //printf("Gonna write data \n"); 
> > 
> >         uv_write(req, (uv_stream_t*) temp_client, &buf_1, 1, 
> >         on_client_write); 
> > 
> >         } 
> > 
> >         } 
> > 
> > 
> >         void recv_from_gw() 
> > 
> >         { 
> > 
> > 
> >         stIPC recvObj; 
> > 
> >         bzero( &recvObj, sizeof( stIPC ) ); 
> > 
> > 
> >         /*uv_work_t reciever_work; 
> > 
> >           reciever_work.data = (void*) &recvObj; 
> > 
> >           uv_queue_work( uv_default_loop(), &reciever_work, first, 
> >         second );*/ 
> > 
> >         while(0) // Do not run this part - while 0 
> > 
> >         { 
> > 
> >                                    //this section is disabled 
> > 
> >         } 
> > 
> >         } 
> > 
> > 
> >         } 
> > 
> > 
> >         void on_client_write(uv_write_t *req, int status) { 
> > 
> >         if (status == -1) { 
> > 
> >         fprintf(stderr, "error on_client_write\n"); 
> > 
> >         uv_close((uv_handle_t*) req->handle, NULL); 
> > 
> >         printf( "closing.written status = %d: \n", status); 
> > 
> >         return; 
> > 
> >         } 
> > 
> > 
> >         uv_close((uv_handle_t*) req->handle, NULL); 
> > 
> >         //printf("Data sent to 3rd party [%s]\n", (char*) req->data); 
> > 
> >         char *buffer = (char*) req->data; 
> > 
> >         //free(req); 
> > 
> >         //free(buffer); 
> > 
> >         } 
> > 
> > 
> >         void recv_from_3rd_party() 
> > 
> >         { 
> > 
> >         uv_tcp_t server; 
> > 
> >         uv_tcp_init(uv_default_loop(), &server); 
> > 
> >         struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000); 
> > 
> >         uv_tcp_bind(&server, bind_addr); 
> > 
> >         printf("In listen Thread\n"); 
> > 
> >         int r = uv_listen((uv_stream_t*) &server, 111, 
> on_new_connection); 
> > 
> >         if (r) { 
> > 
> >         fprintf(stderr, "error uv_listen"); 
> > 
> >         } 
> > 
> > 
> >         } 
> > 
> > 
> > 
> >         void main(void) { 
> > 
> >         int key = 7876; 
> > 
> >         qid = msgget( key, IPC_CREAT | 0666 ); 
> > 
> >         if ( qid == -1 ) 
> > 
> >         { 
> > 
> >         printf ( "msgget err, %d\n", errno); 
> > 
> >         exit(0); 
> > 
> >         } 
> > 
> >         uv_tcp_t server; 
> > 
> >         uv_tcp_init(uv_default_loop(), &server); 
> > 
> >         struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000); 
> > 
> >         uv_tcp_bind(&server, bind_addr); 
> > 
> > 
> >         int r = uv_listen((uv_stream_t*) &server, 200, 
> on_new_connection); 
> > 
> >         if (r) { 
> > 
> >         fprintf(stderr, "error uv_listen"); 
> > 
> >         } 
> > 
> > 
> >         pthread_t gwthrd, rdthrd; 
> > 
> >         pthread_create( &gwthrd, NULL,(void*) &recv_from_gw, NULL ); 
> > 
> >         uv_loop_t* loop = uv_default_loop(); 
> > 
> >         uv_run(loop, UV_RUN_DEFAULT); 
> > 
> >         } 
> > 
> > 
> > 
> > 
> > The above code runs flawlessly and without any errors (though the 
> > free() calls for the allocated memory are still pending). 
> > 
> > _*Case 2:*_ 
> > _Code :_ 
> > 
> >         #include <stdio.h> 
> > 
> >         #include <stdlib.h> 
> > 
> >         #include <uv.h> 
> > 
> >         #include <errno.h> 
> > 
> >         #include <sys/msg.h> 
> > 
> >         #include <stdlib.h> 
> > 
> >         #include <string.h> 
> > 
> >         #include <unistd.h> 
> > 
> >         #include <pthread.h> 
> > 
> > 
> >         typedef struct 
> > 
> >         { 
> > 
> >         int mtype; 
> > 
> >         void *client; 
> > 
> >         void *buffer; 
> > 
> >         }stIPC; 
> > 
> >         int qid; 
> > 
> > 
> >         uv_stream_t *temp_client; 
> > 
> > 
> >         void on_new_connection(uv_stream_t *server, int status); 
> > 
> >         uv_buf_t alloc_buffer(uv_handle_t *handle, size_t 
> suggested_size); 
> > 
> >         void on_client_read(uv_stream_t *client, ssize_t nread, uv_buf_t 
> >         buf); 
> > 
> >         void on_client_write(uv_write_t *req, int status); 
> > 
> > 
> >         void on_new_connection(uv_stream_t *server, int status) { 
> > 
> >         if (status == -1) { 
> > 
> >         fprintf(stderr, "error on_new_connection"); 
> > 
> >         return; 
> > 
> >         } 
> > 
> > 
> >         //printf( " gotta new client\n"); 
> > 
> >         uv_tcp_t *client; 
> > 
> >         client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); 
> > 
> >         uv_tcp_init(uv_default_loop(), client); 
> > 
> > 
> >         int result = uv_accept(server, (uv_stream_t*) client); 
> > 
> > 
> >         if (result == 0) { // success 
> > 
> >         uv_read_start((uv_stream_t*) client, alloc_buffer, 
> on_client_read); 
> > 
> >         } else { 
> > 
> >         printf( "Issue in accept, closing client\n"); 
> > 
> >         uv_close((uv_handle_t*) client, NULL); 
> > 
> >         } 
> > 
> >         } 
> > 
> > 
> >         uv_buf_t alloc_buffer(uv_handle_t *handle, size_t 
> suggested_size) { 
> > 
> >         return uv_buf_init((char*) malloc( 200 ), 200 ); 
> > 
> >         } 
> > 
> > 
> >         void on_client_read(uv_stream_t *_client, ssize_t nread, 
> >         uv_buf_t buf) { 
> > 
> >         if (nread == -1) { 
> > 
> >         fprintf(stderr, "error on_client_read"); 
> > 
> >         uv_close((uv_handle_t*) _client, NULL); 
> > 
> >         return; 
> > 
> >         } 
> > 
> >         else 
> > 
> >         { 
> > 
> >         //printf("3rd party sent data : [%s]\n", (char*) buf.base); 
> > 
> > 
> >         uv_read_stop( _client ); 
> > 
> > 
> >         stIPC sendobj; 
> > 
> >         sendobj.mtype=111; 
> > 
> >         sendobj.client =  _client; 
> > 
> >         sendobj.buffer =  &buf; 
> > 
> >         int ret = msgsnd( qid, (void*) &sendobj, sizeof(sendobj), 
> >         IPC_NOWAIT); 
> > 
> >         if ( -1 == ret ) 
> > 
> >         { 
> > 
> >         printf ("Msgsnd err : %d\n ", errno); 
> > 
> >         exit(0); 
> > 
> >         } 
> > 
> >         } 
> > 
> >         } 
> > 
> > 
> >         void recv_from_gw() 
> > 
> >         { 
> > 
> > 
> >         stIPC recvObj; 
> > 
> >         bzero( &recvObj, sizeof( stIPC ) ); 
> > 
> > 
> >         /*uv_work_t recie=ver_work; 
> > 
> >           reciever_work.data = (void*) &recvObj; 
> > 
> >           uv_queue_work( uv_default_loop(), &reciever_work, first, 
> >         second );*/ 
> > 
> >         while(1) 
> > 
> >         { 
> > 
> >         int ret=0; 
> > 
> >         //printf("in msgrcv\n"); 
> > 
> >         //sleep(1); 
> > 
> >         //usleep(11); 
> > 
> >         ret = msgrcv( qid, &recvObj, sizeof( recvObj ), 111, 0 ); 
> > 
> >         if( -1 == ret ) 
> > 
> >         { 
> > 
> >         //usleep(500); 
> > 
> >         if( ENOMSG != errno ) 
> > 
> >         exit(0); 
> > 
> >         else 
> > 
> >         { 
> > 
> >         printf( "no data from q yet \n"); 
> > 
> >         usleep(1); 
> > 
> >         } 
> > 
> >         } 
> > 
> >         else 
> > 
> >         { 
> > 
> >         usleep(111); 
> > 
> >         printf( "got data from q \n"); 
> > 
> >         uv_stream_t* client = recvObj.client; 
> > 
> >         char *msg; 
> > 
> >         msg = (char*) malloc( 200 ); 
> > 
> >         bzero( msg, sizeof(msg) ); 
> > 
> >         strcpy( msg, "This is alien world\n" ); 
> > 
> > 
> >         uv_buf_t buf = uv_buf_init( msg, sizeof(msg)); 
> > 
> >         buf.len = strlen(msg); 
> > 
> >         buf.base = msg; 
> > 
> > 
> >         uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t)); 
> > 
> >         req->data = (void*) buf.base; 
> > 
> >         printf("Gonna write data \n"); 
> > 
> >         uv_write(req, (uv_stream_t*) client, &buf, 1, on_client_write); 
> > 
> >         } 
> > 
> >         } 
> > 
> > 
> >         } 
> > 
> > 
> >         void on_client_write(uv_write_t *req, int status) { 
> > 
> >         if (status == -1) { 
> > 
> >         fprintf(stderr, "error on_client_write\n"); 
> > 
> >         uv_close((uv_handle_t*) req->handle, NULL); 
> > 
> >         printf( "closing.written status = %d: \n", status); 
> > 
> >         return; 
> > 
> >         } 
> > 
> > 
> >         //printf( "closing.written status = %d: \n", status); 
> > 
> >         uv_close((uv_handle_t*) req->handle, NULL); 
> > 
> >         //printf("Data sent to 3rd party [%s]\n", (char*) req->data); 
> > 
> >         char *buffer = (char*) req->data; 
> > 
> >         //free(req); 
> > 
> >         //free(buffer); 
> > 
> >         } 
> > 
> > 
> >         void recv_from_3rd_party() 
> > 
> >         { 
> > 
> >         uv_tcp_t server; 
> > 
> >         uv_tcp_init(uv_default_loop(), &server); 
> > 
> >         struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000); 
> > 
> >         uv_tcp_bind(&server, bind_addr); 
> > 
> >         printf("In listen Thread\n"); 
> > 
> >         int r = uv_listen((uv_stream_t*) &server, 111, 
> on_new_connection); 
> > 
> >         if (r) { 
> > 
> >         fprintf(stderr, "error uv_listen"); 
> > 
> >         //return 1; 
> > 
> >         } 
> > 
> > 
> >         } 
> > 
> > 
> > 
> >         void main(void) { 
> > 
> >         printf( "in sarav \n"); 
> > 
> >         //buffer_size = 200; 
> > 
> >         int key = 7876; 
> > 
> >         qid = msgget( key, IPC_CREAT | 0666 ); 
> > 
> >         if ( qid == -1 ) 
> > 
> >         { 
> > 
> >         printf ( "msgget err, %d\n", errno); 
> > 
> >         exit(0); 
> > 
> >         } 
> > 
> >         uv_tcp_t server; 
> > 
> >         uv_tcp_init(uv_default_loop(), &server); 
> > 
> >         struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000); 
> > 
> >         uv_tcp_bind(&server, bind_addr); 
> > 
> > 
> >         int r = uv_listen((uv_stream_t*) &server, 200, 
> on_new_connection); 
> > 
> >         if (r) { 
> > 
> >         fprintf(stderr, "error uv_listen"); 
> > 
> >         //return 1; 
> > 
> >         } 
> > 
> > 
> >         pthread_t gwthrd, rdthrd; 
> > 
> >         pthread_create( &gwthrd, NULL,(void*) &recv_from_gw, NULL ); 
> > 
> >         uv_loop_t* loop = uv_default_loop(); 
> > 
> >         //uv_run(uv_default_loop(), UV_RUN_DEFAULT); 
> > 
> >         uv_run(loop, UV_RUN_DEFAULT); 
> > 
> >         //return 0; 
> > 
> >         } 
> > 
> > 
> > 
> > Now this is the code that is causing weird behavior. When we run serial 
> > connections (one connection at a time), the previous uv_write's callback 
> > is not called until a new connection is initiated. With parallel 
> > connections, the same code fails an assert at the line 
> > "uv__req_unregister(stream->loop, req);" in file src/unix/stream.c, 
> > line number 877. 
> > 
> > Please suggest the solution to this problem. 
> > 
> > OS : Fedora Linux 17 
> > gcc : gcc version 4.7.2 20120921 (Red Hat 4.7.2-2) (GCC) 
> > libuv version 0.11.1 
> > code Compilation command : gcc -O0 -g server.c -o server 
> > -I../../libuv/include/ ../../libuv/libuv.a -lrt -ldl -lm -pthread 
> > 
> > -- 
> > You received this message because you are subscribed to the Google 
> > Groups "libuv" group. 
> > To unsubscribe from this group and stop receiving emails from it, send 
> > an email to [email protected] <javascript:>. 
> > To post to this group, send email to [email protected]<javascript:>. 
>
> > Visit this group at http://groups.google.com/group/libuv. 
> > For more options, visit https://groups.google.com/groups/opt_out. 
>
>
> -- 
> Saúl Ibarra Corretgé 
> http://bettercallsaghul.com<http://www.google.com/url?q=http%3A%2F%2Fbettercallsaghul.com&sa=D&sntz=1&usg=AFQjCNG4oksE5WazGuLiKu5OevbsdMTRjw>
>  
>

-- 
You received this message because you are subscribed to the Google Groups 
"libuv" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/libuv.
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to