On Fri, Aug 26, 2016 at 9:45 PM, Matt Broadstone <[email protected]> wrote:

> On Fri, Aug 26, 2016 at 9:39 PM, Matt Broadstone <[email protected]>
> wrote:
>
>> On Fri, Aug 26, 2016 at 7:58 PM, Matt Broadstone <[email protected]>
>> wrote:
>>
>>> On Fri, Aug 26, 2016 at 5:44 PM, Matt Broadstone <[email protected]>
>>> wrote:
>>>
>>>> On Fri, Aug 26, 2016 at 3:33 PM, Gordon Sim <[email protected]> wrote:
>>>>
>>>>> On 26/08/16 20:15, Matt Broadstone wrote:
>>>>>
>>>>>> Oops you're sorry I forgot a crucial part there, here's the updated
>>>>>> code:
>>>>>> const amqp = require('amqp10');
>>>>>>
>>>>>> let client = new amqp.Client();
>>>>>> client.connect('amqp://<address>')
>>>>>>   .then(() => Promise.all([
>>>>>>     client.createSender('test.queue'),
>>>>>>     client.createReceiver('test.queue', {
>>>>>>       attach: { receiverSettleMode: 'settle' }
>>>>>>     })
>>>>>>   ]))
>>>>>>   .spread((sender, receiver) => {
>>>>>>     let receiveCount = 0;
>>>>>>     receiver.on('message', msg => {
>>>>>>       receiveCount++;
>>>>>>       if (receiveCount === 10) process.exit(0);
>>>>>>       console.log('received[', receiveCount, '], releasing');
>>>>>>       receiver.release(msg);
>>>>>>     });
>>>>>>
>>>>>>     return sender.send({ test: 'message' });
>>>>>>   });
>>>>>>
>>>>>> With this code I now get the following on qpidd's side:
>>>>>>
>>>>>
>>>>> With that change I see the correct behaviour with qpidd also (see
>>>>> below). What version of qpidd are you using?
>>>>>
>>>>> From client:
>>>>>
>>>>> received[ 1 ], releasing
>>>>>> received[ 2 ], releasing
>>>>>> received[ 3 ], releasing
>>>>>> received[ 4 ], releasing
>>>>>> received[ 5 ], releasing
>>>>>> received[ 6 ], releasing
>>>>>> received[ 7 ], releasing
>>>>>> received[ 8 ], releasing
>>>>>> received[ 9 ], releasing
>>>>>>
>>>>>
>>>>> and on broker:
>>>>>
>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"1",
>>>>>> message-format=0, settled=false, more=false, rcv-settle-mode=0,
>>>>>> resume=false, aborted=false, batchable=false] (21)
>>>>>> "\x00Sw\xc1\x10\x02\xa1\x04test\xa1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @flow(19) [next-incoming-id=2, incoming-window=2147483647,
>>>>>> next-outgoing-id=0, outgoing-window=2147483647, handle=0,
>>>>>> delivery-count=2, link-credit=500, drain=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=0, 
>>>>>> delivery-tag=b"\x00\x00\x00\x00",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @disposition(21) [role=true, first=1, last=1, settled=true,
>>>>>> state=@accepted(36) []]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=0, last=0, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=1, 
>>>>>> delivery-tag=b"\x00\x00\x00\x01",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=1, last=1, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=2, 
>>>>>> delivery-tag=b"\x00\x00\x00\x02",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=2, last=2, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=3, 
>>>>>> delivery-tag=b"\x00\x00\x00\x03",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=3, last=3, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=4, 
>>>>>> delivery-tag=b"\x00\x00\x00\x04",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=4, last=4, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=5, 
>>>>>> delivery-tag=b"\x00\x00\x00\x05",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=5, last=5, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=6, 
>>>>>> delivery-tag=b"\x00\x00\x00\x06",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=6, last=6, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=7, 
>>>>>> delivery-tag=b"\x00\x00\x00\x07",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=7, last=7, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=8, 
>>>>>> delivery-tag=b"\x00\x00\x00\x08",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 1 <- @disposition(21) [role=true, first=8, last=8, settled=true,
>>>>>> state=@released(38) [], batchable=false]
>>>>>> 2016-08-26 20:30:55 [Protocol] trace 
>>>>>> [qpid.127.0.0.1:5672-127.0.0.1:48756]:
>>>>>> 0 -> @transfer(20) [handle=1, delivery-id=9, 
>>>>>> delivery-tag=b"\x00\x00\x00\x09",
>>>>>> message-format=0, settled=false, more=false] (32)
>>>>>> "\x00Sp\xc0\x06\x04BP\x04@A\x00Sw\xc1\x10\x02\xa1\x04test\xa
>>>>>> 1\x07message"
>>>>>>
>>>>>
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: [email protected]
>>>>> For additional commands, e-mail: [email protected]
>>>>>
>>>>>
>>>> Hmm strange, still doesn't work on my side although when I switch to a
>>>> modified outcome it works but only twice before stalling. This is with 0.34
>>>> so it's very likely something was fixed in interim?
>>>>
>>>> Matt
>>>>
>>>>
>>> Gordon,
>>> It seems that this behavior is specific to LVQs, or at least that's how
>>> I've isolated it in my case here. As you said, when I tried my sample code
>>> against a normal non-durable queue it indeed worked just fine. When I added
>>> the queue as an LVQ I begin to experience the behavior mentioned above:
>>> release doesn't actually re-deliver and modified re-delivers one time
>>> before stalling out.  Any ideas why this might be? If you point me to the
>>> relevant code sections I can take a look at the source.
>>>
>>> Matt
>>>
>>>
>> So I did a little bit of investigation and it seems like the issue can be
>> resolved by adding:
>>
>> if (i == messages.end()) i = messages.begin();
>> at: https://github.com/apache/qpid-cpp/blob/master/src/qpid/brok
>> er/MessageMap.cpp#L90
>>
>> As far as I can tell if a message was requeued into the lvq map then the
>> message position is before the current cursor location, which in turn will
>> bail from the method because it's only looking forward (upper bound on the
>> map).
>>
>> I'm wondering if it's too late to address this issue before the pending
>> 1.35.0 release (obviously after code review), considering it's a single
>> line change and getting this fix into a subsequent release might take quite
>> a while.
>>
>> Matt
>>
>>
>>
> As a followup, I think my proposed fix here is a bit of a sledgehammer
> since it absolutely resets the cursor position in this particular case. The
> standard Queue implementation appears to use a concept of "versioning" to
> accommodate this use case, so I think there is room for improvement here.
> Obviously you guys would know far better an I would!
>
> Matt
>
>
>

Reported: https://issues.apache.org/jira/browse/QPID-7407

Matt

Reply via email to