On Fri, Aug 26, 2016 at 9:39 PM, Matt Broadstone <[email protected]> wrote:
> On Fri, Aug 26, 2016 at 7:58 PM, Matt Broadstone <[email protected]> > wrote: > >> On Fri, Aug 26, 2016 at 5:44 PM, Matt Broadstone <[email protected]> >> wrote: >> >>> On Fri, Aug 26, 2016 at 3:33 PM, Gordon Sim <[email protected]> wrote: >>> >>>> On 26/08/16 20:15, Matt Broadstone wrote: >>>> >>>>> Oops you're sorry I forgot a crucial part there, here's the updated >>>>> code: >>>>> const amqp = require('amqp10'); >>>>> >>>>> let client = new amqp.Client(); >>>>> client.connect('amqp://<address>') >>>>> .then(() => Promise.all([ >>>>> client.createSender('test.queue'), >>>>> client.createReceiver('test.queue', { >>>>> attach: { receiverSettleMode: 'settle' } >>>>> }) >>>>> ])) >>>>> .spread((sender, receiver) => { >>>>> let receiveCount = 0; >>>>> receiver.on('message', msg => { >>>>> receiveCount++; >>>>> if (receiveCount === 10) process.exit(0); >>>>> console.log('received[', receiveCount, '], releasing'); >>>>> receiver.release(msg); >>>>> }); >>>>> >>>>> return sender.send({ test: 'message' }); >>>>> }); >>>>> >>>>> With this code I now get the following on qpidd's side: >>>>> >>>> >>>> With that change I see the correct behaviour with qpidd also (see >>>> below). What version of qpidd are you using? >>>> >>>> From client: >>>> >>>> received[ 1 ], releasing >>>>> received[ 2 ], releasing >>>>> received[ 3 ], releasing >>>>> received[ 4 ], releasing >>>>> received[ 5 ], releasing >>>>> received[ 6 ], releasing >>>>> received[ 7 ], releasing >>>>> received[ 8 ], releasing >>>>> received[ 9 ], releasing >>>>> >>>> >>>> and on broker: >>>> >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"1", >>>>> message-format=0, settled=false, more=false, rcv-settle-mode=0, >>>>> resume=false, aborted=false, batchable=false] (21) >>>>> "\x00Sw\xc1\x10\x02\xa1\x04test\xa1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @flow(19) [next-incoming-id=2, incoming-window=2147483647, >>>>> next-outgoing-id=0, outgoing-window=2147483647, handle=0, >>>>> delivery-count=2, link-credit=500, drain=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=0, >>>>> delivery-tag=b"\x00\x00\x00\x00", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @disposition(21) [role=true, first=1, last=1, settled=true, >>>>> state=@accepted(36) []] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=0, last=0, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=1, >>>>> delivery-tag=b"\x00\x00\x00\x01", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=1, last=1, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=2, >>>>> delivery-tag=b"\x00\x00\x00\x02", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=2, last=2, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=3, >>>>> delivery-tag=b"\x00\x00\x00\x03", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=3, last=3, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=4, >>>>> delivery-tag=b"\x00\x00\x00\x04", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=4, last=4, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=5, >>>>> delivery-tag=b"\x00\x00\x00\x05", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=5, last=5, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=6, >>>>> delivery-tag=b"\x00\x00\x00\x06", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=6, last=6, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=7, >>>>> delivery-tag=b"\x00\x00\x00\x07", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=7, last=7, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=8, >>>>> delivery-tag=b"\x00\x00\x00\x08", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 1 <- @disposition(21) [role=true, first=8, last=8, settled=true, >>>>> state=@released(38) [], batchable=false] >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>> 0 -> @transfer(20) [handle=1, delivery-id=9, >>>>> delivery-tag=b"\x00\x00\x00\x09", >>>>> message-format=0, settled=false, more=false] (32) >>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>> 1\x07message" >>>>> >>>> >>>> >>>> --------------------------------------------------------------------- >>>> To unsubscribe, e-mail: [email protected] >>>> For additional commands, e-mail: [email protected] >>>> >>>> >>> Hmm strange, still doesn't work on my side although when I switch to a >>> modified outcome it works but only twice before stalling. This is with 0.34 >>> so it's very likely something was fixed in interim? >>> >>> Matt >>> >>> >> Gordon, >> It seems that this behavior is specific to LVQs, or at least that's how >> I've isolated it in my case here. As you said, when I tried my sample code >> against a normal non-durable queue it indeed worked just fine. When I added >> the queue as an LVQ I begin to experience the behavior mentioned above: >> release doesn't actually re-deliver and modified re-delivers one time >> before stalling out. Any ideas why this might be? If you point me to the >> relevant code sections I can take a look at the source. >> >> Matt >> >> > So I did a little bit of investigation and it seems like the issue can be > resolved by adding: > > if (i == messages.end()) i = messages.begin(); > at: https://github.com/apache/qpid-cpp/blob/master/src/qpid/ > broker/MessageMap.cpp#L90 > > As far as I can tell if a message was requeued into the lvq map then the > message position is before the current cursor location, which in turn will > bail from the method because it's only looking forward (upper bound on the > map). > > I'm wondering if it's too late to address this issue before the pending > 1.35.0 release (obviously after code review), considering it's a single > line change and getting this fix into a subsequent release might take quite > a while. > > Matt > > > As a followup, I think my proposed fix here is a bit of a sledgehammer since it absolutely resets the cursor position in this particular case. The standard Queue implementation appears to use a concept of "versioning" to accommodate this use case, so I think there is room for improvement here. Obviously you guys would know far better an I would! Matt
