Back on topic, MQTT: I added more and better functional unit tests and more
comments, as well as variants that run against a local broker.
Here is a more complete example:
"Fork a process that reads 10 temperature sensor values"
[ (Array streamContents: [ :stream | | count |
count := 1.
MQTTExperimentalClient new
clientId: 'listener';
open;
subscribeToTopic: '/device/42/outside-temperature';
runWith: [ :message |
stream nextPut: message contents asInteger.
(count := count + 1) > 10 ifTrue: [ ConnectionClosed signal ] ] ])
inspect ] fork.
"Post 10 temperature sensor value readings, using a new client/connection each
time"
1 to: 10 do: [ :i |
MQTTExperimentalClient new
atLeastOnce;
open;
sendMessage: i asByteArray toTopic: '/device/42/outside-temperature';
close ]
Sven
> On 21 Dec 2016, at 21:57, Sven Van Caekenberghe <[email protected]> wrote:
>
>>
>> On 21 Dec 2016, at 18:19, Henrik Johansen <[email protected]>
>> wrote:
>>
>>>
>>> On 21 Dec 2016, at 16:52 , Sven Van Caekenberghe <[email protected]> wrote:
>>>
>>>>
>>>> On 21 Dec 2016, at 16:04, Henrik Johansen <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> On 21 Dec 2016, at 14:57 , Sven Van Caekenberghe <[email protected]> wrote:
>>>>>
>>>>> Henrik,
>>>>>
>>>>> Thank you for this detailed feedback.
>>>>>
>>>>>> On 21 Dec 2016, at 12:44, Henrik Johansen <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Sven!
>>>>>> One thing I noticed when testing the RabbitMQ client with keepalive > 0
>>>>>> was the connection being closed and all subscriptions lost when receiving
>>>>>> large payloads: the keepalive timeout expired before a keepalive packet
>>>>>> could be sent ahead of the next blocking read.
>>>>>
>>>>> Indeed, I used my Stamp (STOMP) Rabbit MQ client as a model for the MQTT
>>>>> one - there is a lot of similarity, especially in concept.
>>>>>
>>>>> Keep alive processing is not that easy. I tried to use read timeouts as
>>>>> regular opportunities to run the keep alive logic. But of course, if you
>>>>> have no outstanding read (in a loop), that won't work.
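>>>>>
>>>>> Roughly, each read timeout becomes a chance to run the keep alive check
>>>>> (a simplified sketch; #readMessage and handler stand in for the actual
>>>>> frame reading and dispatch):
>>>>>
>>>>> [ | message |
>>>>> "a read timeout is not fatal, it just yields control back to us"
>>>>> message := [ self readMessage ] on: ConnectionTimedOut do: [ nil ].
>>>>> self writeHeartbeatIfNeeded.
>>>>> message ifNotNil: [ :m | handler value: m ] ] repeat.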
>>>>>
>>>>> The fact that receiving a large payload would trigger an actual keep
>>>>> alive time out is not something that I have seen myself. It seems weird
>>>>> that the reading/transferring of incoming data would not count as
>>>>> activity against keep alive, no ?
>>>>
>>>> I never had a chance to investigate fully, but I distinctly remember
>>>> having the same reaction!
>>>> It's quite a while ago now, so my memory might be hazy; take the following
>>>> with an appropriate grain of salt.
>>>>
>>>> The first times I encountered it, it seemed quite random, occurring after
>>>> extended periods of client inactivity after receiving only small
>>>> payloads...
>>>> Setting a much shorter keepalive timeout than the default was/is very
>>>> useful in reproducing/verifying if it is an issue.
>>>> The timeouts then occurred relatively shortly after I'd received a single
>>>> payload with no other activity, and disappeared once I removed the
>>>> resetting of the lastActivity timestamp on reads, indicating that, for
>>>> RabbitMQ at least (3.5 was the version at the time, I believe), receiving
>>>> data was *not* being counted as keep-alive activity.
>>>>
>>>> The issue of large incoming payloads blocking timely writes was still
>>>> unresolved: the connection was consistently cut off in the middle of (its
>>>> own!) multi-MB payload transfers because the keepalive packet could not be
>>>> sent in time.
>>>> I couldn't see a solution other than abandoning the elegant
>>>> single-threaded approach and doing keepalive in a separate high-priority
>>>> process, but at that point the architectural choice for the app changed to
>>>> not include a MQ in the first delivery, so the more involved rewrite needed
>>>> before deploying in production kinda got stranded :/
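>>>>
>>>> Something like this, untested (client being the MQ client instance):
>>>>
>>>> keepalive := [ [ client writeHeartbeatIfNeeded.
>>>> (Delay forSeconds: 1) wait ] repeat ]
>>>> forkAt: Processor highIOPriority.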
>>>
>>> Instinctively it feels like messaging and multi-MB payloads would not be a
>>> good fit, at least I would suspect that they are an edge case. But maybe I
>>> am wrong.
>>>
>>> I think that the code in StampMedium>>#readBodyBytes: and
>>> StampMedium>>#readBodyString: could be refactored to use a loop with chunk
>>> buffers and at the same time check the elapsed time to fire back keep alive
>>> pings if necessary. A ping too much would not hurt, better safe than sorry.
>>> But it is hard to catch any/all network slowdowns; any IO operation could
>>> hang/timeout.
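>>>
>>> Something along these lines (just a sketch, not the actual implementation;
>>> the chunked read selector is made up for illustration):
>>>
>>> readBodyBytes: count
>>> | bytes read |
>>> bytes := ByteArray new: count.
>>> read := 0.
>>> [ read < count ] whileTrue: [
>>> "read at most one chunk, then give keep alive a chance to fire"
>>> read := read + (self
>>> readInto: bytes
>>> startingAt: read + 1
>>> count: ((count - read) min: 4096)).
>>> self writeHeartbeatIfNeeded ].
>>> ^ bytes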
>>>
>>> But all this would assume that the problematic situation can be recreated.
>>
>> Absolutely, far from certain the MQTT server implementations work the same.
>>
>> This is the test code I used for rabbit:
>>
>> consumer1 := StampClient new
>> timeout: 1;
>> heartbeat: 5000;
>> login: 'guest';
>> passcode: 'guest';
>> open;
>> subscribeTo: '/exchange/testall';
>> yourself.
>> [ [ consumer1 runWith: [ :message | Transcript crShow: '1 read: ', message
>> body ] ] ensure: [ consumer1 close ] ] forkAt: Processor userInterruptPriority.
>>
>> producer := StampClient new
>> login: 'guest';
>> passcode: 'guest';
>> open; yourself.
>>
>> [1 to: 10000 do: [:i | producer sendText: i asString to:
>> '/exchange/testall']] forkAt: Processor systemBackgroundPriority.
>>
>> Sometimes, but not always, this would fail, either during processing, or
>> some time after.
>>
>> A screenshot of how it looks when a disconnect happens during processing due
>> to heartbeat failing can be found at:
>> https://tresor.it/s#HN4RG9PEX1SYyKy4NcM31w
>
> I installed RabbitMQ 3.6.4 on my Mac and tried your example and it succeeded
> ;-) But I modified it slightly so that both threads have a stop condition (I
> also used /queue instead of /exchange but I don't know if that makes a
> difference).
>
> limit := 10000.
>
> consumer1 := StampClient new
> timeout: 1;
> heartbeat: 5000;
> login: 'guest';
> passcode: 'guest';
> open;
> subscribeTo: '/queue/testall';
> yourself.
>
> producer := StampClient new
> login: 'guest';
> passcode: 'guest';
> open;
> yourself.
>
> [ [ consumer1 runWith: [ :message | Transcript crShow: '1 read: ', message
> body. message body = limit asString ifTrue: [ ConnectionClosed signal ] ] ]
> ensure: [ consumer1 close ] ] forkAt: Processor userInterruptPriority.
>
> [ 1 to: limit do: [ :i | producer sendText: i asString to: '/queue/testall'
> ]. producer close ] forkAt: Processor systemBackgroundPriority.
>
>
> You did see the #testSimpleWorkQueue unit test ?
> And the StampBenchmarkTests ?
> You could also try running StampBenchmark manually with higher loads.
>
>
> What I mean is that I normally use the #writeWithReceipt: and
> #clientIndividualAck subscriptions with explicit ack/nack responses. The
> reason is that the way you do it (although allowed and the fastest way),
> there is no traffic flow regulation: the producer basically sends everything
> in one big go (because it does not wait for receipts from the server), the
> consumer gets all messages in one big batch from the broker - socket stream
> buffers at different levels will just fill up.
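>
> In sketch form (#writeWithReceipt: and #clientIndividualAck as mentioned
> above; the frame construction and #ack: selectors here are from memory and
> may well differ):
>
> 1 to: limit do: [ :i |
> "blocks until the broker confirms the receipt, so the producer is throttled"
> producer writeWithReceipt: (producer newSendFrameTo: '/queue/testall' text: i asString) ].
>
> consumer runWith: [ :message |
> "with a client-individual ack subscription, each message is acked explicitly"
> Transcript crShow: message body.
> consumer ack: message ].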
>
> All this does not yet explain your problem, I know.
>
>
> I also did a checkin because I still had this modification in my image:
>
> ===
> Name: Stamp-SvenVanCaekenberghe.24
> Author: SvenVanCaekenberghe
> Time: 21 December 2016, 9:54:27.217571 pm
> UUID: ee3a7276-6e00-4143-9c3e-03a320c14ea6
> Ancestors: Stamp-HolgerHansPeterFreyther.23
>
> StampClient
> - minor refactoring in #writeHeartbeatIfNeeded (#wasInactiveForTooLong)
> - add #touch call to #writeNoFlush:
> ===
>
> Just to make sure we're on the same version (I am still using Pharo 4 for
> this, BTW).
>
>> Cheers,
>> Henry