Re: Fwd: put vs. send

2013-03-09 Thread Bozo Dragojevic

On 3/8/13 3:19 PM, Rafael Schloming wrote:
On Thu, Mar 7, 2013 at 5:15 AM, Bozo Dragojevic wrote:


On 3/6/13 3:45 AM, Rafael Schloming wrote:

Oops, meant to send this to the list.

-- Forwarded message --
From: Rafael Schloming <r...@alum.mit.edu>
Date: Tue, Mar 5, 2013 at 6:44 PM
Subject: Re: put vs. send
To: Bozo Dragojevic <bo...@digiverse.si>


[snip]


Thank you for taking the time to explain, that's very helpful. It
makes me wonder if we shouldn't include some kind of
pn_messenger_interrupt() that would let you pull out of blocking calls early.


That'd be great!

A few weeks ago I submitted
https://issues.apache.org/jira/browse/PROTON-231
for this feature, and the JIRA has a link to my attempt at an implementation
(based off the 0.3 branch) -- this is what we use now.

I've named it pn_messenger_wakeup() after pn_driver_wakeup(), which it calls.
In addition, pn_messenger_tsync() needed to be adjusted to actually
exit early :)






So for me this opens up two new areas (one of the reasons it took
me a bit to reply):
- study up on the 'trackers' API,
  IIUC that's the name of what pn_messenger_put() returns
- figure out how to use them inside our code so that
  the publisher (API user) will not drown itself in messages
  it cannot realistically send.


I'm not sure trackers are what you want here (although they may be,
depending on your app). Trackers are used to track the status of the
message at the receiver, e.g. a tracker will tell you whether the
receiver has accepted or rejected the message.


the fourth paragraph below circles back to trackers :)

If you just care about limiting the rate of your producer to match 
what you can actually push onto the wire, then you can just look at 
pn_messenger_outgoing(), that will tell you how many messages are 
actually backed up waiting to go onto the wire, e.g. before or after 
doing a put/send/recv/etc, you can check the size of the outgoing 
queue and based on that decide whether you need to throttle your producer.
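The check described above can be sketched as a simple high-water-mark test. The threshold and helper name below are illustrative, not part of the Proton API; in real code the depth would come from pn_messenger_outgoing(m):

```c
/* Hypothetical high-water mark; tune for your application. */
#define OUTGOING_HIGH_WATER 100

/* Decide whether the producer should pause, given the depth of the
 * messenger's outgoing queue. A real caller would pass in
 * pn_messenger_outgoing(m); here the depth is just an int so the
 * sketch stays self-contained. */
int should_throttle(int outgoing_depth)
{
    return outgoing_depth > OUTGOING_HIGH_WATER;
}
```

The producer would call this before (or after) each put and back off while it returns nonzero.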


Looking at the total size of the outgoing queue is sufficient for the
messenger that is part of the application acting as the publisher,
sending messages to the broker, as it then has just one peer.

The scenario that is important is this: the broker, too, wants to
rate-limit each peer based solely on that peer's capability to receive
messages. So if there are two subscribers and one is on a slower link,
or is misbehaving in some other way, then the fast subscriber should
not be held hostage by the slow subscriber. This is even more so if
we're talking about two publishers, each having its own set of
subscribers.


On the messenger level, the broker ends up having a bunch of AMQP
addresses to which the messages received from the publisher need to be
forwarded. The answer that is sought is something like
pn_messenger_outgoing(address). Now, a tracker does uniquely identify
the remote address that is embedded in the message being sent. What is
missing is a way to query the local messenger's opinion on the
progress of delivering that one message, where one distinct answer
would be "managed to put it on the wire".

A function like that would make it trivial for a messenger user (the
broker) to perform the accounting of 'number of outstanding messages
per address' on its own.
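As a sketch of that accounting (all names hypothetical — Messenger offers no per-address query, so the broker keeps its own counters), a counter could be bumped on each put and decremented once a message is known to be on the wire:

```c
#include <string.h>

/* Per-address outstanding-message accounting a broker could keep on
 * its own: increment on pn_messenger_put(), decrement once delivery
 * progress says "on the wire". Fixed-size table for illustration;
 * a real broker would use a proper hash map and lifetime-safe keys. */
#define MAX_ADDRS 16

struct addr_count { const char *addr; int outstanding; };
static struct addr_count table[MAX_ADDRS];

static struct addr_count *lookup(const char *addr)
{
    for (int i = 0; i < MAX_ADDRS; i++) {
        if (table[i].addr && strcmp(table[i].addr, addr) == 0)
            return &table[i];
        if (!table[i].addr) {         /* first free slot: claim it */
            table[i].addr = addr;
            return &table[i];
        }
    }
    return 0; /* table full; a real implementation would grow */
}

void on_put(const char *addr)      { lookup(addr)->outstanding++; }
void on_wire(const char *addr)     { lookup(addr)->outstanding--; }
int  outstanding(const char *addr) { return lookup(addr)->outstanding; }
```

With this in place, the throttling decision for each subscriber depends only on that subscriber's own backlog.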





- ability to spread the load of serialization and deserialization
  over cores; as mentioned above there are two 'layers' of it:
  bytes <=> pn_message <=> API object
  Providing this within the messenger paradigm sounds tricky to me
  -- I definitely don't want messenger to grow its own thread pool :)


This sounds to me like it could possibly be addressed by a smarter
pn_message implementation. Right now pn_message_t is implemented in a
very basic way where it actually does a lot of unnecessary
encode/decode work. Not only could that be made more efficient so
there is much less load to be distributed, but it could also be made
to happen on demand. That would give you the flexibility of hitting
most of the cost of the bytes <=> pn_message_t portion in whichever
thread you choose.
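One possible shape for such on-demand decoding (illustrative only — this is not how pn_message_t is actually laid out): keep the raw bytes as received and decode only on first field access, so the decode cost lands in whichever thread touches the message.

```c
#include <string.h>

/* Lazily-decoded message sketch: 'raw' holds the encoded bytes,
 * 'body' the decoded representation, and 'decoded' flips on first
 * access. The decode step here is a trivial copy standing in for
 * real AMQP decoding. */
struct lazy_msg {
    const char *raw;    /* encoded bytes as received off the wire */
    int         decoded;
    char        body[64];
};

static void decode(struct lazy_msg *m)
{
    /* Stand-in for the real decode work. */
    strncpy(m->body, m->raw, sizeof(m->body) - 1);
    m->body[sizeof(m->body) - 1] = '\0';
    m->decoded = 1;
}

const char *msg_body(struct lazy_msg *m)
{
    if (!m->decoded)
        decode(m);  /* pay the cost here, in the calling thread */
    return m->body;
}
```

A receiver thread can then pull messages off the socket cheaply and let a worker thread pay the decode cost later.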


This multi-core business obviously makes the most sense for cases
where there is more than one TCP connection over which data flows in.

The thread that read a bunch of bytes from a socket already has them
in that CPU's cache. Doing as much of the work with those bytes on
that CPU, without context switches, will be fast.




- more control over IO and waiting (as per the description above)
  this alone would still fit into your proposed extension quite nicely.
  In the other thread you mention bringing send() and recv()
  closer wrt the meaning of a parameter.
  It'd be a tiny step from there to provide a sync(send_limit, recv_limit)
  which would help me in reasoning about there being exactly one path
  that ends up in pn_driver_wait() -
Re: Fwd: put vs. send

2013-03-08 Thread Rafael Schloming
On Thu, Mar 7, 2013 at 5:15 AM, Bozo Dragojevic  wrote:

> On 3/6/13 3:45 AM, Rafael Schloming wrote:
>
>> Oops, meant to send this to the list.
>>
>> -- Forwarded message --
>> From: Rafael Schloming
>> Date: Tue, Mar 5, 2013 at 6:44 PM
>> Subject: Re: put vs. send
>> To: Bozo Dragojevic
>>
>>
>> On Tue, Mar 5, 2013 at 3:25 PM, Bozo Dragojevic
>>  wrote:
>>
>>> Wow, by not ever calling pn_messenger_send(), but only
>>> pn_messenger_recv()
>>> things are, unexpectedly, better! I'll explain below what 'better' means.
>>>
>>> But first, this begs the question, what is the purpose of
>>> pn_messenger_send()
>>> and where (and why) it's appropriate/required to call it.
>>>
>> Well one example would be if you just want to send a bunch of messages
>> without ever receiving any.
>>
>> I think it helps if you conceptually split the API along
>> blocking/non-blocking lines:
>>
>>non-blocking: put, subscribe, get, incoming, outgoing, etc...
>>  blocking: send, recv
>>
>> Of the non blocking portion there are things that simply access data, like
>> get(), incoming, outgoing, etc, and then there are things that
>> schedule/request asynchronous work to be done, namely put() and
>> subscribe().
>>
>> For the blocking portion, I think it helps to understand that there is
>> really only one fundamental operation, and that is
>> do_any_outstanding_work_until_condition_X_is_met. Currently the
>> blocking
>> portion of the API exposes exactly two conditions, namely "all messages
>> are
>> sent" via send() and "there is a message waiting to be received" via
>> recv().
>>
>> So in your example below if you're basically just a request/response
>> server
>> then as you've noticed you never actually need to call send since you
>> don't
>> really care about waiting for that condition, and any work you schedule in
>> your handlers will be performed anyways as you wait for messages to be
>> received.
>>
>
> Thanks for this really good explanation. This plus your proposed change in
> the other part of the thread would make the API much more
>
>
>>> The results are for slightly modified 0.3 release.
>>> Most notably, I have a local change that exposes pn_driver_wakeup()
>>> through the messenger api.
>>>
>> I'm curious about why you had to make this change. Can you describe the
>> usage/scenario that requires it?
>>
>
> Our design has a 'protocol' thread that is separate from the API,
> the idea being we can already be processing the next incoming network
> message while the API user is still busy servicing the previous one.
> By processing I mean all the CPU-required deserialization work of
> converting the network packets first to pn_message_t and then to our
> internal higher-level representation of the message content.
>
> The mechanism by which the API thread sends a message is to hand such
> high-level objects off to the 'proto' thread via a queue.
>
> so we end up with a (proto) thread that has two wakeup sources
> a) network related events (ability to send more stuff or receive it), and
> b) commands from the API coming from the queue.
>
> Idle state for the proto thread is no network activity and no API
> activity and in that state we want to be blocked somewhere. We choose
> pn_driver_wait() as it's a natural wait point and also we can have best
> possible incoming message latency.
>
> We also need to react quickly to messages that API wants to send out.
> Given the messenger is hiding its driver it seemed a smaller change
> to expose pn_driver_wakeup() via messenger (and teach messenger
> to actually come out of its tsync method).
>
> Even if we did add locking and would be able to call pn_messenger_put()
> from a separate thread, while we left the 'proto' thread blocked in
> pn_driver_wait() it wouldn't help as the poll mask would be wrong --
> if we were not sending before then pn_driver_rebuild() had not selected
> writable condition for that connection -- or we wouldn't be blocked in
> pn_driver_wait() -- talking about a mostly-idle low latency situation
> where TCP buffers are not clogged up.
>
> hope this clarifies things more than it muddles them up :)


Thank you for taking the time to explain, that's very helpful. It makes me
wonder if we shouldn't include some kind of pn_messenger_interrupt() that
would let you pull out of blocking calls early.


>
>
>
>>> Our API is threaded internally but all proton communication is done
>>> via a dedicated thread that runs the following (stub) event loop:
>>> (The same event loop is used by both, the client library, and the
>>> 'broker')
>>>
>>>while(1) {
>>>  ret = pn_messenger_recv(m,100)   // the 100 is hard to explain...
>>>  if (ret != PN_WAKED_UP) {// new error code for wakeup case
>>>  /*
>>>   * apparently there's no need to call send...
>>>   * pn_messenger_send(m);
>>>   */
>>>  }
>>>  Command cmd = cmd_queue.get();   // cmd_queue.put() from another
>>> thread
>>>   

Re: Fwd: put vs. send

2013-03-07 Thread Bozo Dragojevic

On 3/6/13 3:45 AM, Rafael Schloming wrote:

Oops, meant to send this to the list.

-- Forwarded message --
From: Rafael Schloming
Date: Tue, Mar 5, 2013 at 6:44 PM
Subject: Re: put vs. send
To: Bozo Dragojevic


On Tue, Mar 5, 2013 at 3:25 PM, Bozo Dragojevic  wrote:


Wow, by not ever calling pn_messenger_send(), but only pn_messenger_recv()
things are, unexpectedly, better! I'll explain below what 'better' means.

But first, this begs the question, what is the purpose of
pn_messenger_send()
and where (and why) it's appropriate/required to call it.


Well one example would be if you just want to send a bunch of messages
without ever receiving any.

I think it helps if you conceptually split the API along
blocking/non-blocking lines:

   non-blocking: put, subscribe, get, incoming, outgoing, etc...
 blocking: send, recv

Of the non blocking portion there are things that simply access data, like
get(), incoming, outgoing, etc, and then there are things that
schedule/request asynchronous work to be done, namely put() and subscribe().

For the blocking portion, I think it helps to understand that there is
really only one fundamental operation, and that is
do_any_outstanding_work_until_condition_X_is_met. Currently the blocking
portion of the API exposes exactly two conditions, namely "all messages are
sent" via send() and "there is a message waiting to be received" via recv().

So in your example below if you're basically just a request/response server
then as you've noticed you never actually need to call send since you don't
really care about waiting for that condition, and any work you schedule in
your handlers will be performed anyways as you wait for messages to be
received.


Thanks for this really good explanation. This plus your proposed change in
the other part of the thread would make the API much more


The results are for slightly modified 0.3 release.

Most notably, I have a local change that exposes pn_driver_wakeup()
through the messenger api.


I'm curious about why you had to make this change. Can you describe the
usage/scenario that requires it?


Our design has a 'protocol' thread that is separate from the API,
the idea being we can already be processing the next incoming network
message while the API user is still busy servicing the previous one.
By processing I mean all the CPU-required deserialization work of
converting the network packets first to pn_message_t and then to our
internal higher-level representation of the message content.

The mechanism by which the API thread sends a message is to hand such
high-level objects off to the 'proto' thread via a queue.

So we end up with a (proto) thread that has two wakeup sources:
a) network related events (ability to send more stuff or receive it), and
b) commands from the API coming from the queue.

Idle state for the proto thread is no network activity and no API
activity and in that state we want to be blocked somewhere. We choose
pn_driver_wait() as it's a natural wait point and also we can have best
possible incoming message latency.

We also need to react quickly to messages that the API wants to send out.
Given the messenger is hiding its driver it seemed a smaller change
to expose pn_driver_wakeup() via messenger (and teach messenger
to actually come out of its tsync method).

Even if we did add locking and would be able to call pn_messenger_put()
from a separate thread, while we left the 'proto' thread blocked in
pn_driver_wait() it wouldn't help as the poll mask would be wrong --
if we were not sending before then pn_driver_rebuild() had not selected
writable condition for that connection -- or we wouldn't be blocked in
pn_driver_wait() -- talking about a mostly-idle low latency situation
where TCP buffers are not clogged up.

hope this clarifies things more than it muddles them up :)
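The wakeup mechanism being described can be sketched with the classic self-pipe trick: the read end of a pipe sits in the poll set, so another thread can break a blocked poll() by writing one byte. This is a minimal stand-alone sketch, not Proton's actual pn_driver internals:

```c
#include <poll.h>
#include <unistd.h>

/* Self-pipe wakeup: wakeup_signal() can be called from any thread to
 * break a poll() that is blocked in wait_for_events(). */
static int wake_fds[2];

int wakeup_init(void)
{
    return pipe(wake_fds);  /* 0 on success, -1 on failure */
}

void wakeup_signal(void)
{
    char c = 1;
    ssize_t n = write(wake_fds[1], &c, 1);
    (void)n;  /* best-effort; sketch ignores short/failed writes */
}

/* Returns 1 if woken via the pipe, 0 on timeout. A real driver would
 * poll the sockets in the same set alongside the wakeup fd. */
int wait_for_events(int timeout_ms)
{
    struct pollfd pfd = { wake_fds[0], POLLIN, 0 };
    int n = poll(&pfd, 1, timeout_ms);
    if (n > 0 && (pfd.revents & POLLIN)) {
        char c;
        ssize_t r = read(wake_fds[0], &c, 1);  /* drain the wakeup byte */
        (void)r;
        return 1;
    }
    return 0;
}
```

In the scenario above, the API thread would call wakeup_signal() after enqueueing a command, and the proto thread's blocking wait would return immediately.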



Our API is threaded internally but all proton communication is done
via a dedicated thread that runs the following (stub) event loop:
(The same event loop is used by both the client library and the 'broker')

   while (1) {
     ret = pn_messenger_recv(m, 100);  // the 100 is hard to explain...
     if (ret != PN_WAKED_UP) {         // new error code for wakeup case
       /*
        * apparently there's no need to call send...
        * pn_messenger_send(m);
        */
     }
     Command cmd = cmd_queue.get();    // cmd_queue.put() from another thread
                                       // will call pn_driver_wakeup() and
                                       // will break out of pn_messenger_recv()
     if (cmd)
       handle(cmd);                    // ends up calling pn_messenger_put()
     if (pn_messenger_incoming(m)) {
       msg = pn_messenger_get(m);      // handle just one message
                                       // pn_messenger_recv() will not block
                                       // until we're done
       handle(msg);                    // can end up calling pn_messenger_put()
     }
   }


So, before the change,

Fwd: put vs. send

2013-03-05 Thread Rafael Schloming
Oops, meant to send this to the list.

-- Forwarded message --
From: Rafael Schloming 
Date: Tue, Mar 5, 2013 at 6:44 PM
Subject: Re: put vs. send
To: Bozo Dragojevic 


On Tue, Mar 5, 2013 at 3:25 PM, Bozo Dragojevic  wrote:

> Wow, by not ever calling pn_messenger_send(), but only pn_messenger_recv()
> things are, unexpectedly, better! I'll explain below what 'better' means.
>
> But first, this begs the question, what is the purpose of
> pn_messenger_send()
> and where (and why) it's appropriate/required to call it.
>

Well one example would be if you just want to send a bunch of messages
without ever receiving any.

I think it helps if you conceptually split the API along
blocking/non-blocking lines:

  non-blocking: put, subscribe, get, incoming, outgoing, etc...
blocking: send, recv

Of the non blocking portion there are things that simply access data, like
get(), incoming, outgoing, etc, and then there are things that
schedule/request asynchronous work to be done, namely put() and subscribe().

For the blocking portion, I think it helps to understand that there is
really only one fundamental operation, and that is
do_any_outstanding_work_until_condition_X_is_met. Currently the blocking
portion of the API exposes exactly two conditions, namely "all messages are
sent" via send() and "there is a message waiting to be received" via recv().
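That single fundamental operation can be sketched as a loop over a caller-supplied predicate; send() and recv() would differ only in the condition they pass. The work function and predicate below are stubs, not Messenger internals:

```c
/* work_until(): the shared core behind send() and recv() in this
 * mental model. Keep doing outstanding work until 'done' holds. */
typedef int (*predicate_fn)(void *ctx);

/* Stub for one unit of outstanding I/O work: here, "sending" one of
 * the remaining messages counted in *ctx. */
static void do_some_work(void *ctx)
{
    int *remaining = ctx;
    if (*remaining > 0)
        (*remaining)--;
}

/* The "all messages are sent" condition that send() would use. */
static int all_sent(void *ctx)
{
    return *(int *)ctx == 0;
}

int work_until(predicate_fn done, void *ctx)
{
    int steps = 0;
    while (!done(ctx)) {
        do_some_work(ctx);
        steps++;
    }
    return steps;  /* units of work performed before the condition held */
}
```

recv() would plug in a "message waiting" predicate instead, over the same work loop.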

So in your example below if you're basically just a request/response server
then as you've noticed you never actually need to call send since you don't
really care about waiting for that condition, and any work you schedule in
your handlers will be performed anyways as you wait for messages to be
received.

> The results are for slightly modified 0.3 release.
> Most notably, I have a local change that exposes pn_driver_wakeup()
> through the messenger api.
>

I'm curious about why you had to make this change. Can you describe the
usage/scenario that requires it?

> Our API is threaded internally but all proton communication is done
> via a dedicated thread that runs the following (stub) event loop:
> (The same event loop is used by both the client library and the 'broker')
>
>   while (1) {
>     ret = pn_messenger_recv(m, 100);  // the 100 is hard to explain...
>     if (ret != PN_WAKED_UP) {         // new error code for wakeup case
>       /*
>        * apparently there's no need to call send...
>        * pn_messenger_send(m);
>        */
>     }
>     Command cmd = cmd_queue.get();    // cmd_queue.put() from another thread
>                                       // will call pn_driver_wakeup() and
>                                       // will break out of pn_messenger_recv()
>     if (cmd)
>       handle(cmd);                    // ends up calling pn_messenger_put()
>     if (pn_messenger_incoming(m)) {
>       msg = pn_messenger_get(m);      // handle just one message
>                                       // pn_messenger_recv() will not block
>                                       // until we're done
>       handle(msg);                    // can end up calling pn_messenger_put()
>     }
>   }
>
>
> So, before the change, a test client that produced messages needed to
> throttle a bit, about 8ms between each 'command' that resulted in
> one 'pn_messenger_put()'
>
> If a lower delay (or no delay) was used, the client's messenger got
> confused after some fairly small number of messages sent (on the
> order of 10) and ended up sitting in pn_driver_wait() while it had
> unsent messages to send.
>
>

This sounds like a bug to me. As I mentioned above, you shouldn't need to
call send, but it also shouldn't hurt.

With the one line change of commenting out the send() it can go full speed!
>
>
> I know it's hard to comment on out-of-tree modified pseudo code, but
> is such an event loop within the design goals of the messenger?
>

Yes, certainly. Your code looks to me like it is almost the same as the
server.py example.

Longer term we'll most likely be switching from messenger to engine + driver
> so we can go multithreaded with the event loop.
>

You may find there is a fair amount you would need to reimplement if you
were to make that switch. One of the features I'd like to get into
messenger soon is factoring the messenger implementation and adding to the
API so that you can supply your own driver and write your own event loop
based on the messenger API rather than having to go all the way down to the
engine API. If what you want is more control over the I/O and threading but
you're happy with the protocol interface provided by messenger, then this
might be a better option for you than using the engine directly.

--Rafael