On Fri, Aug 26, 2016 at 9:45 PM, Matt Broadstone <[email protected]> wrote:
> On Fri, Aug 26, 2016 at 9:39 PM, Matt Broadstone <[email protected]> > wrote: > >> On Fri, Aug 26, 2016 at 7:58 PM, Matt Broadstone <[email protected]> >> wrote: >> >>> On Fri, Aug 26, 2016 at 5:44 PM, Matt Broadstone <[email protected]> >>> wrote: >>> >>>> On Fri, Aug 26, 2016 at 3:33 PM, Gordon Sim <[email protected]> wrote: >>>> >>>>> On 26/08/16 20:15, Matt Broadstone wrote: >>>>> >>>>>> Oops you're sorry I forgot a crucial part there, here's the updated >>>>>> code: >>>>>> const amqp = require('amqp10'); >>>>>> >>>>>> let client = new amqp.Client(); >>>>>> client.connect('amqp://<address>') >>>>>> .then(() => Promise.all([ >>>>>> client.createSender('test.queue'), >>>>>> client.createReceiver('test.queue', { >>>>>> attach: { receiverSettleMode: 'settle' } >>>>>> }) >>>>>> ])) >>>>>> .spread((sender, receiver) => { >>>>>> let receiveCount = 0; >>>>>> receiver.on('message', msg => { >>>>>> receiveCount++; >>>>>> if (receiveCount === 10) process.exit(0); >>>>>> console.log('received[', receiveCount, '], releasing'); >>>>>> receiver.release(msg); >>>>>> }); >>>>>> >>>>>> return sender.send({ test: 'message' }); >>>>>> }); >>>>>> >>>>>> With this code I now get the following on qpidd's side: >>>>>> >>>>> >>>>> With that change I see the correct behaviour with qpidd also (see >>>>> below). What version of qpidd are you using? >>>>> >>>>> From client: >>>>> >>>>> received[ 1 ], releasing >>>>>> received[ 2 ], releasing >>>>>> received[ 3 ], releasing >>>>>> received[ 4 ], releasing >>>>>> received[ 5 ], releasing >>>>>> received[ 6 ], releasing >>>>>> received[ 7 ], releasing >>>>>> received[ 8 ], releasing >>>>>> received[ 9 ], releasing >>>>>> >>>>> >>>>> and on broker: >>>>> >>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"1", >>>>>> message-format=0, settled=false, more=false, rcv-settle-mode=0, >>>>>> resume=false, aborted=false, batchable=false] (21) >>>>>> "\x00Sw\xc1\x10\x02\xa1\x04test\xa1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @flow(19) [next-incoming-id=2, incoming-window=2147483647, >>>>>> next-outgoing-id=0, outgoing-window=2147483647, handle=0, >>>>>> delivery-count=2, link-credit=500, drain=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=0, >>>>>> delivery-tag=b"\x00\x00\x00\x00", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @disposition(21) [role=true, first=1, last=1, settled=true, >>>>>> state=@accepted(36) []] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=0, last=0, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=1, >>>>>> delivery-tag=b"\x00\x00\x00\x01", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=1, last=1, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=2, >>>>>> delivery-tag=b"\x00\x00\x00\x02", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=2, last=2, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=3, >>>>>> delivery-tag=b"\x00\x00\x00\x03", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=3, last=3, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=4, >>>>>> delivery-tag=b"\x00\x00\x00\x04", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=4, last=4, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=5, >>>>>> delivery-tag=b"\x00\x00\x00\x05", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=5, last=5, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=6, >>>>>> delivery-tag=b"\x00\x00\x00\x06", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=6, last=6, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=7, >>>>>> delivery-tag=b"\x00\x00\x00\x07", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=7, last=7, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=8, >>>>>> delivery-tag=b"\x00\x00\x00\x08", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 1 <- @disposition(21) [role=true, first=8, last=8, settled=true, >>>>>> state=@released(38) [], batchable=false] >>>>>> 2016-08-26 20:30:55 [Protocol] trace >>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>>>> 0 -> @transfer(20) [handle=1, delivery-id=9, >>>>>> delivery-tag=b"\x00\x00\x00\x09", >>>>>> message-format=0, settled=false, more=false] (32) >>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>>>> 1\x07message" >>>>>> >>>>> >>>>> >>>>> --------------------------------------------------------------------- >>>>> To unsubscribe, e-mail: [email protected] >>>>> For additional commands, e-mail: [email protected] >>>>> >>>>> >>>> Hmm strange, still doesn't work on my side although when I switch to a >>>> modified outcome it works but only twice before stalling. This is with 0.34 >>>> so it's very likely something was fixed in interim? >>>> >>>> Matt >>>> >>>> >>> Gordon, >>> It seems that this behavior is specific to LVQs, or at least that's how >>> I've isolated it in my case here. As you said, when I tried my sample code >>> against a normal non-durable queue it indeed worked just fine. When I added >>> the queue as an LVQ I begin to experience the behavior mentioned above: >>> release doesn't actually re-deliver and modified re-delivers one time >>> before stalling out. Any ideas why this might be? If you point me to the >>> relevant code sections I can take a look at the source. >>> >>> Matt >>> >>> >> So I did a little bit of investigation and it seems like the issue can be >> resolved by adding: >> >> if (i == messages.end()) i = messages.begin(); >> at: https://github.com/apache/qpid-cpp/blob/master/src/qpid/brok >> er/MessageMap.cpp#L90 >> >> As far as I can tell if a message was requeued into the lvq map then the >> message position is before the current cursor location, which in turn will >> bail from the method because it's only looking forward (upper bound on the >> map). >> >> I'm wondering if it's too late to address this issue before the pending >> 1.35.0 release (obviously after code review), considering it's a single >> line change and getting this fix into a subsequent release might take quite >> a while. >> >> Matt >> >> >> > As a followup, I think my proposed fix here is a bit of a sledgehammer > since it absolutely resets the cursor position in this particular case. The > standard Queue implementation appears to use a concept of "versioning" to > accommodate this use case, so I think there is room for improvement here. > Obviously you guys would know far better an I would! > > Matt > > > Reported: https://issues.apache.org/jira/browse/QPID-7407 Matt
