On 09/12/2020 17:31, Chester wrote:
Is it possibly a nagle problem? We always set "tcp_nodelay: true" in our
configs...

That's also done in this application. Or at least it's supposed to be. Perhaps worth checking if the option actually gets enabled the right way...
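
A rough sketch of such a check, assuming the file descriptor of the broker connection's socket can be reached somehow (the Messaging API may well not expose it). The helper name tcpNoDelayEnabled() is made up for illustration; it only uses the standard getsockopt() call:

```cpp
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: reports whether Nagle's algorithm is disabled on fd.
// Returns 1 if TCP_NODELAY is set, 0 if it is not, -1 if the query fails
// (for example because fd is not a valid TCP socket).
int tcpNoDelayEnabled(int fd)
{
    int flag = 0;
    socklen_t len = sizeof(flag);

    if (getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0) {
        return -1;
    }
    return flag != 0 ? 1 : 0;
}
```

Without code changes, running the client under strace and looking for a setsockopt() call with TCP_NODELAY on the connection socket should show the same thing.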

  We also re-use sessions throughout the application lifetime

OK. Good.

- Toralf


On Wed, Dec 9, 2020 at 8:36 AM Toralf Lund <toralf.l...@pgs.com> wrote:

I'm still debugging the communication issues I've been struggling with
for months - where there are unexpected delays in (I think) message
passing between a C++ Messaging API application and a C++ broker. I've
now narrowed down the problem to the send() call, I believe. I'm using
the following function(s) to publish data - notice the timing etc.:

namespace Util {
namespace {
inline double clockTime(clockid_t clockId)
{
    struct timespec t;
    if(clock_gettime(clockId, &t)==0) {
      return t.tv_sec+0.000000001*t.tv_nsec;
    }
    return -INFINITY;
}
}

double currentMonotonicTime()
{
    return clockTime(CLOCK_MONOTONIC);
}

double currentProcessTime()
{
    return clockTime(CLOCK_PROCESS_CPUTIME_ID);
}
} // End namespace Util

#define PUBLISH_SENDER_CAPACITY 100

std::string QPid::publishArgs()
{
    std::string exchangeArgs="create: sender, delete: never";

    exchangeArgs+=", node: { type: topic, durable: False }";

    return exchangeArgs;
}

bool QPid::publish(qpid::messaging::Session &session,
                    const qpid::messaging::Message &message)
{
    double t0=Util::currentMonotonicTime(), t1=t0;
    double t0cpu=Util::currentThreadTime(), t1cpu=t0cpu;

    std::string exchangeName=publishExchange();
    try {
      session.checkError();
      qpid::messaging::Sender sender;

      try {
        sender=session.getSender(exchangeName);
      } catch(qpid::messaging::KeyError &error) {
        std::string exchangeArgs=publishArgs();
        std::string address=exchangeName + "; { " + exchangeArgs + " }";

        sender=session.createSender(address);
#ifdef PUBLISH_SENDER_CAPACITY
        sender.setCapacity(PUBLISH_SENDER_CAPACITY);
#endif
      }

#ifdef DEBUG
      std::cerr << "Subject " << message.getSubject() << "\n";
#endif /* DEBUG */
      t1=Util::currentMonotonicTime();
      t1cpu=Util::currentThreadTime();

      sender.send(message);
    } catch(const std::exception &error) {
      const char *errorMessage=error.what();

      if(errorMessage && *errorMessage) {
        Util::error("\"%s\" when sending message to %s/%s", errorMessage,
                    exchangeName.c_str(), message.getSubject().c_str());
      } else {
        /* Note: Exceptions from Session::checkError() often have no error
                 message */
        Util::error("Unspecified failure or pre-existing link problem when "
                    "sending to %s/%s", exchangeName.c_str(),
                    message.getSubject().c_str());
      }

      return false;
    }
    double t2=Util::currentMonotonicTime(), t2cpu=Util::currentThreadTime();

    Util::debug(2, "%s sent to %s in %.2f ms/CPU time %.2f ms (%.2f/%.2f ms "
                "in send())",
                message.getSubject().c_str(), exchangeName.c_str(),
                (t2-t0)*1000, (t2cpu-t0cpu)*1000, (t2-t1)*1000,
                (t2cpu-t1cpu)*1000);

    return true;
}

Util::debug() is a simple function that optionally (subject to a "debug
level" config setting) sends data to the console; it takes printf()-style
input.
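
publish() calls Util::currentThreadTime(), which is not included in the excerpt above. A minimal sketch of what it presumably looks like, assuming it mirrors the other two helpers but reads CLOCK_THREAD_CPUTIME_ID (the namespace name UtilSketch is made up here to avoid claiming this is the real code):

```cpp
#include <cmath>
#include <time.h>

namespace UtilSketch {
// Reads the given clock and returns its value in seconds,
// or -INFINITY if the clock cannot be read.
inline double clockTime(clockid_t clockId)
{
    struct timespec t;
    if (clock_gettime(clockId, &t) == 0) {
        return t.tv_sec + 1e-9 * t.tv_nsec;
    }
    return -INFINITY;
}

// CPU time consumed by the calling thread only (assumed definition).
double currentThreadTime()
{
    return clockTime(CLOCK_THREAD_CPUTIME_ID);
}
} // End namespace UtilSketch
```

With a per-thread CPU clock, time spent blocked (waiting on a socket, a lock or the scheduler) adds to the monotonic figure but hardly at all to the CPU figure, so a large gap between the two points at waiting rather than computing.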

Now, most of the time, output from the above with QPid "debug+" logging
is something like (some names are changed due to paranoia):

2020-12-09 12:30:00 [Client] debug Sending to exchange Data.exch
{MessageProperties: content-length=3; content-type=application/x-protobuf;
user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64;
application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516990),timestamp_usec:F8:int64(16253),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000;
routing-key=cxxx.out.extdata.7; }
Dec  9 12:30:00 cxxx.out.extdata.7 sent to Data.exch in 0.03 ms/CPU time
0.03 ms (0.03/0.03 ms in send())

but I sometimes get

2020-12-09 12:30:07 [Client] debug Sending to exchange Data.exch
{MessageProperties: content-length=3; content-type=application/x-protobuf;
user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64;
application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516991),timestamp_usec:F8:int64(8403),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000;
routing-key=cxxx.out.extdata.7; }
Dec  9 12:30:07 cxxx.out.extdata.7 sent to Data.exch in 7195.39 ms/CPU
time 2.45 ms (7195.39/2.45 ms in send())

My question is simply, what might cause the send to take several
seconds? I can't see any reason why the communication itself should be
that slow; the software runs on a machine that's connected to the same
network switch as the broker. The systems do have a certain load, but I
didn't think it would be able to cause anything like this. And shouldn't
threading etc. make sure something gets done within a reasonable time in
any case? What am I missing?

Note that the Session object is reused - ideally (if there are no
errors) it's kept throughout the life-time of the application. The same
session is also used to receive data from a number of queues. Is that a
bad idea? Send and receive are done in the same thread.

There is nothing in the broker log to suggest that a problem is
detected on that end.

I've experimented with "trace+" logging, but it seemed to produce so
much information that it bogged down the system.

So I'm crying for help again... Anyone?

Thanks.

- Toralf




