On Fri, Aug 26, 2016 at 9:39 PM, Matt Broadstone <[email protected]> wrote:

> On Fri, Aug 26, 2016 at 7:58 PM, Matt Broadstone <[email protected]>
> wrote:
>
>> On Fri, Aug 26, 2016 at 5:44 PM, Matt Broadstone <[email protected]>
>> wrote:
>>
>>> On Fri, Aug 26, 2016 at 3:33 PM, Gordon Sim <[email protected]> wrote:
>>>
>>>> On 26/08/16 20:15, Matt Broadstone wrote:
>>>>
>>>>> Oops you're sorry I forgot a crucial part there, here's the updated
>>>>> code:
>>>>> const amqp = require('amqp10');
>>>>>
>>>>> let client = new amqp.Client();
>>>>> client.connect('amqp://<address>')
>>>>>   .then(() => Promise.all([
>>>>>     client.createSender('test.queue'),
>>>>>     client.createReceiver('test.queue', {
>>>>>       attach: { receiverSettleMode: 'settle' }
>>>>>     })
>>>>>   ]))
>>>>>   .spread((sender, receiver) => {
>>>>>     let receiveCount = 0;
>>>>>     receiver.on('message', msg => {
>>>>>       receiveCount++;
>>>>>       if (receiveCount === 10) process.exit(0);
>>>>>       console.log('received[', receiveCount, '], releasing');
>>>>>       receiver.release(msg);
>>>>>     });
>>>>>
>>>>>     return sender.send({ test: 'message' });
>>>>>   });
>>>>>
>>>>> With this code I now get the following on qpidd's side:
>>>>>
>>>>
>>>> With that change I see the correct behaviour with qpidd also (see
>>>> below). What version of qpidd are you using?
>>>>
>>>> From client:
>>>>
>>>> received[ 1 ], releasing
>>>>> received[ 2 ], releasing
>>>>> received[ 3 ], releasing
>>>>> received[ 4 ], releasing
>>>>> received[ 5 ], releasing
>>>>> received[ 6 ], releasing
>>>>> received[ 7 ], releasing
>>>>> received[ 8 ], releasing
>>>>> received[ 9 ], releasing
>>>>>
>>>>
>>>> and on broker:
>>>>
>>>> 2016-08-26 20:30:55 [Protocol] trace [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"1",
>>>>> message-format=0, settled=false, more=false, rcv-settle-mode=0,
>>>>> resume=false, aborted=false, batchable=false] (21)
>>>>> "\x00Sw\xc1\x10\x02\xa1\x04test\xa1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @flow(19) [next-incoming-id=2, incoming-window=2147483647,
>>>>> next-outgoing-id=0, outgoing-window=2147483647, handle=0,
>>>>> delivery-count=2, link-credit=500, drain=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=0, 
>>>>> delivery-tag=b"\x00\x00\x00\x00",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @disposition(21) [role=true, first=1, last=1, settled=true,
>>>>> state=@accepted(36) []]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=0, last=0, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=1, 
>>>>> delivery-tag=b"\x00\x00\x00\x01",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=1, last=1, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=2, 
>>>>> delivery-tag=b"\x00\x00\x00\x02",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=2, last=2, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=3, 
>>>>> delivery-tag=b"\x00\x00\x00\x03",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=3, last=3, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=4, 
>>>>> delivery-tag=b"\x00\x00\x00\x04",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=4, last=4, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=5, 
>>>>> delivery-tag=b"\x00\x00\x00\x05",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=5, last=5, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=6, 
>>>>> delivery-tag=b"\x00\x00\x00\x06",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=6, last=6, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=7, 
>>>>> delivery-tag=b"\x00\x00\x00\x07",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=7, last=7, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=8, 
>>>>> delivery-tag=b"\x00\x00\x00\x08",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 1 <- @disposition(21) [role=true, first=8, last=8, settled=true,
>>>>> state=@released(38) [], batchable=false]
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>> 0 -> @transfer(20) [handle=1, delivery-id=9, 
>>>>> delivery-tag=b"\x00\x00\x00\x09",
>>>>> message-format=0, settled=false, more=false] (32)
>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>> 1\x07message"
>>>>>
>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: [email protected]
>>>> For additional commands, e-mail: [email protected]
>>>>
>>>>
>>> Hmm strange, still doesn't work on my side although when I switch to a
>>> modified outcome it works but only twice before stalling. This is with 0.34
>>> so it's very likely something was fixed in interim?
>>>
>>> Matt
>>>
>>>
>> Gordon,
>> It seems that this behavior is specific to LVQs, or at least that's how
>> I've isolated it in my case here. As you said, when I tried my sample code
>> against a normal non-durable queue it indeed worked just fine. When I added
>> the queue as an LVQ I begin to experience the behavior mentioned above:
>> release doesn't actually re-deliver and modified re-delivers one time
>> before stalling out.  Any ideas why this might be? If you point me to the
>> relevant code sections I can take a look at the source.
>>
>> Matt
>>
>>
> So I did a little bit of investigation and it seems like the issue can be
> resolved by adding:
>
> if (i == messages.end()) i = messages.begin();
> at: https://github.com/apache/qpid-cpp/blob/master/src/qpid/
> broker/MessageMap.cpp#L90
>
> As far as I can tell if a message was requeued into the lvq map then the
> message position is before the current cursor location, which in turn will
> bail from the method because it's only looking forward (upper bound on the
> map).
>
> I'm wondering if it's too late to address this issue before the pending
> 1.35.0 release (obviously after code review), considering it's a single
> line change and getting this fix into a subsequent release might take quite
> a while.
>
> Matt
>
>
>
As a followup, I think my proposed fix here is a bit of a sledgehammer
since it absolutely resets the cursor position in this particular case. The
standard Queue implementation appears to use a concept of "versioning" to
accommodate this use case, so I think there is room for improvement here.
Obviously you guys would know far better an I would!

Matt

Reply via email to