Could this be a Nagle problem? We always set "tcp_nodelay: true" in our configs... We also reuse sessions throughout the application lifetime.
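
For reference, here is a minimal sketch of what I understand a "tcp_nodelay" setting to do underneath, assuming a plain POSIX TCP socket. The helper names disableNagle() and nagleDisabled() are made up for illustration and are not part of the Qpid API; with the Messaging API you would normally just pass the connection option rather than touch the socket yourself.

```cpp
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper (not a Qpid function): switch Nagle's algorithm off
// on an existing TCP socket by setting TCP_NODELAY. Returns true on success.
bool disableNagle(int fd)
{
    int on = 1;
    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == 0;
}

// Hypothetical helper: report whether TCP_NODELAY is currently set on fd.
// Returns false if the option is off or cannot be read.
bool nagleDisabled(int fd)
{
    int value = 0;
    socklen_t len = sizeof(value);
    if (getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &len) != 0) {
        return false;
    }
    return value != 0;
}
```

Nagle on its own usually costs tens to a few hundred milliseconds per small write, so it may not explain a 7 second stall by itself, but it is cheap to rule out.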

On Wed, Dec 9, 2020 at 8:36 AM Toralf Lund <toralf.l...@pgs.com> wrote:
> I'm still debugging the communication issues I've been struggling with
> for months - where there are unexpected delays in (I think) message
> passing between a C++ Messaging API application and a C++ broker. I've
> now narrowed down the problem to the send() call, I believe. I'm using
> the following function(s) to publish data - notice the timing etc.:
>
> namespace Util {
>     namespace {
>         inline double clockTime(clockid_t clockId)
>         {
>             {
>                 struct timespec t;
>                 if(clock_gettime(clockId, &t)==0) {
>                     return t.tv_sec+0.000000001*t.tv_nsec;
>                 }
>                 return -INFINITY;
>             }
>         }
>     }
>
>     double currentMonotonicTime()
>     {
>         return clockTime(CLOCK_MONOTONIC);
>     }
>
>     double currentProcessTime()
>     {
>         return clockTime(CLOCK_PROCESS_CPUTIME_ID);
>     }
> } // End namespace Util
>
> #define PUBLISH_SENDER_CAPACITY 100
>
> std::string QPid::publishArgs()
> {
>     std::string exchangeArgs="create: sender, delete: never";
>
>     exchangeArgs+=", node: { type: topic, durable: False }";
>
>     return exchangeArgs;
> }
>
> bool QPid::publish(qpid::messaging::Session &session,
>                    const qpid::messaging::Message &message)
> {
>     double t0=Util::currentMonotonicTime(), t1=t0;
>     double t0cpu=Util::currentThreadTime(), t1cpu=t0cpu;
>
>     std::string exchangeName=publishExchange();
>     try {
>         session.checkError();
>         qpid::messaging::Sender sender;
>
>         try {
>             sender=session.getSender(exchangeName);
>         } catch(qpid::messaging::KeyError &error) {
>             std::string exchangeArgs=publishArgs();
>             std::string address=exchangeName + "; { " + exchangeArgs + " }";
>
>             sender=session.createSender(address);
> #ifdef PUBLISH_SENDER_CAPACITY
>             sender.setCapacity(PUBLISH_SENDER_CAPACITY);
> #endif
>         }
>
> #ifdef DEBUG
>         std::cerr << "Subject " << message.getSubject() << "\n";
> #endif /* DEBUG */
>         t1=Util::currentMonotonicTime();
>         t1cpu=Util::currentThreadTime();
>
>         sender.send(message);
>     } catch(const std::exception &error) {
>         const char *errorMessage=error.what();
>
>         if(errorMessage && *errorMessage) {
>             Util::error("\"%s\" when sending message to %s/%s", errorMessage,
>                         exchangeName.c_str(), message.getSubject().c_str());
>         } else {
>             /* Note: Exceptions from Session::checkError() often have no error
>                message */
>             Util::error("Unspecified failure or pre-existing link problem when "
>                         "sending to %s/%s", exchangeName.c_str(),
>                         message.getSubject().c_str());
>         }
>
>         return false;
>     }
>     double t2=Util::currentMonotonicTime(), t2cpu=Util::currentThreadTime();
>
>     Util::debug(2, "%s sent to %s in %.2f ms/CPU time %.2f ms "
>                 "(%.2f/%.2f ms in send())",
>                 message.getSubject().c_str(), exchangeName.c_str(),
>                 (t2-t0)*1000,
>                 (t2cpu-t0cpu)*1000, (t2-t1)*1000, (t2cpu-t1cpu)*1000);
>
>     return true;
> }
>
> Util::debug() is a simple function that optionally (subject to a "debug
> level" config) sends data to the console; it takes printf()-style input.
>
> Now, most of the time, output from the above with QPid "debug+" logging
> is something like (some names are changed due to paranoia):
>
> 2020-12-09 12:30:00 [Client] debug Sending to exchange Data.exch
> {MessageProperties: content-length=3; content-type=application/x-protobuf;
> user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64;
> application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516990),timestamp_usec:F8:int64(16253),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
> 1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000;
> routing-key=cxxx.out.extdata.7; }
> Dec 9 12:30:00 cxxx.out.extdata.7 sent to Data.exch in 0.03 ms/CPU time
> 0.03 ms (0.03/0.03 ms in send())
>
> but I sometimes get
>
> 2020-12-09 12:30:07 [Client] debug Sending to exchange Data.exch
> {MessageProperties: content-length=3; content-type=application/x-protobuf;
> user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64;
> application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516991),timestamp_usec:F8:int64(8403),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
> 1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000;
> routing-key=cxxx.out.extdata.7; }
> Dec 9 12:30:07 cxxx.out.extdata.7 sent to Data.exch in 7195.39 ms/CPU
> time 2.45 ms (7195.39/2.45 ms in send())
>
> My question is simply, what might cause the send to take several
> seconds? I can't see any reason why the communication itself should be
> that slow; the software runs on a machine that's connected to the same
> network switch as the broker. The systems do have a certain load, but I
> didn't think it would be able to cause anything like this. And shouldn't
> threading etc. make sure something gets done within a reasonable time in
> any case? What am I missing?
>
> Note that the Session object is reused - ideally (if there are no
> errors) it's kept throughout the life-time of the application. The same
> session is also used to receive data from a number of queues. Is that a
> bad idea? Send and receive are done in the same thread.
>
> There is nothing in the broker log to suggest that a problem is
> detected on that end.
>
> I've experimented with "trace+" logging, but it seemed to produce so
> much information that it bogged down the system.
>
> So I'm crying for help again... Anyone?
>
> Thanks.
>
> - Toralf
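
One more observation on the numbers quoted above: 7195.39 ms of wall time against 2.45 ms of CPU time suggests the calling thread spent almost all of send() blocked, waiting on something, rather than computing. The sketch below reproduces that signature with a plain sleep. It assumes Util::currentThreadTime(), which is not shown in the quoted code, reads CLOCK_THREAD_CPUTIME_ID; the Sketch namespace, clockSeconds(), Elapsed and timeBlockingCall() are names I made up for illustration.

```cpp
#include <time.h>

namespace Sketch {

// Read a POSIX clock as seconds; returns -1.0 if the clock cannot be read.
inline double clockSeconds(clockid_t id)
{
    struct timespec t;
    if (clock_gettime(id, &t) != 0) {
        return -1.0;
    }
    return t.tv_sec + 1e-9 * t.tv_nsec;
}

struct Elapsed {
    double wallMs; // elapsed monotonic (wall) time in milliseconds
    double cpuMs;  // CPU time consumed by the calling thread in milliseconds
};

// Block the calling thread for roughly sleepMs milliseconds and measure
// both clocks around the blocking call, the way publish() brackets send().
inline Elapsed timeBlockingCall(long sleepMs)
{
    double wall0 = clockSeconds(CLOCK_MONOTONIC);
    double cpu0 = clockSeconds(CLOCK_THREAD_CPUTIME_ID);

    struct timespec req;
    req.tv_sec = sleepMs / 1000;
    req.tv_nsec = (sleepMs % 1000) * 1000000L;
    nanosleep(&req, nullptr); // stands in for a send() that blocks

    double wall1 = clockSeconds(CLOCK_MONOTONIC);
    double cpu1 = clockSeconds(CLOCK_THREAD_CPUTIME_ID);

    Elapsed result = { (wall1 - wall0) * 1000.0, (cpu1 - cpu0) * 1000.0 };
    return result;
}

} // End namespace Sketch
```

Calling Sketch::timeBlockingCall(50) should report a wall time of about 50 ms and a CPU time close to zero, which is the same shape as the slow send() in the log. If that reading is right, the thing to look for is what send() is waiting on (sender capacity being exhausted, or a lock shared with the receive path, would be my first guesses) rather than where CPU is being spent.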