The code of my classes is attached.
A main would look like this:

// Demo driver: one publisher and one subscriber share a single context inside
// the same process. Each cycle publishes a three-part message on topic "A" and
// one on topic "B", then prints three frames read from the subscriber, which
// is subscribed to "B" only.
int main (void)
{
    Zmqcpp::Context mycontext(1);
    // The Publisher/Subscriber constructors take a Context*, so pass the address.
    Zmqcpp::Publisher mypubber(&mycontext, "tcp://*:5678", ZMQCPP_BIND);
    Zmqcpp::Subscriber mysubber(&mycontext, "tcp://localhost:5678", ZMQCPP_CONNECT);
    mysubber.SubscribeTopic("B");

    int counter = 0;

    while(1)
    {
        // "%03d" writes at least three digits plus the terminating NUL, so a
        // 3-byte buffer overflows; snprintf with a roomy buffer also stays
        // safe once the counter grows past 999.
        char mystr[16];
        snprintf(mystr, sizeof(mystr), "%03d", counter);
        mypubber.PubMessage(3,"A","We don't want to see this",mystr);
        mypubber.PubMessage(3,"B","We would like to see this",mystr);
        printf("Cycle [%s]\n",mystr);
        usleep(100);
        for (int i=0;i<3;i++)
            std::cout << mysubber.receive() << std::endl;

        counter++;

        usleep(1500000);
    }
    // mypubber, mysubber and mycontext are automatic objects: their
    // destructors run by themselves (in reverse order of construction) when
    // they go out of scope. Calling them explicitly as well would destroy
    // each object twice, which is undefined behaviour.
    return (EXIT_SUCCESS);
}


Claudio


On 12/22/2012 02:14 PM, Claudio Carbone wrote:
To make debugging an application easier I often joined the server and client in a single program.

Now I'm using my own C++ classes built on top of the C bindings.
This means I have a class instance for every object:
context, socket, publisher and subscriber.

When used like this, I find that multiple threads are somehow spawned automatically, effectively nullifying the benefit of having both sides in a single application.

In fact, unless I start putting in callbacks, output redirection and other stuff, having multiple threads makes my life a lot harder: gdb starts having problems following the code (sometimes problems seem to happen nowhere, or only in the disassembly), simple stdout output doesn't work, and so on.

Now, unless I forgot something (quite plausible), ZMQ should create threads only on context creation.
Why am I seeing multiple threads with a single context?

Being on the phone, I don't have the code with me; I will post it later.

Thank you
Claudio
-- Sent from my ParanoidAndroid Galaxy Nexus with K-9 Mail.


/* 
 * File:   zmqcpp.cpp
 * Author: erupter
 * 
 * Created on December 14, 2012, 6:32 PM
 */

#include <cstdlib>

#include <zmq.h>

#include "zmqcpp.h"
namespace Zmqcpp {

    
    // Default constructor: obtains a context from zmq_init(1) and throws
    // error_t when no context could be created.
    Context::Context()
        : _zmq_context_ptr(zmq_init(1))
    {
        if (!_zmq_context_ptr) {
            throw error_t();
        }
    }
    
    // Creates a context by forwarding `threads` to zmq_init; throws error_t
    // when zmq_init returns NULL.
    Context::Context(int threads)
        : _zmq_context_ptr(zmq_init(threads))
    {
        if (!_zmq_context_ptr) {
            throw error_t();
        }
    }
    
    // Hands the wrapped context back to ZeroMQ via zmq_ctx_destroy.
    // The return value of zmq_ctx_destroy is not inspected.
    Context::~Context()
    {
        void* const ctx = _zmq_context_ptr;
        zmq_ctx_destroy(ctx);
    }
    
    // Returns the address of this Context object (the wrapper itself, not
    // the raw ZeroMQ context pointer it holds).
    Context* Context::getPtr()
    {
        Context* const self = this;
        return self;
    }
    
    // Creates a socket of type `skt_type` inside `context` without binding
    // or connecting it. `context` is dereferenced unconditionally.
    // Throws error_t when zmq_socket returns NULL.
    Socket::Socket(Context* context, int skt_type)
        : _zmq_socket_ptr(zmq_socket(context->_zmq_context_ptr, skt_type))
    {
        if (!_zmq_socket_ptr) {
            throw error_t();
        }
    }
    
    // Creates a socket of type `skt_type` inside `context` and immediately
    // binds it (conn == ZMQCPP_BIND) or connects it (conn == ZMQCPP_CONNECT)
    // to `ip_addr`.
    //
    // Throws error_t when the socket cannot be created, when `conn` is
    // neither ZMQCPP_BIND nor ZMQCPP_CONNECT, or when zmq_bind/zmq_connect
    // reports failure. The last case matches bind(std::string) and
    // connect(std::string); previously the result was silently ignored.
    // On success the endpoint is remembered in _ip_addr.
    Socket::Socket(Context* context, int skt_type, std::string ip_addr, int conn)
    {
        this->_zmq_socket_ptr = zmq_socket(context->_zmq_context_ptr, skt_type);
        if (this->_zmq_socket_ptr == NULL)
            throw error_t ();

        // Stays at -1 for an unknown `conn`, which is reported like any
        // other failure below.
        int rc = -1;
        if (conn == ZMQCPP_BIND)
            rc = zmq_bind(this->_zmq_socket_ptr, ip_addr.c_str());
        else if (conn == ZMQCPP_CONNECT)
            rc = zmq_connect(this->_zmq_socket_ptr, ip_addr.c_str());

        if (rc != 0)
        {
            // The destructor does not run for an object whose constructor
            // throws, so the socket has to be released here.
            zmq_close(this->_zmq_socket_ptr);
            this->_zmq_socket_ptr = 0;
            throw error_t ();
        }

        this->_ip_addr = ip_addr;
    }
    
    // Closes the wrapped ZeroMQ socket. The result of zmq_close is ignored.
    Socket::~Socket()
    {
        void* const handle = _zmq_socket_ptr;
        zmq_close(handle);
        // A disconnect from the stored endpoint is left disabled:
        //zmq_disconnect(this->_zmq_socket_ptr, this->_ip_addr);
    }
    
    // Returns the address of this Socket object (the wrapper itself, not
    // the raw ZeroMQ socket pointer it holds).
    Socket* Socket::getPtr()
    {
        Socket* const self = this;
        return self;
    }
        
    int Socket::setsockopt(int option, const void *optval,size_t optvallen)
    {
        return zmq_setsockopt(this->_zmq_socket_ptr,option, optval,optvallen);        
    }

    int Socket::Send(std::string msg)
    {
        return s_send(this->_zmq_socket_ptr,msg.c_str());
    }
    
    
    int Socket::SendMore(std::string msg)
    {
        return s_sendmore(this->_zmq_socket_ptr,msg.c_str());
    }
    
    // Connects to the default endpoint "tcp://localhost:5678".
    // Returns 0 on success, throws error_t on failure, and records the
    // endpoint in _ip_addr, all by way of connect(std::string).
    int Socket::connect()
    {
        return this->connect(std::string("tcp://localhost:5678"));
    }
    
    int Socket::connect(std::string ip_addr)
    {
        int rc = zmq_connect (this->_zmq_socket_ptr, ip_addr.c_str());
        if (rc != 0)
            throw error_t ();  
        this->_ip_addr = ip_addr;
        return 0;
    }    
    
    // Binds to the default endpoint "tcp://*:5678".
    // Returns 0 on success, throws error_t on failure, and records the
    // endpoint in _ip_addr, all by way of bind(std::string).
    int Socket::bind()
    {
        return this->bind(std::string("tcp://*:5678"));
    }
           
    int Socket::bind(std::string ip_addr)
    {
        int rc = zmq_bind (this->_zmq_socket_ptr, ip_addr.c_str());
        if (rc != 0)
            throw error_t ();      
        this->_ip_addr = ip_addr;
        return 0;
    }

    char* Socket::Receive()
    {
        return s_recv (this->_zmq_socket_ptr);
    }
    
    void generic::classinit()
    {
        this->_counter=0;
        this->_internal_context=0;
        this->_zmq_context_ptr=0;
        this->_zmq_socket_ptr=0;
    }
    
    void generic::contextcreate()
    {
        this->_zmq_context_ptr = zmq_init(1);
        if (this->_zmq_context_ptr == NULL)
            throw error_t ();      
        this->_internal_context = 1;     
    }
    void generic::contextcreate(void* context)
    {
        this->_zmq_context_ptr = context;
        if (this->_zmq_context_ptr == NULL)
            throw error_t ();          
        this->_internal_context = 0;        
    }
    // Intended to adopt the raw pointer held by `context`, but the
    // assignment is disabled, so `context` is currently unused and only the
    // pointer already stored in this object is checked.
    // Throws error_t when that stored pointer is NULL; otherwise marks the
    // context as not internal.
    // NOTE(review): `context` is taken by value, so a temporary copy is
    // destroyed (running ~Context) when this function returns - confirm
    // this is intended.
    void generic::contextcreate(Context context)
    {
        // Disabled by the original author:
        //   this->_zmq_context_ptr = context._zmq_context_ptr;
        if (!_zmq_context_ptr) {
            throw error_t();
        }
        _internal_context = 0;
    }


    void generic::socketcreate(int skt_type)
    {
        this->_zmq_socket_ptr = zmq_socket(this->_zmq_context_ptr, skt_type);    
        if (this->_zmq_socket_ptr == NULL)
            throw error_t ();    
    }

    void generic::socketbind()
    {
    int rc = zmq_bind (this->_zmq_socket_ptr, "tcp://*:5678");
        if (rc != 0)
            throw error_t ();    
    }

    void generic::socketbind(char* ip_addr)
    {
    int rc = zmq_bind (this->_zmq_socket_ptr, ip_addr);
        if (rc != 0)
            throw error_t ();    
    }
    void generic::socketconn()
    {
        int rc = zmq_connect (this->_zmq_socket_ptr, "tcp://localhost:5678");
        if (rc != 0)
            throw error_t ();
    }
    void generic::socketconn(char* ip_addr)
    {
        int rc = zmq_connect (this->_zmq_socket_ptr, ip_addr);
        if (rc != 0)
            throw error_t ();    
    }

    void generic::close()
    {
        zmq_close(this->_zmq_socket_ptr);
        if (this->_internal_context)
            zmq_ctx_destroy(this->_zmq_context_ptr);      
    }


    int generic2::Bind()
    {
        return zmq_bind(this->_socket,"tcp://*:5678");
    }
    
    // Binds the owned socket to `ip_addr`.
    //
    // The endpoint argument is now honoured (a hard-coded epgm address used
    // to be bound instead), and the call goes through the Socket wrapper
    // rather than passing the wrapper object to zmq_bind as if it were a raw
    // ZeroMQ handle.
    //
    // Returns 0 on success and -1 on failure. Socket::bind() reports failure
    // by throwing error_t; that is translated back into the int result this
    // method has always returned.
    int generic2::Bind(std::string ip_addr)
    {
        try
        {
            return this->_socket->bind(ip_addr);
        }
        catch (const error_t&)
        {
            return -1;
        }
    }
    
    int generic2::Connect()
    {
        return zmq_connect(this->_socket,"tcp://*:5678");
    }
    
    // Connects the owned socket to `ip_addr`.
    //
    // The endpoint argument is now honoured (a hard-coded epgm address used
    // to be used instead), and the call goes through the Socket wrapper
    // rather than passing the wrapper object to zmq_connect as if it were a
    // raw ZeroMQ handle.
    //
    // Returns 0 on success and -1 on failure. Socket::connect() reports
    // failure by throwing error_t; that is translated back into the int
    // result this method has always returned.
    int generic2::Connect(std::string ip_addr)
    {
        try
        {
            return this->_socket->connect(ip_addr);
        }
        catch (const error_t&)
        {
            return -1;
        }
    }
    /*Publisher::Publisher(void * context)
    {
        this->_zmq_context_ptr=context;
        this->_socket = new Socket(context,ZMQ_PUB);
        this->_counter=0;
    }
    Publisher::Publisher(void* context, char* ip_str)
    {
        this->_zmq_context_ptr=context;
        this->_socket = new Socket(context,ZMQ_PUB, ip_str);
        this->_counter=0;        
    }*/
    // Creates a ZMQ_PUB socket in `context` and binds it to the default
    // endpoint used by Socket::bind().
    //
    // Throws whatever the Socket constructor or Socket::bind() throws. If
    // the bind fails, the freshly allocated Socket is deleted before the
    // exception propagates: ~Publisher does not run for a Publisher whose
    // constructor throws, so the socket would otherwise leak.
    Publisher::Publisher(Context* context)
    {
        this->_counter=0;
        this->_zmq_context=context;
        this->_socket = new Zmqcpp::Socket(this->_zmq_context,ZMQ_PUB);
        try
        {
            this->_socket->bind();
        }
        catch (...)
        {
            delete this->_socket;
            this->_socket=0;
            throw;
        }
    }
    // Creates a ZMQ_PUB socket in `context`; the Socket constructor binds
    // or connects it to `ip_str` according to `conn`. The message counter
    // starts at zero.
    Publisher::Publisher(Context* context, std::string ip_str, int conn)
    {
        _zmq_context = context;
        _socket = 0;
        _counter = 0;
        _socket = new Zmqcpp::Socket(context, ZMQ_PUB, ip_str, conn);
    }
    // Releases the socket allocated with `new` in the constructors.
    // `delete` both runs ~Socket (which closes the ZeroMQ socket) and frees
    // the memory; calling the destructor explicitly only did the former.
    // The pointer is cleared afterwards so it does not dangle.
    Publisher::~Publisher()
    {
        delete this->_socket;
        this->_socket = 0;
    }

    // Publishes one multipart message.
    //
    // count : number of parts that follow.
    // ...   : exactly `count` C strings (char* / const char*). All but the
    //         last are sent with SendMore, the last with Send.
    //
    // Does nothing when count < 1: the original code would still have read
    // one variadic argument that was never passed, which is undefined
    // behaviour. A NULL part is sent as an empty string, because building a
    // std::string from a NULL pointer is undefined behaviour as well.
    // The results of SendMore/Send are not checked.
    void Publisher::PubMessage (int count, ...)
    {
        if (count < 1)
            return;

        va_list argptr;

        va_start( argptr, count );      

        for( ; count > 1; count-- ) {
            const char *part = va_arg(argptr, char*);
            this->_socket->SendMore (part ? part : "");
        }
        const char *last = va_arg(argptr, char*);
        this->_socket->Send(last ? last : "");

        va_end( argptr );          
    }

    // Currently a no-op: the whole body is commented out.
    // The disabled code below sent two three-part messages (topics "A" and
    // "B") carrying the running counter and then incremented the counter.
    // NOTE(review): the disabled code formats "%03d" into a 3-byte buffer,
    // which has no room for the terminating NUL; fix before re-enabling.
    void Publisher::worker()
    {/*
        char _count[3];
            sprintf(_count,"%03d",this->_counter);
            // Write two messages, each with an envelope and content
            s_sendmore (this->_zmq_socket_ptr, "A");
            s_sendmore (this->_zmq_socket_ptr, "We don't want to see this");
            s_send     (this->_zmq_socket_ptr, _count);
            s_sendmore (this->_zmq_socket_ptr, "B");
            s_sendmore (this->_zmq_socket_ptr, "We would like to see this");
            s_send     (this->_zmq_socket_ptr, _count);
            this->_counter++;*/
    }
    

    // Creates a ZMQ_SUB socket in `context`. The socket is neither bound
    // nor connected, and no topic is subscribed yet.
    Subscriber::Subscriber(Context* context)
    {
        _zmq_context = context;
        _socket = 0;
        _socket = new Zmqcpp::Socket(context, ZMQ_SUB);
    }

    // Creates a ZMQ_SUB socket in `context`; the Socket constructor binds
    // or connects it to `ip_str` according to `conn`. No topic is
    // subscribed yet.
    Subscriber::Subscriber(Context* context, std::string ip_str, int conn)
    {
        _zmq_context = context;
        _socket = 0;
        _socket = new Zmqcpp::Socket(context, ZMQ_SUB, ip_str, conn);
    }

    // Releases the socket allocated with `new` in the constructors.
    // `delete` runs ~Socket, which closes the ZeroMQ socket, and frees the
    // memory. Previously the socket was never closed or freed.
    // The pointer is cleared afterwards so it does not dangle.
    Subscriber::~Subscriber()
    {
        delete this->_socket;
        this->_socket = 0;
    }

    int Subscriber::SubscribeTopic(std::string topic)
    {
        return (this->_socket->setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.length()));    
    }

    // Takes ownership of a frame returned by Socket::Receive(): copies it
    // into a std::string and frees the buffer. A NULL frame becomes "".
    // NOTE(review): assumes the buffer comes from malloc/strdup, as in the
    // zguide's zhelpers.h s_recv - confirm against the zhelpers.h in use.
    static std::string frame_to_string(char* raw)
    {
        if (raw == NULL)
            return std::string();
        std::string text(raw);
        std::free(raw);
        return text;
    }

    // Receives one three-part message (address, contents, termination) and
    // returns it formatted as
    //   "[address] contents  | Termination [termination]\n".
    // Each received buffer is released after copying (it used to leak), and
    // a failed receive yields an empty part instead of constructing a
    // std::string from NULL.
    std::string Subscriber::worker()
    {
        // Read envelope with address
        const std::string address = frame_to_string(this->_socket->Receive());
        // Read message contents
        const std::string contents = frame_to_string(this->_socket->Receive());

        const std::string termination = frame_to_string(this->_socket->Receive());

        std::stringstream ss;
        ss << "[" << address << "] " << contents << "  | Termination [" << termination<< "]" << std::endl;
        return (ss.str());    
    }
    
    // Receives a single frame and returns it as a std::string.
    // The buffer handed back by Socket::Receive() is copied and then freed
    // (it used to leak on every call). A NULL result yields an empty string
    // instead of constructing a std::string from NULL.
    // NOTE(review): assumes the buffer comes from malloc/strdup, as in the
    // zguide's zhelpers.h s_recv - confirm against the zhelpers.h in use.
    std::string Subscriber::receive()
    {
        char* raw = this->_socket->Receive();
        if (raw == NULL)
            return std::string();
        std::string frame(raw);
        std::free(raw);
        return frame;
    }


}
/* 
 * File:   zmqcpp.h
 * Author: erupter
 *
 * Created on December 14, 2012, 6:32 PM
 */

#ifndef ZMQCPP_H
#define	ZMQCPP_H
extern "C" {
    #include <zmq.h>
    #include <zhelpers.h>
}
#include <string>
#include <sstream>
#include <iostream>
#define ZMQCPP_BIND 0
#define ZMQCPP_CONNECT 1

namespace Zmqcpp
{

  
    // Owns a ZeroMQ context: the constructors create it with zmq_init and
    // the destructor destroys it with zmq_ctx_destroy.
    // NOTE(review): the class is copyable by default; a copy would hold the
    // same raw pointer and destroy it a second time.
    class Context
    {
        public:
            // Creates the context with zmq_init(1); throws error_t on failure.
            Context();
            // Creates the context with zmq_init(threads); throws error_t on failure.
            Context(int threads);
            // Destroys the wrapped context.
            virtual ~Context();
            // Returns `this`.
            Context* getPtr();
            
        protected:
            // Socket reads _zmq_context_ptr when creating its zmq socket.
            friend class Socket;
            // Raw context pointer obtained from zmq_init.
            void* _zmq_context_ptr;
            
    };
    
    // Owns a single ZeroMQ socket: created in the constructors with
    // zmq_socket and closed in the destructor with zmq_close.
    // NOTE(review): the class is copyable by default; a copy would hold the
    // same raw pointer and close it a second time.
    class Socket
    {
        public:
            // Creates a socket of type skt_type in `context`; throws error_t on failure.
            Socket(Context* context, int skt_type);
            // As above, then binds (conn == ZMQCPP_BIND) or connects
            // (conn == ZMQCPP_CONNECT) to ip_str; any other conn throws error_t.
            Socket(Context* context, int skt_type, std::string ip_str, int conn);
            // Closes the wrapped socket.
            virtual ~Socket();
            // Returns `this`.
            Socket* getPtr();
            // Forwards to zmq_setsockopt and returns its result.
            int setsockopt(int option, const void *optval,size_t optvallen);
            // Connects to "tcp://localhost:5678"; returns 0 or throws error_t.
            int connect();
            // Connects to ip_addr; returns 0 or throws error_t.
            int connect(std::string ip_addr);
            // Binds to "tcp://*:5678"; returns 0 or throws error_t.
            int bind();
            // Binds to ip_addr; returns 0 or throws error_t.
            int bind(std::string ip_addr);
            // Sends msg via the s_sendmore helper and returns its result.
            int SendMore(std::string msg);
            // Sends msg via the s_send helper and returns its result.
            int Send(std::string msg);
            // Returns the pointer produced by the s_recv helper.
            // NOTE(review): presumably heap-allocated and owned by the
            // caller - confirm against zhelpers.h.
            char* Receive();
            
        protected:
            // Raw socket pointer obtained from zmq_socket.
            void *_zmq_socket_ptr;
            // Endpoint recorded by the bind()/connect() methods.
            std::string _ip_addr;
    };
    
    // Base class holding a raw context pointer and a raw socket pointer,
    // plus helper steps (create context, create socket, bind, connect,
    // close) for derived classes to combine.
    //
    // The helpers are defined out of line in zmqcpp.cpp, so they must not be
    // declared `inline`: an inline function has to be defined in every
    // translation unit that uses it, and any other source file calling
    // these helpers would fail to link.
    class generic
    {
        protected:
            // Context object held by value; default-constructed with the
            // rest of the object, so it creates its own context via zmq_init(1).
            Context _zmq_context;
            // Raw context pointer used by socketcreate() and close().
            void *_zmq_context_ptr;
            // Raw socket pointer used by the bind/connect/close helpers.
            void *_zmq_socket_ptr;
            // Non-zero when _zmq_context_ptr was created by contextcreate()
            // and must be destroyed by close().
            int _internal_context;
            int _counter;
            // Zeroes the pointers, the ownership flag and the counter.
            void classinit();
            // Creates a private context with zmq_init(1); throws error_t on failure.
            void contextcreate();
            // Adopts an external raw context pointer; throws error_t if NULL.
            void contextcreate(void* context);
            // Currently only checks the stored pointer; see zmqcpp.cpp.
            void contextcreate(Context context);
            // Creates the socket on the stored context; throws error_t on failure.
            void socketcreate(int skt_type);
            // Binds to "tcp://*:5678"; throws error_t on failure.
            void socketbind();
            // Binds to ip_addr; throws error_t on failure.
            void socketbind(char* ip_addr);
            // Connects to "tcp://localhost:5678"; throws error_t on failure.
            void socketconn();
            // Connects to ip_addr; throws error_t on failure.
            void socketconn(char* ip_addr);
            // Closes the socket and, if the context is internal, destroys it.
            void close();
    };
    
    // Base class for Publisher and Subscriber: holds a pointer to the
    // Context in use and a pointer to a Socket wrapper, and exposes
    // int-returning Bind/Connect helpers.
    // NOTE(review): there is no constructor here, so both pointers are
    // uninitialised until a derived constructor assigns them.
    class generic2
    {
        public:
            // Bind using the default endpoint.
            int Bind();
            // Bind using ip_addr.
            int Bind(std::string ip_addr);
            // Connect using the default endpoint.
            int Connect();
            // Connect using ip_addr.
            int Connect(std::string ip_addr);
        protected:
            // Context the socket lives in; stored, not owned.
            Zmqcpp::Context* _zmq_context;
            // Socket wrapper allocated with `new` by the derived constructors.
            Zmqcpp::Socket* _socket;
    };
    
    // Publishing endpoint: wraps a ZMQ_PUB Socket allocated in the constructor.
    // NOTE(review): copyable by default; a copy would share the raw _socket
    // pointer with the original.
    class Publisher: public generic2
    {
        
        public:
            //Publisher(void* context);
            //Publisher(void* context, char* ip_str);
            // Creates the PUB socket and binds it to the default endpoint.
            Publisher(Context* context);
            // Creates the PUB socket and binds/connects it to ip_str
            // according to conn (ZMQCPP_BIND or ZMQCPP_CONNECT).
            Publisher(Context* context, std::string ip_str, int conn);
            virtual ~Publisher();
            // Sends `count` C-string parts as one multipart message.
            void PubMessage(int count, ...);
            // Currently a no-op (body commented out in zmqcpp.cpp).
            void worker();
        
        private:
            // Message counter; set to zero by the constructors.
            unsigned int _counter;
    };
    
    // Subscribing endpoint: wraps a ZMQ_SUB Socket allocated in the constructor.
    // NOTE(review): copyable by default; a copy would share the raw _socket
    // pointer with the original.
    class Subscriber: public generic2
    {
        public:
            // Creates the SUB socket without binding or connecting it.
            Subscriber(Context* context);
            // Creates the SUB socket and binds/connects it to ip_str
            // according to conn (ZMQCPP_BIND or ZMQCPP_CONNECT).
            Subscriber(Context* context, std::string ip_str, int conn);
            virtual ~Subscriber();
            // Sets ZMQ_SUBSCRIBE with `topic`; returns the setsockopt result.
            int SubscribeTopic(std::string topic);
            // Receives three frames and returns them formatted on one line.
            std::string worker ();
            // Receives a single frame and returns it as a string.
            std::string receive();

    };
    
    // Declaration only: no definitions for these members appear in the
    // visible part of zmqcpp.cpp.
    // NOTE(review): presumably intended to wrap a request-type socket on
    // top of `generic` - confirm once implemented.
    class Request: public generic
    {
        public:
            Request(Context* context);
            Request(Context* context, std::string ip_str, int conn);
            virtual ~Request();

    };
    
    // Declaration only: no definitions for these members appear in the
    // visible part of zmqcpp.cpp.
    // NOTE(review): presumably intended to wrap a reply-type socket on top
    // of `generic` - confirm once implemented. The endpoint parameter is a
    // char* here, whereas Request takes a std::string.
    class Reply: public generic
    {
        public:
            Reply(Context* context);
            Reply(Context* context, char* ip_str, int conn);
            virtual ~Reply();
    };

}


#endif	/* ZMQCPP_H */

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to