I suspect there is a bug within ZMQ, that or I'm doing something stupid.
Actually, the latter is probably more likely as I'd like to think that this
would have surfaced elsewhere if it was a bug. Attached is minimal test
program which reproduces the issue on my system. In my test, ZMQ is
statically linked to the test program. This is a Windows program compiled
in Visual Studio 2015. Once running, after roughly 2 mins 40 seconds it
crashes. There's an error message that says "abort() has been called".

Could someone please confirm that they also see the crash, and if I am
doing something stupid please feel free to tell me.

James





On 30 September 2016 at 11:49, James Chapman <[email protected]> wrote:

> Thanks, I'll move to a shared context that persists for the duration of
> the process.
>
> ​Sockets are one per thread, in fact, as the threads are re-used, each
> thread will create many sockets over its lifetime.​
>
> -​James​
>
>
>
> On 30 September 2016 at 11:24, Luca Boccassi <[email protected]>
> wrote:
>
>> You can (and probably should as best practise) reuse the context,
>> which is thread safe.
>>
>> Do not use the same socket from multiple threads. There is a new
>> category of thread-safe sockets in libzmq master but the API is not
>> yet finalised.
>>
>
>
#include <iostream>
#include <string>
#include <sstream>
#include <thread>
#include <chrono>

#include <WinSock2.h>
#include <windows.h> 
#include <stdio.h> 
#include <Shlwapi.h>
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "Shlwapi.lib")

#include <zmq.hpp>

#define MSG_SIZE 1024

BOOL CtrlHandler(DWORD fdwCtrlType)
{
    switch (fdwCtrlType)
    {
        // Handle the CTRL-C signal. 
    case CTRL_C_EVENT:
        return(FALSE);

        // CTRL-CLOSE: confirm that the user wants to exit. 
    case CTRL_CLOSE_EVENT:
        return(FALSE);

        // Pass other signals to the next handler. 
    case CTRL_BREAK_EVENT:
        return FALSE;

    case CTRL_LOGOFF_EVENT:
        return FALSE;

    case CTRL_SHUTDOWN_EVENT:
        return FALSE;

    default:
        return FALSE;
    }
}

/**
* SERVER: Bind and listen on tcp://127.0.0.5:9995
*/
void startServer(zmq::context_t * pContext)
{
    auto pZmqSocket = new zmq::socket_t(*pContext, ZMQ_REP);
    pZmqSocket->bind("tcp://127.0.0.5:9995");
    zmq::pollitem_t pollItem = { *pZmqSocket, 0, ZMQ_POLLIN, 0 };
    bool bReceived = false;
    int nRetries = 5;
    while (1)
    {
        zmq::poll(&pollItem, 1, 3 * 1000);
        bReceived = false;

        if (pollItem.revents & ZMQ_POLLIN)
        {
            auto pZmqMsgIn = new zmq::message_t();
            try
            {
                bReceived = pZmqSocket->recv(pZmqMsgIn);
            }
            catch (std::exception& e)
            {
                std::stringstream streamExceptionMsg;
                streamExceptionMsg << "Exception caught during reply message socket.recv()" << " : " << e.what();
                printf("SERVER: %s\n", streamExceptionMsg.str().c_str());
            }
            if (bReceived)
            {
                printf("SERVER: message received\n");
                auto pZmqMsgOut = new zmq::message_t();
                pZmqSocket->send(*pZmqMsgOut);
                delete pZmqMsgOut;
            }
            delete pZmqMsgIn;
        }
        else
        {
            --nRetries;
            if (nRetries == 0)
            {
                std::stringstream ss;
                ss << "Sent message but got no response";
                printf("SERVER: retry timeout\n");
            }
        }
    } // end while
}

/**
* CLIENT: Connect to tcp://127.0.0.5:9995, send message, wait for reply, repeat
*/
void startClient(zmq::context_t * pContext)
{
    while (1)
    {
        auto pZmqSocket = new zmq::socket_t(*pContext, ZMQ_REQ);
        auto pZmqMsgOut = new zmq::message_t(MSG_SIZE);
        pZmqSocket->connect("tcp://127.0.0.5:9995");
        int linger = 1000;
        pZmqSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(int));
        zmq::pollitem_t items[] = { { *pZmqSocket, 0, ZMQ_POLLIN, 0 } };
        char buffer[MSG_SIZE] = { "s" };
        memcpy(pZmqMsgOut->data(), buffer, MSG_SIZE);
        try
        {
            if (pZmqSocket->send(*pZmqMsgOut))
                printf("CLIENT: message sent\n");
        }
        catch (std::exception& e)
        {
            std::stringstream ss;
            ss << "send fail : " << e.what();
            printf("CLIENT: %s\n", ss.str().c_str());
        }
        
        int nRetries = 5;
        bool bReceived = false;
        while (1)
        {
            zmq::poll(&items[0], 1, 3 * 1000);
            bReceived = false;

            if (items[0].revents & ZMQ_POLLIN)
            {
                auto pZmqMsgIn = new zmq::message_t();
                try
                {
                    bReceived = pZmqSocket->recv(pZmqMsgIn);
                }
                catch (std::exception& e)
                {
                    std::stringstream streamExceptionMsg;
                    streamExceptionMsg << "Exception caught during reply message socket.recv()" << " : " << e.what();
                    printf("CLIENT: %s\n", streamExceptionMsg.str().c_str());
                }
                if (bReceived)
                {
                    printf("CLIENT: reply received\n");
                }
                delete pZmqMsgIn;
                break;
            }
            else
            {
                --nRetries;
                if (nRetries == 0)
                {
                    std::stringstream ss;
                    ss << "Sent message but got no response";
                    printf("CLIENT: retry timeout\n");
                    break;
                }
            }
        } // end while
        
        pZmqSocket->disconnect("tcp://127.0.0.5:9995");
        pZmqSocket->close();
        delete pZmqMsgOut;
        delete pZmqSocket;
    }
}

int main()
{
    zmq::context_t * pContext = new zmq::context_t(2);
    std::thread t1 = std::thread(startServer, pContext);
    std::thread t2 = std::thread(startClient, pContext);

    if (SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE))
    {
        while (1)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
    }

    return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to