Back on topic, MQTT, I added more/better functional unit tests and more 
comments, as well as variants against a local broker.

Here is a more complete example:

"Fork a process that reads 10 temperature sensor values"

[ (Array streamContents: [ :stream | | count |
    count := 1.
    MQTTExperimentalClient new
      clientId: 'listener';
      open;
      subscribeToTopic: '/device/42/outside-temperature';
      runWith: [ :message |
        stream nextPut: message contents asInteger.
        (count := count + 1) > 10 ifTrue: [ ConnectionClosed signal ] ] ])
    inspect ] fork.

"Post 10 temperature sensor value readings, using a new client/connection each 
time"

1 to: 10 do: [ :i |
  MQTTExperimentalClient new
    atLeastOnce;
    open;
    sendMessage: i asByteArray toTopic: '/device/42/outside-temperature';
    close ]
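
For contrast, here is a sketch that posts the same 10 readings over a single 
connection instead of opening a new client each time; it assumes the 
MQTTExperimentalClient API shown above and that one client instance can be 
reused for multiple sends, which I have not verified:

"Post 10 temperature sensor value readings over one client/connection (sketch)"

| client |
client := MQTTExperimentalClient new
  atLeastOnce;
  open;
  yourself.
[ 1 to: 10 do: [ :i |
    client
      sendMessage: i asByteArray
      toTopic: '/device/42/outside-temperature' ] ]
  ensure: [ client close ]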

Sven

> On 21 Dec 2016, at 21:57, Sven Van Caekenberghe <[email protected]> wrote:
> 
>> 
>> On 21 Dec 2016, at 18:19, Henrik Johansen <[email protected]> 
>> wrote:
>> 
>>> 
>>> On 21 Dec 2016, at 16:52 , Sven Van Caekenberghe <[email protected]> wrote:
>>> 
>>>> 
>>>> On 21 Dec 2016, at 16:04, Henrik Johansen <[email protected]> 
>>>> wrote:
>>>> 
>>>>> 
>>>>> On 21 Dec 2016, at 14:57 , Sven Van Caekenberghe <[email protected]> wrote:
>>>>> 
>>>>> Henrik,
>>>>> 
>>>>> Thank you for this detailed feedback.
>>>>> 
>>>>>> On 21 Dec 2016, at 12:44, Henrik Johansen <[email protected]> 
>>>>>> wrote:
>>>>>> 
>>>>>> Hi Sven!
>>>>>> One thing I noticed when testing the RabbitMQ client with keepalive > 0 
>>>>>> was the connection being closed and all subscriptions lost when receiving 
>>>>>> large payloads: the keepalive timeout expired before the keepalive packet 
>>>>>> could be sent, ahead of waiting to receive the next object.
>>>>> 
>>>>> Indeed, I used my Stamp (STOMP) Rabbit MQ client as a model for the MQTT 
>>>>> one - there is a lot of similarity, especially in concept.
>>>>> 
>>>>> Keep alive processing is not that easy. I tried to do it by using read 
>>>>> timeouts as a source of regular opportunities to check for the need to 
>>>>> process keep alive logic. But of course, if you have no outstanding read 
>>>>> (in a loop), that won't work.
>>>>> 
>>>>> The fact that receiving a large payload would trigger an actual keep 
>>>>> alive time out is not something that I have seen myself. It seems weird 
>>>>> that the reading/transferring of incoming data would not count as 
>>>>> activity against keep alive, no ?
>>>> 
>>>> I never had a chance to investigate fully, but I distinctly remember 
>>>> having the same reaction!
>>>> It's quite a while ago now, so my memory might be hazy; take the following 
>>>> with an appropriate number of grains of salt.
>>>> 
>>>> The first times I encountered it, it seemed quite random, occurring after 
>>>> extended periods of client inactivity after receiving only small 
>>>> payloads...
>>>> Setting a much shorter keepalive timeout than the default was/is very 
>>>> useful in reproducing/verifying if it is an issue.
>>>> The timeouts then occurred relatively shortly after I'd received a single 
>>>> payload with no other activity, and disappeared once I removed the 
>>>> resetting of lastActivity timestamp on reads, indicating that for at least 
>>>> rabbitmq (3.5 was the version at the time, I believe), receiving data was 
>>>> *not* being counted as keep-alive activity.
>>>> 
>>>> The issue of receiving large payloads blocking timely writing was still 
>>>> unresolved: the connection was consistently cut off in the middle of (its 
>>>> own!) multi-MB payload transfers because the keepalive packet was not sent 
>>>> in time.
>>>> I couldn't see a solution other than abandoning the elegant 
>>>> single-threaded approach and doing keepalive as a separate high-priority 
>>>> process, but the architectural choice for the app changed at this point to 
>>>> not include an MQ in the first delivery, so the more involved rewrite 
>>>> needed before deploying in production kinda got stranded :/
>>> 
>>> Instinctively it feels like messaging and multi-MB payloads would not be a 
>>> good fit, at least I would suspect that they are an edge case. But maybe I 
>>> am wrong.
>>> 
>>> I think that the code in StampMedium>>#readBodyBytes: and 
>>> StampMedium>>#readBodyString: could be refactored to use a loop with chunk 
>>> buffers and at the same time check the elapsed time to fire back keep alive 
>>> pings if necessary. A ping too much would not hurt, better safe than sorry. 
>>> But it is hard to catch any/all network slowdowns; any IO operation could 
>>> hang/timeout.
>>> 
>>> But all this would assume that the problematic situation can be recreated.
>> 
>> Absolutely, far from certain the MQTT server implementations work the same.
>> 
>> This is the test code I used for rabbit: 
>> 
>> consumer1 := StampClient new
>>   timeout: 1;
>>   heartbeat: 5000;
>>   login: 'guest';
>>   passcode: 'guest';
>>   open;
>>   subscribeTo: '/exchange/testall';
>>   yourself.
>> 
>> [ [ consumer1 runWith: [ :message | Transcript crShow: '1 read: ', message 
>> body ] ] ensure: [ consumer1 close ] ] forkAt: Processor userInterruptPriority.
>> 
>> producer := StampClient new
>>   login: 'guest';
>>   passcode: 'guest';
>>   open;
>>   yourself.
>> 
>> [ 1 to: 10000 do: [ :i | producer sendText: i asString to: 
>> '/exchange/testall' ] ] forkAt: Processor systemBackgroundPriority.
>> 
>> Sometimes, but not always, this would fail, either during processing, or 
>> some time after.
>> 
>> A screenshot of how it looks when a disconnect happens during processing due 
>> to heartbeat failing can be found at:
>> https://tresor.it/s#HN4RG9PEX1SYyKy4NcM31w
> 
> I installed RabbitMQ 3.6.4 on my Mac and tried your example and it succeeded 
> ;-) But I modified it slightly so that both threads have a stop condition (I 
> also used /queue instead of /exchange but I don't know if that makes a 
> difference).
> 
> limit := 10000.
> 
> consumer1 := StampClient new
>   timeout: 1;
>   heartbeat: 5000;
>   login: 'guest';
>   passcode: 'guest';
>   open;
>   subscribeTo: '/queue/testall';
>   yourself.
> 
> producer := StampClient new
>   login: 'guest';
>   passcode: 'guest';
>   open;
>   yourself.
> 
> [ [ consumer1 runWith: [ :message | Transcript crShow: '1 read: ', message 
> body. message body = limit asString ifTrue: [ ConnectionClosed signal ] ] ] 
> ensure: [ consumer1 close ] ] forkAt: Processor userInterruptPriority.
> 
> [ 1 to: limit do: [ :i | producer sendText: i asString to: '/queue/testall' 
> ]. producer close ] forkAt: Processor systemBackgroundPriority.
> 
> 
> You did see the #testSimpleWorkQueue unit test ?
> And the StampBenchmarkTests ?
> You could also try running StampBenchmark manually with higher loads.
> 
> 
> What I mean is that I normally use the #writeWithReceipt: and 
> #clientIndividualAck subscriptions with explicit ack/nack responses. The 
> reason is that the way you do it (although allowed and the fastest way), 
> there is no traffic flow regulation: the producer basically sends everything 
> in one big go (because it does not wait for receipts from the server), the 
> consumer gets all messages in one big batch from the broker - socket stream 
> buffers at different levels will just fill up.
> 
> All this does not yet explain your problem, I know.
> 
> 
> I also did a checkin because I still had this modification in my image:
> 
> ===
> Name: Stamp-SvenVanCaekenberghe.24
> Author: SvenVanCaekenberghe
> Time: 21 December 2016, 9:54:27.217571 pm
> UUID: ee3a7276-6e00-4143-9c3e-03a320c14ea6
> Ancestors: Stamp-HolgerHansPeterFreyther.23
> 
> StampClient
> - minor refactoring in #writeHeartbeatIfNeeded (#wasInactiveForTooLong)
> - add #touch call to #writeNoFlush:
> ===
> 
> Just to make sure we're on the same version (I am still using Pharo 4 for 
> this, BTW).
> 
>> Cheers,
>> Henry

