>> I'm testing out ZMQ_P2P (latest from git), with one end binding and
>> receiving, and the other end connecting and sending.
>>
>> If the sender dies and reconnects, the receiving end
>> crashes with:
>>
>> Assertion failed: !inpipe && !outpipe (p2p.cpp:47)

> Can you attach your test program? I'll have a look.

Attached are zmq_client.c and zmq_server.c. Fire up zmq_server,
then the client; pressing Ctrl+C in the client and starting it
again causes the server to assert.
#include <zmq.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <stdio.h>

/*
 * zmq_client: connects a ZMQ_P2P socket to tcp://192.168.1.20:11111 and
 * sends 50000 "Hello World" messages, printing a progress marker after
 * each one, then waits for Enter before shutting down.
 *
 * Exit status: 0 on success, 1 if socket setup or connect fails,
 * 2 if a message cannot be initialised, 3 if a send fails.
 */
int main(int argc,char *argv[])
{
    void *z_handle = zmq_init(1,1,0);
    assert(z_handle != NULL);

    void *socket_handle = zmq_socket(z_handle,ZMQ_P2P);
    //void *socket_handle = zmq_socket(z_handle,ZMQ_PUB);
    assert(socket_handle != NULL);

    /* Watermarks are configured before connecting. */
    int64_t hwm = 50;
    if(zmq_setsockopt(socket_handle,ZMQ_HWM,&hwm,sizeof hwm) != 0) {
        perror("zmq_setsockopt hwm");
        return 1;
    }
    int64_t lwm = 10;
    if(zmq_setsockopt(socket_handle,ZMQ_LWM,&lwm,sizeof lwm) != 0) {
        perror("zmq_setsockopt lwm");
        return 1;
    }
    if(zmq_connect(socket_handle,"tcp://192.168.1.20:11111") != 0) {
        /* Name the call that actually failed (was reported as zmq_bind). */
        perror("zmq_connect");
        return 1;
    }

    for(int i = 0; i < 50000 ; i++) {
        zmq_msg_t msg;
        /* The payload is a string literal, so no free function or hint
         * is passed; the terminating NUL is sent as part of the message. */
        char *data = "Hello World";
        if(zmq_msg_init_data(&msg,data,strlen(data) + 1,NULL,NULL) != 0) {
            perror("zmq_msg_init_data");
            return 2;
        }
        if(zmq_send(socket_handle,&msg,0) != 0) {
            perror("zmq_send");
            return 3;
        }

        /* Progress output: the index on every 5th message, a dot otherwise. */
        if(i%5 == 0 ) {
            printf("%d",i);
        } else {
            printf(".");
        }
        fflush(stdout);

        /* Pause for two seconds on every 500th iteration (including i == 0). */
        if(i % 500 == 0) {
            sleep(2);
        }
    }

    /* Keep the process (and its socket) alive until the user presses Enter. */
    fputs("Hit enter to continue ...",stdout);
    fflush(stdout);
    getchar();

    int c_ok = zmq_close(socket_handle);
    assert(c_ok == 0);

    int t_ok = zmq_term(z_handle);
    assert(t_ok == 0);
    return 0;

}
#include <zmq.h>
#include <stdint.h>
#include <assert.h>
#include <errno.h>
#include <stdio.h>


/*
 * Receive one message from socket_handle (blocking zmq_recv with flags 0)
 * and print its payload to stdout as "Received: <text>\n".
 *
 * Returns 0 on success, 2 if the message could not be initialised and
 * 3 if zmq_recv failed. The message is closed on every path that
 * initialised it.
 */
int handle(void *socket_handle)
{
    zmq_msg_t msg;
    if(zmq_msg_init(&msg) != 0) {
        perror("zmq_msg_init");
        return 2;
    }
    if(zmq_recv(socket_handle,&msg,0) != 0) {
        /* Report first: closing the message could overwrite errno. */
        perror("zmq_recv");
        zmq_msg_close(&msg);
        return 3;
    }

    /* The payload comes from the peer and is not guaranteed to be
     * NUL-terminated, so the printed text is bounded by the message size:
     * print up to the first NUL or the end of the payload, whichever
     * comes first. */
    const char *data = zmq_msg_data(&msg);
    size_t size = zmq_msg_size(&msg);
    size_t len = 0;
    while(len < size && data[len] != '\0') {
        len++;
    }
    printf("Received: ");
    fwrite(data,1,len,stdout);
    printf("\n");

    zmq_msg_close(&msg);
    return 0;
}

/*
 * zmq_server: binds a ZMQ_P2P socket on tcp://*:11111 and prints every
 * message it receives until handle() reports an error, then closes the
 * socket and terminates the context.
 *
 * Exit status: 0 after the receive loop ends, 1 if socket setup or
 * bind fails.
 */
int main(int argc,char *argv[])
{
    void *z_handle = zmq_init(2,2,0);
    assert(z_handle != NULL);

    void *socket_handle = zmq_socket(z_handle,ZMQ_P2P);
    //void *socket_handle = zmq_socket(z_handle,ZMQ_SUB);
    assert(socket_handle != NULL);

    /* SUB variant: subscribe to all messages.
    char *msg_filter = "";
    if(zmq_setsockopt(socket_handle,ZMQ_SUBSCRIBE,msg_filter,0) != 0) {
        perror("zmq_setsockopt subscribe");
        return 1;
    }*/

    /* Set the watermarks before binding: socket options only apply to
     * binds/connects performed after the option is set, so setting them
     * after zmq_bind would leave the bound endpoint with the defaults. */
    int64_t hwm = 50;
    if(zmq_setsockopt(socket_handle,ZMQ_HWM,&hwm,sizeof hwm) != 0) {
        perror("zmq_setsockopt hwm");
        return 1;
    }
    int64_t lwm = 10;
    if(zmq_setsockopt(socket_handle,ZMQ_LWM,&lwm,sizeof lwm) != 0) {
        perror("zmq_setsockopt lwm");
        return 1;
    }

    if(zmq_bind(socket_handle,"tcp://*:11111") != 0) {
        perror("zmq_bind");
        return 1;
    }

    /* Receive until handle() returns non-zero (init or recv failure). */
    for(;;) {
        if(handle(socket_handle) != 0) {
            break;
        }
    }

    int c_ok = zmq_close(socket_handle);
    assert(c_ok == 0);

    int t_ok = zmq_term(z_handle);
    assert(t_ok == 0);
    return 0;

}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to