Is it possibly a nagle problem? We always set "tcp_nodelay: true" in our
configs... We also re-use sessions throughout the application lifetime.

On Wed, Dec 9, 2020 at 8:36 AM Toralf Lund <toralf.l...@pgs.com> wrote:

> I'm still debugging the communication issues I've been struggling with
> for months - where there are unexpected delays in (I think) message
> passing between a C++ Messaging API application and a C++ broker. I've
> now narrowed down the problem to the send() call, I believe. I'm using
> the following function(s) to publish data - notice the timing etc.:
>
> namespace Util {
> namespace {
> inline double clockTime(clockid_t clockId)
> {
>    {
>      struct timespec t;
>      if(clock_gettime(clockId, &t)==0) {
>        return t.tv_sec+0.000000001*t.tv_nsec;
>      }
>      return -INFINITY;
>    }
> }
> }
>
> double currentMonotonicTime()
> {
>    return clockTime(CLOCK_MONOTONIC);
> }
>
> double currentProcessTime()
> {
>    return clockTime(CLOCK_PROCESS_CPUTIME_ID);
> }
> } // End namespace Util
>
> #define PUBLISH_SENDER_CAPACITY 100
>
> std::string QPid::publishArgs()
> {
>    std::string exchangeArgs="create: sender, delete: never";
>
>    exchangeArgs+=", node: { type: topic, durable: False }";
>
>    return exchangeArgs;
> }
>
> bool QPid::publish(qpid::messaging::Session &session,
>                    const qpid::messaging::Message &message)
> {
>    double t0=Util::currentMonotonicTime(), t1=t0;
>    double t0cpu=Util::currentThreadTime(), t1cpu=t0cpu;
>
>    std::string exchangeName=publishExchange();
>    try {
>      session.checkError();
>      qpid::messaging::Sender sender;
>
>      try {
>        sender=session.getSender(exchangeName);
>      } catch(qpid::messaging::KeyError &error) {
>        std::string exchangeArgs=publishArgs();
>        std::string address=exchangeName + "; { " + exchangeArgs + " }";
>
>        sender=session.createSender(address);
> #ifdef PUBLISH_SENDER_CAPACITY
>        sender.setCapacity(PUBLISH_SENDER_CAPACITY);
> #endif
>      }
>
> #ifdef DEBUG
>      std::cerr << "Subject " << message.getSubject() << "\n";
> #endif /* DEBUG */
>      t1=Util::currentMonotonicTime();
>      t1cpu=Util::currentThreadTime();
>
>      sender.send(message);
>    } catch(const std::exception &error) {
>      const char *errorMessage=error.what();
>
>      if(errorMessage && *errorMessage) {
>        Util::error("\"%s\" when sending message to %s/%s", errorMessage,
>                    exchangeName.c_str(), message.getSubject().c_str());
>      } else {
>        /* Note: Exceptions from Session::checkError() often have no error
>                 message */
>        Util::error("Unspecified failure or pre-existing link problem when "
>                    "sending to %s/%s", exchangeName.c_str(),
>                    message.getSubject().c_str());
>      }
>
>      return false;
>    }
>    double t2=Util::currentMonotonicTime(), t2cpu=Util::currentThreadTime();
>
>    Util::debug(2, "%s sent to %s in %.2f ms/CPU time %.2f ms (%.2f/%.2f ms
> in send())",
>               message.getSubject().c_str(), exchangeName.c_str(),
> (t2-t0)*1000,
>               (t2cpu-t0cpu)*1000, (t2-t1)*1000, (t2cpu-t1cpu)*1000);
>
>    return true;
> }
>
> Util::debug() is a simple function that optionally (subject to a "debug
> level") config sends data to the console; it takes printf()-style input.
>
> Now, most of the time, output from the above with QPid "debug+" logging
> is something like (some names are changed due to paranoia):
>
> 2020-12-09 12:30:00 [Client] debug Sending to exchange Data.exch
> {MessageProperties: content-length=3; content-type=application/x-protobuf;
> user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64;
> application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516990),timestamp_usec:F8:int64(16253),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
> 1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000;
> routing-key=cxxx.out.extdata.7; }
> Dec  9 12:30:00 cxxx.out.extdata.7 sent to Data.exch in 0.03 ms/CPU time
> 0.03 ms (0.03/0.03 ms in send())
>
> but I sometimes get
>
> 2020-12-09 12:30:07 [Client] debug Sending to exchange Data.exch
> {MessageProperties: content-length=3; content-type=application/x-protobuf;
> user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64;
> application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516991),timestamp_usec:F8:int64(8403),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
> 1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000;
> routing-key=cxxx.out.extdata.7; }
> Dec  9 12:30:07 cxxx.out.extdata.7 sent to Data.exch in 7195.39 ms/CPU
> time 2.45 ms (7195.39/2.45 ms in send())
>
> My question is simply, what might cause the send to take several
> seconds? I can't see any reason why the communication itself should be
> that slow; the software runs on a machine that's connected to the same
> network switch as the broker. The systems do have a certain load, but I
> didn't think it would be able to cause anything like this. And shouldn't
> threading etc. make sure something gets done within a reasonable time in
> any case? What am I missing?
>
> Note that the Session object is reused - ideally (if there are no
> errors) it's kept throughout the life-time of the application. The same
> session is also used to receive data from a number of queues. Is that a
> bad idea? Send and receive are done in the same thread.
>
> There is nothing in the broker log to suggest that that a problem is
> detected on that end.
>
> I've experimented with "trace+" logging, but it seemed to produce so
> much information that it bogged down the system.
>
> So I'm crying for help again... Anyone?
>
> Thanks.
>
> - Toralf
>
>

Reply via email to