On Fri, Aug 26, 2016 at 7:58 PM, Matt Broadstone <[email protected]> wrote:
> On Fri, Aug 26, 2016 at 5:44 PM, Matt Broadstone <[email protected]> > wrote: > >> On Fri, Aug 26, 2016 at 3:33 PM, Gordon Sim <[email protected]> wrote: >> >>> On 26/08/16 20:15, Matt Broadstone wrote: >>> >>>> Oops you're sorry I forgot a crucial part there, here's the updated >>>> code: >>>> const amqp = require('amqp10'); >>>> >>>> let client = new amqp.Client(); >>>> client.connect('amqp://<address>') >>>> .then(() => Promise.all([ >>>> client.createSender('test.queue'), >>>> client.createReceiver('test.queue', { >>>> attach: { receiverSettleMode: 'settle' } >>>> }) >>>> ])) >>>> .spread((sender, receiver) => { >>>> let receiveCount = 0; >>>> receiver.on('message', msg => { >>>> receiveCount++; >>>> if (receiveCount === 10) process.exit(0); >>>> console.log('received[', receiveCount, '], releasing'); >>>> receiver.release(msg); >>>> }); >>>> >>>> return sender.send({ test: 'message' }); >>>> }); >>>> >>>> With this code I now get the following on qpidd's side: >>>> >>> >>> With that change I see the correct behaviour with qpidd also (see >>> below). What version of qpidd are you using? >>> >>> From client: >>> >>> received[ 1 ], releasing >>>> received[ 2 ], releasing >>>> received[ 3 ], releasing >>>> received[ 4 ], releasing >>>> received[ 5 ], releasing >>>> received[ 6 ], releasing >>>> received[ 7 ], releasing >>>> received[ 8 ], releasing >>>> received[ 9 ], releasing >>>> >>> >>> and on broker: >>> >>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"1", >>>> message-format=0, settled=false, more=false, rcv-settle-mode=0, >>>> resume=false, aborted=false, batchable=false] (21) >>>> "\x00Sw\xc1\x10\x02\xa1\x04test\xa1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @flow(19) [next-incoming-id=2, incoming-window=2147483647, >>>> next-outgoing-id=0, outgoing-window=2147483647, handle=0, >>>> delivery-count=2, link-credit=500, drain=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=0, >>>> delivery-tag=b"\x00\x00\x00\x00", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @disposition(21) [role=true, first=1, last=1, settled=true, >>>> state=@accepted(36) []] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=0, last=0, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=1, >>>> delivery-tag=b"\x00\x00\x00\x01", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=1, last=1, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=2, >>>> delivery-tag=b"\x00\x00\x00\x02", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=2, last=2, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=3, >>>> delivery-tag=b"\x00\x00\x00\x03", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=3, last=3, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=4, >>>> delivery-tag=b"\x00\x00\x00\x04", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=4, last=4, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=5, >>>> delivery-tag=b"\x00\x00\x00\x05", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=5, last=5, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=6, >>>> delivery-tag=b"\x00\x00\x00\x06", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=6, last=6, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=7, >>>> delivery-tag=b"\x00\x00\x00\x07", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=7, last=7, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=8, >>>> delivery-tag=b"\x00\x00\x00\x08", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 1 <- @disposition(21) [role=true, first=8, last=8, settled=true, >>>> state=@released(38) [], batchable=false] >>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]: >>>> 0 -> @transfer(20) [handle=1, delivery-id=9, >>>> delivery-tag=b"\x00\x00\x00\x09", >>>> message-format=0, settled=false, more=false] (32) >>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa >>>> 1\x07message" >>>> >>> >>> >>> --------------------------------------------------------------------- >>> To unsubscribe, e-mail: [email protected] >>> For additional commands, e-mail: [email protected] >>> >>> >> Hmm strange, still doesn't work on my side although when I switch to a >> modified outcome it works but only twice before stalling. This is with 0.34 >> so it's very likely something was fixed in interim? >> >> Matt >> >> > Gordon, > It seems that this behavior is specific to LVQs, or at least that's how > I've isolated it in my case here. As you said, when I tried my sample code > against a normal non-durable queue it indeed worked just fine. When I added > the queue as an LVQ I begin to experience the behavior mentioned above: > release doesn't actually re-deliver and modified re-delivers one time > before stalling out. Any ideas why this might be? If you point me to the > relevant code sections I can take a look at the source. > > Matt > > So I did a little bit of investigation and it seems like the issue can be resolved by adding: if (i == messages.end()) i = messages.begin(); at: https://github.com/apache/qpid-cpp/blob/master/src/qpid/broker/MessageMap.cpp#L90 As far as I can tell if a message was requeued into the lvq map then the message position is before the current cursor location, which in turn will bail from the method because it's only looking forward (upper bound on the map). I'm wondering if it's too late to address this issue before the pending 1.35.0 release (obviously after code review), considering it's a single line change and getting this fix into a subsequent release might take quite a while. Matt
