> On 21 Dec 2016, at 18:19, Henrik Johansen <[email protected]> wrote:
>
>> On 21 Dec 2016, at 16:52, Sven Van Caekenberghe <[email protected]> wrote:
>>
>>> On 21 Dec 2016, at 16:04, Henrik Johansen <[email protected]> wrote:
>>>
>>>> On 21 Dec 2016, at 14:57, Sven Van Caekenberghe <[email protected]> wrote:
>>>>
>>>> Henrik,
>>>>
>>>> Thank you for this detailed feedback.
>>>>
>>>>> On 21 Dec 2016, at 12:44, Henrik Johansen <[email protected]> wrote:
>>>>>
>>>>> Hi Sven!
>>>>> One thing I noticed when testing the RabbitMQ client with keepalive > 0
>>>>> was the connection being closed and all subscriptions lost when receiving
>>>>> large payloads: the timeout fired because the keepalive packet could not
>>>>> be sent while waiting to receive the next object.
>>>>
>>>> Indeed, I used my Stamp (STOMP) RabbitMQ client as a model for the MQTT
>>>> one - there is a lot of similarity, especially in concept.
>>>>
>>>> Keep-alive processing is not that easy. I tried to do it by using read
>>>> timeouts as a source of regular opportunities to check whether keep-alive
>>>> logic needs to run. But of course, if you have no outstanding read
>>>> (in a loop), that won't work.
>>>>
>>>> The fact that receiving a large payload would trigger an actual keep-alive
>>>> timeout is not something that I have seen myself. It seems weird that
>>>> reading/transferring incoming data would not count as activity against
>>>> keep-alive, no?
>>>
>>> I never had a chance to investigate fully, but I distinctly remember having
>>> the same reaction!
>>> It's quite a while ago now, so my memory might be hazy; take the following
>>> with an appropriate number of grains of salt.
>>>
>>> The first times I encountered it, it seemed quite random, occurring after
>>> extended periods of client inactivity after receiving only small payloads...
>>> Setting a much shorter keepalive timeout than the default was/is very
>>> useful in reproducing/verifying the issue.
>>> The timeouts then occurred relatively shortly after I'd received a single
>>> payload with no other activity, and disappeared once I removed the
>>> resetting of the lastActivity timestamp on reads, indicating that, for
>>> RabbitMQ at least (3.5 was the version at the time, I believe), receiving
>>> data was *not* being counted as keep-alive activity.
>>>
>>> The issue of receiving large payloads blocking timely writes was still
>>> unresolved: the connection was consistently cut off in the middle of (its
>>> own!) multi-MB payload transfers because the keepalive packet could not be
>>> sent. I couldn't see a solution other than abandoning the elegant
>>> single-threaded approach and doing keepalive as a separate high-priority
>>> process, but the architectural choice for the app changed at this point to
>>> not include an MQ in the first delivery, so the more involved rewrite
>>> needed before deploying in production kind of got stranded :/
>>
>> Instinctively it feels like messaging and multi-MB payloads would not be a
>> good fit; at the least I would suspect that they are an edge case. But maybe
>> I am wrong.
>>
>> I think that the code in StampMedium>>#readBodyBytes: and
>> StampMedium>>#readBodyString: could be refactored to use a loop with chunk
>> buffers and at the same time check the elapsed time to fire off keep-alive
>> pings if necessary. One ping too many would not hurt; better safe than
>> sorry. But it is hard to catch any/all network slowdowns; any IO operation
>> could hang or time out.
>>
>> But all this assumes that the problematic situation can be recreated.

> Absolutely, though it is far from certain that the MQTT server
> implementations work the same way.
>
> This is the test code I used for Rabbit:
>
> consumer1 := StampClient new
>     timeout: 1;
>     heartbeat: 5000;
>     login: 'guest';
>     passcode: 'guest';
>     open;
>     subscribeTo: '/exchange/testall';
>     yourself.
> [ [ consumer1 runWith: [ :message |
>         Transcript crShow: '1 read: ', message body ] ]
>     ensure: [ consumer1 close ] ] forkAt: Processor userInterruptPriority.
>
> producer := StampClient new
>     login: 'guest';
>     passcode: 'guest';
>     open;
>     yourself.
>
> [ 1 to: 10000 do: [ :i |
>     producer sendText: i asString to: '/exchange/testall' ] ]
>         forkAt: Processor systemBackgroundPriority.
>
> Sometimes, but not always, this would fail, either during processing or some
> time after.
>
> A screenshot of how it looks when a disconnect happens during processing
> because the heartbeat failed can be found at:
> https://tresor.it/s#HN4RG9PEX1SYyKy4NcM31w
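[ Editorial aside: the chunked-read refactoring Sven suggests for StampMedium>>#readBodyBytes: can be sketched language-neutrally. The following is a minimal Python model, not Stamp's actual API: the callbacks `sock_read` and `send_ping`, the chunk size, and the monotonic-clock check are all assumptions for illustration. The point is that reading a large body in bounded chunks creates regular opportunities to send a heartbeat, so a multi-MB transfer can no longer starve the keep-alive. ]

```python
import time

CHUNK_SIZE = 16 * 1024  # read the body in bounded chunks, not one blocking call


def read_body(sock_read, send_ping, size, heartbeat_seconds):
    """Read `size` bytes via `sock_read(n)`, calling `send_ping()` whenever
    more than `heartbeat_seconds` elapse between chunks.
    Both callbacks are hypothetical stand-ins for the client's medium."""
    body = bytearray()
    last_ping = time.monotonic()
    while len(body) < size:
        chunk = sock_read(min(CHUNK_SIZE, size - len(body)))
        if not chunk:
            raise ConnectionError('connection closed mid-body')
        body.extend(chunk)
        # "A ping too many would not hurt": fire whenever we are at risk.
        if time.monotonic() - last_ping >= heartbeat_seconds:
            send_ping()
            last_ping = time.monotonic()
    return bytes(body)
```

This keeps the single-threaded design; the alternative Henrik mentions (a separate high-priority keep-alive process) trades that simplicity for coverage of *all* blocking operations, not just body reads.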
I installed RabbitMQ 3.6.4 on my Mac and tried your example, and it succeeded ;-)

But I modified it slightly so that both threads have a stop condition (I also
used /queue instead of /exchange, but I don't know if that makes a difference).

limit := 10000.

consumer1 := StampClient new
    timeout: 1;
    heartbeat: 5000;
    login: 'guest';
    passcode: 'guest';
    open;
    subscribeTo: '/queue/testall';
    yourself.

producer := StampClient new
    login: 'guest';
    passcode: 'guest';
    open;
    yourself.

[ [ consumer1 runWith: [ :message |
        Transcript crShow: '1 read: ', message body.
        message body = limit asString
            ifTrue: [ ConnectionClosed signal ] ] ]
    ensure: [ consumer1 close ] ] forkAt: Processor userInterruptPriority.

[ 1 to: limit do: [ :i | producer sendText: i asString to: '/queue/testall' ].
  producer close ] forkAt: Processor systemBackgroundPriority.

Did you see the #testSimpleWorkQueue unit test? And the StampBenchmarkTests?
You could also try running StampBenchmark manually with higher loads.

What I mean is that I normally use #writeWithReceipt: and
#clientIndividualAck subscriptions with explicit ack/nack responses. The
reason is that the way you do it (although allowed, and the fastest way),
there is no traffic flow regulation: the producer basically sends everything
in one big go (because it does not wait for receipts from the server), and
the consumer gets all messages in one big batch from the broker - socket and
stream buffers at the different levels will just fill up.

All this does not yet explain your problem, I know.

I also did a checkin because I still had this modification in my image:

===
Name: Stamp-SvenVanCaekenberghe.24
Author: SvenVanCaekenberghe
Time: 21 December 2016, 9:54:27.217571 pm
UUID: ee3a7276-6e00-4143-9c3e-03a320c14ea6
Ancestors: Stamp-HolgerHansPeterFreyther.23

StampClient - minor refactoring in #writeHeartbeatIfNeeded
(#wasInactiveForTooLong) - add #touch call to #writeNoFlush:
===

Just to make sure we're on the same version (I am still using Pharo 4 for
this, BTW).
> Cheers,
> Henry
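[ Editorial aside: Sven's flow-regulation point can be shown with a toy model. The `ToyBroker` below is invented for illustration and has nothing to do with Stamp's or RabbitMQ's real interfaces; it only demonstrates that a producer which waits for a per-message receipt keeps the in-flight backlog bounded, while a fire-and-forget producer lets the whole batch pile up in the buffers at once. ]

```python
from collections import deque


class ToyBroker:
    """Minimal in-memory stand-in for a broker queue, tracking peak backlog."""

    def __init__(self):
        self.queue = deque()
        self.high_water = 0  # largest backlog ever observed

    def send(self, message):
        """Fire-and-forget: enqueue without waiting for any confirmation."""
        self.queue.append(message)
        self.high_water = max(self.high_water, len(self.queue))

    def send_with_receipt(self, message, consume):
        """Receipt-style send: block until consumption progresses, so the
        producer and consumer advance in lock-step."""
        self.send(message)
        consume(self)  # the 'receipt' only arrives after the consumer drains


def consume_one(broker):
    if broker.queue:
        broker.queue.popleft()


# Fire-and-forget: the entire batch accumulates before the consumer catches up.
fast = ToyBroker()
for i in range(1000):
    fast.send(i)

# Receipt-regulated: at most one message is ever in flight.
regulated = ToyBroker()
for i in range(1000):
    regulated.send_with_receipt(i, consume_one)
```

In the real protocol the receipt/ack window can of course be larger than one, but the contrast is the same: without receipts there is no back-pressure at all, which is why socket and stream buffers fill up in the original test.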
