I'm still debugging the communication issues I've been struggling with for months - where there are unexpected delays in (I think) message passing between a C++ Messaging API application and a C++ broker. I've now narrowed down the problem to the send() call, I believe. I'm using the following function(s) to publish data - notice the timing etc.:

namespace Util {
namespace {
inline double clockTime(clockid_t clockId)
{
  {
    struct timespec t;
    if(clock_gettime(clockId, &t)==0) {
      return t.tv_sec+0.000000001*t.tv_nsec;
    }
    return -INFINITY;
  }
}
}

double currentMonotonicTime()
{
  return clockTime(CLOCK_MONOTONIC);
}

double currentProcessTime()
{
  return clockTime(CLOCK_PROCESS_CPUTIME_ID);
}
} // End namespace Util

#define PUBLISH_SENDER_CAPACITY 100

std::string QPid::publishArgs()
{
  std::string exchangeArgs="create: sender, delete: never";

  exchangeArgs+=", node: { type: topic, durable: False }";

  return exchangeArgs;
}

bool QPid::publish(qpid::messaging::Session &session,
                   const qpid::messaging::Message &message)
{
  double t0=Util::currentMonotonicTime(), t1=t0;
  double t0cpu=Util::currentThreadTime(), t1cpu=t0cpu;
std::string exchangeName=publishExchange();
  try {
    session.checkError();
    qpid::messaging::Sender sender;

    try {
      sender=session.getSender(exchangeName);
    } catch(qpid::messaging::KeyError &error) {
      std::string exchangeArgs=publishArgs();
      std::string address=exchangeName + "; { " + exchangeArgs + " }";
sender=session.createSender(address);
#ifdef PUBLISH_SENDER_CAPACITY
      sender.setCapacity(PUBLISH_SENDER_CAPACITY);
#endif
    }
#ifdef DEBUG
    std::cerr << "Subject " << message.getSubject() << "\n";
#endif /* DEBUG */
    t1=Util::currentMonotonicTime();
    t1cpu=Util::currentThreadTime();
sender.send(message);
  } catch(const std::exception &error) {
    const char *errorMessage=error.what();
if(errorMessage && *errorMessage) {
      Util::error("\"%s\" when sending message to %s/%s", errorMessage,
                  exchangeName.c_str(), message.getSubject().c_str());
    } else {
      /* Note: Exceptions from Session::checkError() often have no error
               message */
      Util::error("Unspecified failure or pre-existing link problem when "
                  "sending to %s/%s", exchangeName.c_str(),
                  message.getSubject().c_str());
    }
return false;
  }
  double t2=Util::currentMonotonicTime(), t2cpu=Util::currentThreadTime();
Util::debug(2, "%s sent to %s in %.2f ms/CPU time %.2f ms (%.2f/%.2f ms in send())",
              message.getSubject().c_str(), exchangeName.c_str(), (t2-t0)*1000,
              (t2cpu-t0cpu)*1000, (t2-t1)*1000, (t2cpu-t1cpu)*1000);
return true;
}

Util::debug() is a simple function that optionally (subject to a "debug level") config sends data to the console; it takes printf()-style input.

Now, most of the time, output from the above with QPid "debug+" logging is something like (some names are changed due to paranoia):

2020-12-09 12:30:00 [Client] debug Sending to exchange Data.exch 
{MessageProperties: content-length=3; content-type=application/x-protobuf; 
user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64; 
application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516990),timestamp_usec:F8:int64(16253),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
 1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000; 
routing-key=cxxx.out.extdata.7; }
Dec  9 12:30:00 cxxx.out.extdata.7 sent to Data.exch in 0.03 ms/CPU time 0.03 
ms (0.03/0.03 ms in send())

but I sometimes get

2020-12-09 12:30:07 [Client] debug Sending to exchange Data.exch 
{MessageProperties: content-length=3; content-type=application/x-protobuf; 
user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64; 
application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516991),timestamp_usec:F8:int64(8403),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
 1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000; 
routing-key=cxxx.out.extdata.7; }
Dec  9 12:30:07 cxxx.out.extdata.7 sent to Data.exch in 7195.39 ms/CPU time 
2.45 ms (7195.39/2.45 ms in send())

My question is simply, what might cause the send to take several seconds? I can't see any reason why the communication itself should be that slow; the software runs on a machine that's connected to the same network switch as the broker. The systems do have a certain load, but I didn't think it would be able to cause anything like this. And shouldn't threading etc. make sure something gets done within a reasonable time in any case? What am I missing?

Note that the Session object is reused - ideally (if there are no errors) it's kept throughout the life-time of the application. The same session is also used to receive data from a number of queues. Is that a bad idea? Send and receive are done in the same thread.

There is nothing in the broker log to suggest that that a problem is detected on that end.

I've experimented with "trace+" logging, but it seemed to produce so much information that it bogged down the system.

So I'm crying for help again... Anyone?

Thanks.

- Toralf

Reply via email to