Hi,

In your case 2 example you are calling uv_write in the recv_from_gw function, which runs in a different thread. The only thread-safe function in libuv is uv_async_send; calling uv_write from a different thread is a no-go.


Cheers,

On 11/26/13 8:11 AM, Sarav Sandhu wrote:
Hi,

I'm trying to make an HTTP server and I'm seeing some strange behaviour
while running the binary:

*_Case 1 :_*
_Code :
_

        #include <stdio.h>

        #include <stdlib.h>

        #include <uv.h>

        #include <errno.h>

        #include <sys/msg.h>

        #include <stdlib.h>

        #include <string.h>

        #include <unistd.h>

        #include <pthread.h>


        /*
         * Message layout for the System V message queue identified by qid.
         *
         * msgsnd()/msgrcv() require the buffer to start with a `long`
         * message type. With an `int` the kernel would also read the
         * padding bytes that follow it on LP64 platforms.
         */
        typedef struct
        {
            long mtype;     /* message type; first member, as the msg API requires */
            void *client;   /* pointer payload carried in the message body */
            void *buffer;   /* pointer payload carried in the message body */
        } stIPC;

        /* Id of the System V message queue; assigned from msgget() in main(). */
        int qid;


        /* Set by on_client_read() to the stream it is about to reply to. */
        uv_stream_t *temp_client;


        /* Connection callback passed to uv_listen(). */
        void on_new_connection(uv_stream_t *server, int status);

        /* Allocation callback passed to uv_read_start(). */
        uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size);

        /* Read callback passed to uv_read_start(). */
        void on_client_read(uv_stream_t *client, ssize_t nread, uv_buf_t
        buf);

        /* Write-completion callback passed to uv_write(). */
        void on_client_write(uv_write_t *req, int status);


        void on_new_connection(uv_stream_t *server, int status) {

        if (status == -1) {

        fprintf(stderr, "error on_new_connection");

        return;

        }


        uv_tcp_t *client;

        client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));

        uv_tcp_init(uv_default_loop(), client);


        int result = uv_accept(server, (uv_stream_t*) client);


        if (result == 0) { // success

        uv_read_start((uv_stream_t*) client, alloc_buffer, on_client_read);

        } else {

        printf( "Issue in accept, closing client\n");

        uv_close((uv_handle_t*) client, NULL);

        }

        }


        /*
         * Allocation callback handed to uv_read_start().
         *
         * Both arguments are ignored: every call hands back a freshly
         * malloc'ed buffer of 200 bytes.
         */
        uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size) {
            enum { READ_CHUNK = 200 };
            char *storage = (char *) malloc(READ_CHUNK);

            return uv_buf_init(storage, READ_CHUNK);
        }


        /*
         * Read callback registered by on_new_connection().
         *
         * _client: stream the data arrived on.
         * nread:   -1 signals an error; any other value triggers the reply.
         * buf:     buffer produced by alloc_buffer(); released here.
         *
         * Stops reading and queues a fixed test reply. The write-completion
         * callback is on_client_write().
         */
        void on_client_read(uv_stream_t *_client, ssize_t nread,
                            uv_buf_t buf) {
            static const char reply[] = "This is alien world\n";

            /* The request body is never inspected, so the read buffer can
             * be released on every path instead of being leaked. */
            free(buf.base);

            if (nread == -1) {
                fprintf(stderr, "error on_client_read");
                uv_close((uv_handle_t *) _client, NULL);
                return;
            }

            uv_read_stop(_client);

            /* This is test code. */
            char *msg = malloc(sizeof reply);
            uv_write_t *req = malloc(sizeof *req);
            if (msg == NULL || req == NULL) {
                fprintf(stderr, "error on_client_read: out of memory\n");
                free(msg);
                free(req);
                uv_close((uv_handle_t *) _client, NULL);
                return;
            }
            memcpy(msg, reply, sizeof reply);

            /* Length is the string length; sizeof on the char pointer
             * would only give the pointer size. */
            uv_buf_t buf_1 = uv_buf_init(msg, sizeof reply - 1);

            temp_client = _client;
            req->data = msg;    /* lets the write callback reach the payload */

            if (uv_write(req, temp_client, &buf_1, 1, on_client_write) != 0) {
                /* The callback will not run for a request that was never queued. */
                fprintf(stderr, "error uv_write\n");
                free(msg);
                free(req);
                uv_close((uv_handle_t *) _client, NULL);
            }
        }


        /*
         * Thread body started from main() via pthread_create().
         *
         * In this variant the queue-polling loop is disabled (it was
         * `while(0)`), so the function only clears a local message and
         * returns without touching libuv. The unbalanced closing brace
         * that followed the original body has been removed so the listing
         * compiles.
         */
        void recv_from_gw()
        {
            stIPC recvObj;

            memset(&recvObj, 0, sizeof recvObj);
        }


        /*
         * Write-completion callback passed to uv_write().
         *
         * req:    the finished write request; req->data holds the malloc'ed
         *         payload stored by the code that issued the write.
         * status: -1 signals a failed write.
         *
         * Closes the connection in either case, then releases the request
         * and its payload, which were previously leaked.
         */
        void on_client_write(uv_write_t *req, int status) {
            if (status == -1) {
                fprintf(stderr, "error on_client_write\n");
                uv_close((uv_handle_t *) req->handle, NULL);
                printf("closing.written status = %d: \n", status);
            } else {
                uv_close((uv_handle_t *) req->handle, NULL);
            }

            /* req->handle has already been read, so both can be freed now. */
            free(req->data);
            free(req);
        }


        /*
         * Alternative listener set-up (not called anywhere in the visible
         * listing): binds a TCP handle to 0.0.0.0:7000 and starts listening
         * with on_new_connection as the connection callback.
         */
        void recv_from_3rd_party()
        {
            /* The handle stays registered with the loop after this function
             * returns, so it must not live on this function's stack. */
            static uv_tcp_t server;

            uv_tcp_init(uv_default_loop(), &server);

            struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
            uv_tcp_bind(&server, bind_addr);

            printf("In listen Thread\n");

            int r = uv_listen((uv_stream_t *) &server, 111, on_new_connection);
            if (r) {
                fprintf(stderr, "error uv_listen");
            }
        }



        /*
         * pthread-compatible wrapper. recv_from_gw() takes no argument and
         * returns nothing, so it should not be handed to pthread_create()
         * through a cast.
         */
        static void *gw_thread_entry(void *arg) {
            (void) arg;
            recv_from_gw();
            return NULL;
        }

        /*
         * Program entry point.
         *
         * Creates (or opens) the message queue with key 7876, starts a TCP
         * listener on 0.0.0.0:7000, launches the recv_from_gw thread and
         * runs the default libuv loop.
         *
         * Returns EXIT_FAILURE if the queue, the listener or the thread
         * cannot be set up, EXIT_SUCCESS once the loop ends.
         */
        int main(void) {
            int key = 7876;

            qid = msgget(key, IPC_CREAT | 0666);
            if (qid == -1) {
                printf("msgget err, %d\n", errno);
                return EXIT_FAILURE;
            }

            uv_tcp_t server;
            uv_tcp_init(uv_default_loop(), &server);

            struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
            uv_tcp_bind(&server, bind_addr);

            int r = uv_listen((uv_stream_t *) &server, 200, on_new_connection);
            if (r) {
                /* Without a listener the loop has nothing to serve. */
                fprintf(stderr, "error uv_listen");
                return EXIT_FAILURE;
            }

            pthread_t gwthrd;
            if (pthread_create(&gwthrd, NULL, gw_thread_entry, NULL) != 0) {
                fprintf(stderr, "error pthread_create\n");
                return EXIT_FAILURE;
            }

            uv_run(uv_default_loop(), UV_RUN_DEFAULT);
            return EXIT_SUCCESS;
        }




The above code runs flawlessly and without any errors (though the free()
calls for the allocated memory are still pending).

_*Case 2:*_
_Code :_

        #include <stdio.h>

        #include <stdlib.h>

        #include <uv.h>

        #include <errno.h>

        #include <sys/msg.h>

        #include <stdlib.h>

        #include <string.h>

        #include <unistd.h>

        #include <pthread.h>


        /*
         * Message passed from the libuv read callback to the recv_from_gw
         * thread over the System V message queue identified by qid.
         *
         * msgsnd()/msgrcv() require the buffer to start with a `long`
         * message type. With an `int` the kernel would also read the
         * padding bytes that follow it on LP64 platforms.
         */
        typedef struct
        {
            long mtype;     /* message type; first member, as the msg API requires */
            void *client;   /* the uv_stream_t the request arrived on */
            void *buffer;   /* pointer describing the data that was read */
        } stIPC;

        /* Id of the System V message queue; assigned from msgget() in main()
         * and used by msgsnd() in on_client_read() and msgrcv() in recv_from_gw(). */
        int qid;


        /* Not referenced by any function in this listing. */
        uv_stream_t *temp_client;


        /* Connection callback passed to uv_listen(). */
        void on_new_connection(uv_stream_t *server, int status);

        /* Allocation callback passed to uv_read_start(). */
        uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size);

        /* Read callback passed to uv_read_start(). */
        void on_client_read(uv_stream_t *client, ssize_t nread, uv_buf_t
        buf);

        /* Write-completion callback passed to uv_write(). */
        void on_client_write(uv_write_t *req, int status);


        void on_new_connection(uv_stream_t *server, int status) {

        if (status == -1) {

        fprintf(stderr, "error on_new_connection");

        return;

        }


        //printf( " gotta new client\n");

        uv_tcp_t *client;

        client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));

        uv_tcp_init(uv_default_loop(), client);


        int result = uv_accept(server, (uv_stream_t*) client);


        if (result == 0) { // success

        uv_read_start((uv_stream_t*) client, alloc_buffer, on_client_read);

        } else {

        printf( "Issue in accept, closing client\n");

        uv_close((uv_handle_t*) client, NULL);

        }

        }


        /*
         * Allocation callback handed to uv_read_start().
         *
         * Both arguments are ignored: every call hands back a freshly
         * malloc'ed buffer of 200 bytes.
         */
        uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size) {
            enum { READ_CHUNK = 200 };
            char *storage = (char *) malloc(READ_CHUNK);

            return uv_buf_init(storage, READ_CHUNK);
        }


        /*
         * Read callback registered by on_new_connection().
         *
         * _client: stream the data arrived on.
         * nread:   -1 signals an error; any other value is forwarded.
         * buf:     buffer produced by alloc_buffer().
         *
         * Stops reading and posts an stIPC message of type 111 on the
         * message queue qid. The message carries the client stream and the
         * heap pointer of the read buffer; whoever consumes the message
         * takes over that buffer.
         */
        void on_client_read(uv_stream_t *_client, ssize_t nread,
                            uv_buf_t buf) {
            if (nread == -1) {
                fprintf(stderr, "error on_client_read");
                free(buf.base);     /* nothing will be forwarded */
                uv_close((uv_handle_t *) _client, NULL);
                return;
            }

            uv_read_stop(_client);

            stIPC sendobj;
            /* Zero everything so no uninitialised padding reaches the queue. */
            memset(&sendobj, 0, sizeof sendobj);
            sendobj.mtype = 111;
            sendobj.client = _client;
            /* `buf` is a by-value parameter, so its address is dead once this
             * callback returns. Forward the heap pointer instead. */
            sendobj.buffer = buf.base;

            /* msgsz counts only the bytes that follow the leading long type. */
            int ret = msgsnd(qid, &sendobj, sizeof sendobj - sizeof(long),
                             IPC_NOWAIT);
            if (ret == -1) {
                printf("Msgsnd err : %d\n ", errno);
                exit(EXIT_FAILURE);
            }
        }


        /*
         * Thread body started from main() via pthread_create().
         *
         * Loops forever: waits on the message queue qid for a message of
         * type 111, then builds a fixed reply and calls uv_write() on the
         * client stream pointer carried in the message, with
         * on_client_write as the completion callback.
         *
         * NOTE(review): uv_write() is issued from this thread, not from the
         * thread that calls uv_run(). libuv only allows uv_async_send() to
         * be called from another thread, so this call is not safe.
         */
        void recv_from_gw()

        {


        stIPC recvObj;

        bzero( &recvObj, sizeof( stIPC ) );


        /*uv_work_t recie=ver_work;

          reciever_work.data = (void*) &recvObj;

          uv_queue_work( uv_default_loop(), &reciever_work, first,
        second );*/

        while(1)

        {

        int ret=0;

        //printf("in msgrcv\n");

        //sleep(1);

        //usleep(11);

        /* msgflg is 0, so IPC_NOWAIT is not requested here.
         * NOTE(review): the size passed is the whole struct including the
         * type field; presumably it should exclude it. Confirm against
         * the msgrcv specification. */
        ret = msgrcv( qid, &recvObj, sizeof( recvObj ), 111, 0 );

        if( -1 == ret )

        {

        //usleep(500);

        /* Any error other than ENOMSG terminates the process (exit status 0). */
        if( ENOMSG != errno )

        exit(0);

        else

        {

        printf( "no data from q yet \n");

        usleep(1);

        }

        }

        else

        {

        usleep(111);

        printf( "got data from q \n");

        /* Stream pointer that on_client_read() stored in the message. */
        uv_stream_t* client = recvObj.client;

        char *msg;

        msg = (char*) malloc( 200 );

        /* msg is a char *, so sizeof(msg) is the pointer size, not 200;
         * only the first few bytes are cleared before strcpy(). */
        bzero( msg, sizeof(msg) );

        strcpy( msg, "This is alien world\n" );


        /* The sizeof(msg) length given here is replaced by strlen(msg)
         * on the next statement. */
        uv_buf_t buf = uv_buf_init( msg, sizeof(msg));

        buf.len = strlen(msg);

        buf.base = msg;


        uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));

        /* Keep the payload reachable from the write callback. */
        req->data = (void*) buf.base;

        printf("Gonna write data \n");

        /* Cross-thread libuv call; see the NOTE(review) in the header comment. */
        uv_write(req, (uv_stream_t*) client, &buf, 1, on_client_write);

        }

        }


        }


        /*
         * Write-completion callback passed to uv_write().
         *
         * req:    the finished write request; req->data holds the malloc'ed
         *         payload stored by the code that issued the write.
         * status: -1 signals a failed write.
         *
         * Closes the connection in either case, then releases the request
         * and its payload, which were previously leaked.
         */
        void on_client_write(uv_write_t *req, int status) {
            if (status == -1) {
                fprintf(stderr, "error on_client_write\n");
                uv_close((uv_handle_t *) req->handle, NULL);
                printf("closing.written status = %d: \n", status);
            } else {
                uv_close((uv_handle_t *) req->handle, NULL);
            }

            /* req->handle has already been read, so both can be freed now. */
            free(req->data);
            free(req);
        }


        /*
         * Alternative listener set-up (not called anywhere in the visible
         * listing): binds a TCP handle to 0.0.0.0:7000 and starts listening
         * with on_new_connection as the connection callback.
         */
        void recv_from_3rd_party()
        {
            /* The handle stays registered with the loop after this function
             * returns, so it must not live on this function's stack. */
            static uv_tcp_t server;

            uv_tcp_init(uv_default_loop(), &server);

            struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
            uv_tcp_bind(&server, bind_addr);

            printf("In listen Thread\n");

            int r = uv_listen((uv_stream_t *) &server, 111, on_new_connection);
            if (r) {
                fprintf(stderr, "error uv_listen");
            }
        }



        /*
         * pthread-compatible wrapper. recv_from_gw() takes no argument and
         * returns nothing, so it should not be handed to pthread_create()
         * through a cast.
         */
        static void *gw_thread_entry(void *arg) {
            (void) arg;
            recv_from_gw();
            return NULL;
        }

        /*
         * Program entry point.
         *
         * Creates (or opens) the message queue with key 7876, starts a TCP
         * listener on 0.0.0.0:7000, launches the recv_from_gw thread and
         * runs the default libuv loop.
         *
         * Returns EXIT_FAILURE if the queue, the listener or the thread
         * cannot be set up, EXIT_SUCCESS once the loop ends.
         */
        int main(void) {
            printf("in sarav \n");

            int key = 7876;

            qid = msgget(key, IPC_CREAT | 0666);
            if (qid == -1) {
                printf("msgget err, %d\n", errno);
                return EXIT_FAILURE;
            }

            uv_tcp_t server;
            uv_tcp_init(uv_default_loop(), &server);

            struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
            uv_tcp_bind(&server, bind_addr);

            int r = uv_listen((uv_stream_t *) &server, 200, on_new_connection);
            if (r) {
                /* Without a listener the loop has nothing to serve. */
                fprintf(stderr, "error uv_listen");
                return EXIT_FAILURE;
            }

            pthread_t gwthrd;
            if (pthread_create(&gwthrd, NULL, gw_thread_entry, NULL) != 0) {
                fprintf(stderr, "error pthread_create\n");
                return EXIT_FAILURE;
            }

            uv_run(uv_default_loop(), UV_RUN_DEFAULT);
            return EXIT_SUCCESS;
        }



Now this is the code that is causing the weird behavior. When we run serial
connections (one connection at a time), the previous uv_write's callback
is not called until a new connection is initiated. When the same thing
happens with parallel connections, an assert fires at the line
"uv__req_unregister(stream->loop, req);" in file src/unix/stream.c,
line number 877.

Please suggest the solution to this problem.

OS : Fedora Linux 17
gcc : gcc version 4.7.2 20120921 (Red Hat 4.7.2-2) (GCC)
libuv version 0.11.1
code Compilation command : gcc -O0 -g server.c -o server
-I../../libuv/include/ ../../libuv/libuv.a -lrt -ldl -lm -pthread

--
You received this message because you are subscribed to the Google
Groups "libuv" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/libuv.
For more options, visit https://groups.google.com/groups/opt_out.


--
Saúl Ibarra Corretgé
http://bettercallsaghul.com

--
You received this message because you are subscribed to the Google Groups 
"libuv" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/libuv.
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to