Just in case someone finds time to look into my issue, I am attaching the publisher code. Perhaps there is something I am doing wrong or options i need to set to achieve a fairly-queued sending of events.

On 01/24/2012 06:27 PM, Joegen Baclor wrote:
Hi,

We have an application the uses PUP/SUB sockets. One requirement that ZeroMQ gracefully meets is being able to reliably send events to all subscribers. However, there is a requirement in our application that relies on which client gets the event first. We were under the assumption that ZeroMQ randomizes the sending of events to the mutiple subscribers so that no one subscriber always gets a first hit. Unfortunately, this seems to be not the case. ZeroMq seems to follow a specific fix order. We have created 3 clients each incrementing a counter if it was the first one to get the publication. Take the output below (each iteration is 1500 events):

Iteration 0
Client 1 processed 1496 events.
Client 2 processed 0 events.
Client 3 processed 4 events.

Iteration 1
Client 1 processed 2309 events.
Client 2 processed 647 events.
Client 3 processed 44 events.

Iteration 2
Client 1 processed 2992 events.
Client 2 processed 1385 events.
Client 3 processed 123 events.

Iteration 3
Client 1 processed 3623 events.
Client 2 processed 1951 events.
Client 3 processed 426 events.

Iteration 4
Client 1 processed 4207 events.
Client 2 processed 2697 events.
Client 3 processed 723 events.
TestDriver::TestMultiplePop ...    Ok

Clearly, the order is predictable. client 1 gets the highest hit and client 3 gets the lowest hit. Is there a way so that we can spread this evenly across the subscribers?

Joegen




#include "StateQueuePublisher.h"
#include "zmq.hpp"
#include "OsLogger.h"

//  Convert string to 0MQ string and send to socket
static bool
s_send (zmq::socket_t & socket, const std::string & string) {

    zmq::message_t message(string.size());
    memcpy(message.data(), string.data(), string.size());

    bool rc = socket.send(message);
    return (rc);
}

//  Sends string as 0MQ string, as multipart non-terminal
static bool
s_sendmore (zmq::socket_t & socket, const std::string & string) {

    zmq::message_t message(string.size());
    memcpy(message.data(), string.data(), string.size());

    bool rc = socket.send(message, ZMQ_SNDMORE);
    return (rc);
}

StateQueuePublisher::StateQueuePublisher() :
  _queue(50),
  _pThread(0),
  _terminate(false)
{
  OS_LOG_INFO(FAC_NET, "StateQueuePublisher CREATED.");
}

StateQueuePublisher::~StateQueuePublisher()
{
  stop();
  OS_LOG_INFO(FAC_NET, "StateQueuePublisher DESTROYED.");
}

void StateQueuePublisher::stop()
{
  _terminate = true;
  if (_pThread && _pThread->joinable())
  {
    StateQueueRecord record;
    record.id = "~StateQueuePublisher";
    _queue.enqueue(record);
    _pThread->join();
    delete _pThread;
    _pThread = 0;
  }
}

bool StateQueuePublisher::run()
{
  if (_pThread || _zmqBindAddress.empty())
    return false;
  _pThread = new boost::thread(boost::bind(&StateQueuePublisher::internal_run, this));
  return true;
}

void StateQueuePublisher::publish(const StateQueueRecord& record)
{
  _queue.enqueue(record);
}

void StateQueuePublisher::internal_run()
{
  zmq::context_t context(1);
  zmq::socket_t socket(context, ZMQ_PUB);

  try
  {
    socket.bind(_zmqBindAddress.c_str());
  }
  catch(zmq::error_t& error_)
  {
    return;
  }

  OS_LOG_INFO(FAC_NET, "StateQueuePublisher::internal_run() "
          << "Started accepting subscriptions at " << _zmqBindAddress);
  
  while(!_terminate)
  {
    StateQueueRecord record;
    if (_queue.dequeue(record))
    {
      //
      // exit
      //
      if (_terminate)
        break;

      //
      // publish
      //
      std::string eventId = record.id;

      try
      {
        s_sendmore(socket, eventId);

        std::string exclude;
        for (std::vector<std::string>::const_iterator iter = record.exclude.begin();
                iter != record.exclude.end(); iter++)
        {
          exclude += *iter;
          exclude += " ";
        }

        if (exclude.empty())
          exclude = "initial_data";

        OS_LOG_DEBUG(FAC_NET, "StateQueuePublisher::publish "
                << " message-id: " << eventId
                << " exclude-app-id: " << exclude);

        s_sendmore(socket, _zmqBindAddress);
        s_send(socket, exclude);
      }
      catch(zmq::error_t& error_)
      {
      }
    }
  }
  OS_LOG_INFO(FAC_NET, "StateQueuePublisher::internal_run() TERMINATED.");
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to