On 09/12/2020 17:31, Chester wrote:
Is it possibly a nagle problem? We always set "tcp_nodelay: true" in our
configs...
That's also done in this application. Or at least it's supposed to be.
Perhaps worth checking if the option actually gets enabled the right way...
We also re-use sessions throughout the application lifetime
OK. Good.
- Toralf
.
On Wed, Dec 9, 2020 at 8:36 AM Toralf Lund <toralf.l...@pgs.com> wrote:
I'm still debugging the communication issues I've been struggling with
for months - where there are unexpected delays in (I think) message
passing between a C++ Messaging API application and a C++ broker. I've
now narrowed down the problem to the send() call, I believe. I'm using
the following function(s) to publish data - notice the timing etc.:
namespace Util {
namespace {
inline double clockTime(clockid_t clockId)
{
{
struct timespec t;
if(clock_gettime(clockId, &t)==0) {
return t.tv_sec+0.000000001*t.tv_nsec;
}
return -INFINITY;
}
}
}
double currentMonotonicTime()
{
return clockTime(CLOCK_MONOTONIC);
}
double currentProcessTime()
{
return clockTime(CLOCK_PROCESS_CPUTIME_ID);
}
} // End namespace Util
#define PUBLISH_SENDER_CAPACITY 100
std::string QPid::publishArgs()
{
std::string exchangeArgs="create: sender, delete: never";
exchangeArgs+=", node: { type: topic, durable: False }";
return exchangeArgs;
}
bool QPid::publish(qpid::messaging::Session &session,
const qpid::messaging::Message &message)
{
double t0=Util::currentMonotonicTime(), t1=t0;
double t0cpu=Util::currentThreadTime(), t1cpu=t0cpu;
std::string exchangeName=publishExchange();
try {
session.checkError();
qpid::messaging::Sender sender;
try {
sender=session.getSender(exchangeName);
} catch(qpid::messaging::KeyError &error) {
std::string exchangeArgs=publishArgs();
std::string address=exchangeName + "; { " + exchangeArgs + " }";
sender=session.createSender(address);
#ifdef PUBLISH_SENDER_CAPACITY
sender.setCapacity(PUBLISH_SENDER_CAPACITY);
#endif
}
#ifdef DEBUG
std::cerr << "Subject " << message.getSubject() << "\n";
#endif /* DEBUG */
t1=Util::currentMonotonicTime();
t1cpu=Util::currentThreadTime();
sender.send(message);
} catch(const std::exception &error) {
const char *errorMessage=error.what();
if(errorMessage && *errorMessage) {
Util::error("\"%s\" when sending message to %s/%s", errorMessage,
exchangeName.c_str(), message.getSubject().c_str());
} else {
/* Note: Exceptions from Session::checkError() often have no error
message */
Util::error("Unspecified failure or pre-existing link problem when "
"sending to %s/%s", exchangeName.c_str(),
message.getSubject().c_str());
}
return false;
}
double t2=Util::currentMonotonicTime(), t2cpu=Util::currentThreadTime();
Util::debug(2, "%s sent to %s in %.2f ms/CPU time %.2f ms (%.2f/%.2f ms
in send())",
message.getSubject().c_str(), exchangeName.c_str(),
(t2-t0)*1000,
(t2cpu-t0cpu)*1000, (t2-t1)*1000, (t2cpu-t1cpu)*1000);
return true;
}
Util::debug() is a simple function that optionally (subject to a "debug
level") config sends data to the console; it takes printf()-style input.
Now, most of the time, output from the above with QPid "debug+" logging
is something like (some names are changed due to paranoia):
2020-12-09 12:30:00 [Client] debug Sending to exchange Data.exch
{MessageProperties: content-length=3; content-type=application/x-protobuf;
user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64;
application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516990),timestamp_usec:F8:int64(16253),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000;
routing-key=cxxx.out.extdata.7; }
Dec 9 12:30:00 cxxx.out.extdata.7 sent to Data.exch in 0.03 ms/CPU time
0.03 ms (0.03/0.03 ms in send())
but I sometimes get
2020-12-09 12:30:07 [Client] debug Sending to exchange Data.exch
{MessageProperties: content-length=3; content-type=application/x-protobuf;
user-id=; app-id=TheProgram 1.0-0.20201207.1.6e9dd21.el7-x86_64;
application-headers={message_type:V2:27:vbin16(Cxxx.Out.ExtData),qpid.subject:V2:29:str16(cxxx.out.extdata.7),timestamp:F8:int64(1607516991),timestamp_usec:F8:int64(8403),x-amqp-0-10.app-id:V2:47:vbin16(TheProgram
1.0-0.20201207.1.6e9dd21.el7-x86_64)}; } {DeliveryProperties: ttl=10000;
routing-key=cxxx.out.extdata.7; }
Dec 9 12:30:07 cxxx.out.extdata.7 sent to Data.exch in 7195.39 ms/CPU
time 2.45 ms (7195.39/2.45 ms in send())
My question is simply, what might cause the send to take several
seconds? I can't see any reason why the communication itself should be
that slow; the software runs on a machine that's connected to the same
network switch as the broker. The systems do have a certain load, but I
didn't think it would be able to cause anything like this. And shouldn't
threading etc. make sure something gets done within a reasonable time in
any case? What am I missing?
Note that the Session object is reused - ideally (if there are no
errors) it's kept throughout the life-time of the application. The same
session is also used to receive data from a number of queues. Is that a
bad idea? Send and receive are done in the same thread.
There is nothing in the broker log to suggest that that a problem is
detected on that end.
I've experimented with "trace+" logging, but it seemed to produce so
much information that it bogged down the system.
So I'm crying for help again... Anyone?
Thanks.
- Toralf
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org