On 3/14/24 19:28, Ciaran wrote:


I've provided the full set of frames below, but if I'm interpreting
correctly the inputSequence property is not being incremented on receipt
of data, which means that the deadline for hearing from the peer again is
not being suitably increased. (I can see the sequence increments and the
localdeadline is recalculated for the empty frames the peer sends, but
not for when there's actual data.)

I've gone back through the client and engine code and did some testing
and cannot reproduce any issue where the sequence tracking is not
updated when incoming frames are read regardless of which frame they are.
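
For readers following along, the liveness check being discussed can be sketched roughly as below. This is only an illustration of the general idea (a deadline that is pushed out whenever the input sequence has changed since the last check), not the protonj2 implementation; the class name IdleDeadlineSketch, its methods, and the choice of ">=" for expiry are all assumptions made for the example.

```java
/**
 * Hypothetical sketch of an idle-timeout check driven by an input sequence
 * counter. Names and structure are illustrative, not taken from protonj2.
 */
final class IdleDeadlineSketch {
    private final long timeoutMillis;
    private long deadline;
    private long lastSeenSequence;

    IdleDeadlineSketch(long timeoutMillis, long now) {
        this.timeoutMillis = timeoutMillis;
        this.deadline = now + timeoutMillis;
    }

    long deadline() {
        return deadline;
    }

    /**
     * Called periodically. If the input sequence changed since the last
     * call, the peer is considered alive and the deadline is recalculated.
     * Otherwise the peer is reported as expired once the deadline passes.
     *
     * @return true if the local idle timeout should be considered expired
     */
    boolean tick(long now, long inputSequence) {
        if (inputSequence != lastSeenSequence) {
            lastSeenSequence = inputSequence;
            deadline = now + timeoutMillis;
            return false;
        }
        return now >= deadline; // assumption: expiry is inclusive of the deadline
    }
}
```

With a check shaped like this, a receiver that never bumps the sequence for data frames would only stay alive on the peer's empty frames, which matches the symptom described above.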

I fixed a minor issue where the Client is not configuring the idle
timeout to half the set value as it says it will.

Hi Tim,
Thank you for taking a look, but I'm still seeing the issue :( I've
cherry-picked the idle timeout commit
on top of M19 and it /may/ be a little better, but that still feels like
it's covering up an issue.

Forgive me, I'm not deeply familiar with the protonj2 codebase, but it
seems logical to me that the inputSequence should be incremented whenever
some data is received from the peer.

This should be fixed now in:

https://issues.apache.org/jira/browse/PROTON-2807

This essentially occurred because the Proton buffers were rewritten to allow zero-copy semantics after the ingest code was written. The read offset no longer reflects the outcome of a read: the buffer may have handed off its underlying payload and now have a size of zero, in which case the read offset is still zero. This code was written in the early days and had some safety checks in it that are no longer needed given other mechanics now in place in the engine, so it has been updated to treat a non-empty buffer as cause to update the input sequence value. I needed to be able to reproduce and explain the issue before making these changes, so I had to find a test case for it, which I now have.
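
To make the failure mode concrete, here is a minimal sketch of the two approaches. It is not the actual ProtonEngine code: ZeroCopyBuffer, IngestSketch, and every method on them are hypothetical stand-ins, assumed only for illustration. The first method counts a read only when the read offset moved; the second counts any non-empty input buffer, which is the behaviour described above.

```java
import java.util.function.Consumer;

/** Hypothetical buffer that can hand off its payload without copying. */
final class ZeroCopyBuffer {
    private byte[] payload;
    private int readOffset;

    ZeroCopyBuffer(byte[] payload) {
        this.payload = payload;
    }

    int readOffset() {
        return readOffset;
    }

    int readableBytes() {
        return payload.length - readOffset;
    }

    /** Conventional read: consumes n bytes by advancing the read offset. */
    void skip(int n) {
        readOffset += n;
    }

    /** Zero-copy read: gives away the payload, leaving an empty buffer at offset 0. */
    byte[] transferPayload() {
        byte[] out = payload;
        payload = new byte[0];
        readOffset = 0;
        return out;
    }
}

/** Hypothetical ingest path showing both ways of tracking input activity. */
final class IngestSketch {
    private long inputSequence;

    long inputSequence() {
        return inputSequence;
    }

    /** Older style: only count the read if the read offset changed. */
    void ingestByOffset(ZeroCopyBuffer input, Consumer<ZeroCopyBuffer> fireRead) {
        int startIndex = input.readOffset();
        fireRead.accept(input);
        if (input.readOffset() != startIndex) {
            inputSequence++;
        }
    }

    /** Updated style: any non-empty incoming buffer counts as activity. */
    void ingestByReadable(ZeroCopyBuffer input, Consumer<ZeroCopyBuffer> fireRead) {
        boolean hadData = input.readableBytes() > 0;
        fireRead.accept(input);
        if (hadData) {
            inputSequence++;
        }
    }
}
```

In this sketch a handler that calls transferPayload() leaves the offset at zero, so ingestByOffset never increments the sequence for it, while ingestByReadable does.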


I've attached another trace with additional logging to show the state of
the ProtonEngine#ingest() function. As I mentioned earlier, for Transfer
frames the readOffset of the input buffer after fireRead has completed is
0, which the logic treats as a 'no sequence' operation.

The interesting part, as you may see below, is that for each transfer there
are multiple (two) calls to ingest, both returning 0 as the post-fireRead
value of the readOffset. I presume this is due to buffering and needing to
fetch more data for this particular frame type, so I can get my head around
the first of these reads being ignored as a sequence increment, but it
still feels to me that the second (or presumably the very last one in the
case of larger payloads) should return a non-zero value to trigger the
sequence increment?

I'm not quite sure what the value of enforcing this logical sequence is, as
it only appears to be used as a 'liveness' indicator for the remote
connection. Does it matter whether a valid frame was returned?

Trace with additional logging of the variable state below:

<- AMQP:[1619358550:0] Open{
containerId='91791f32464449109b4136667ccaa3f7_G52', hostname='null',
maxFrameSize=65536, channelMax=4999, idleTimeOut=120000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
startIndex:0, input.getReadOffset():71
inputSequence++
<- AMQP:[1619358550:0] Begin{remoteChannel=0, nextOutgoingId=1,
incomingWindow=5000, outgoingWindow=1600, handleMax=255,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
startIndex:0, input.getReadOffset():34
inputSequence++
<- AMQP:[1619358550:0]
Attach{name='receiver-ID:ecec0184-d4d3-4fe8-b8ec-0b132b249368:1:2:1:1',
handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='testq', durable=NONE, expiryPolicy=LINK_DETACH,
timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=null, filter=null,
defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=false,
messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list,
amqp:released:list, amqp:modified:list], capabilities=null},
target=Target{address='testq', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=null, initialDeliveryCount=0,
maxMessageSize=18446744073709551615, offeredCapabilities=[SHARED-SUBS],
desiredCapabilities=null, properties=null}
startIndex:0, input.getReadOffset():258
inputSequence++
startIndex:0, input.getReadOffset():0
<- AMQP:[1619358550:0] Transfer{handle=0, deliveryId=0,
deliveryTag=DeliveryTag: {[56, 109, 70, 77, -127, -59, 8, 72, -120, 61, 85,
-116, 30, -125, -121, -112]}, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=null, aborted=null, batchable=true}
- "\x00Sp\xc0\x0a\x05@@pH\x19\x08\x00@C
\x00Sq\xc1$\x02\xa3\x10x-opt"...(truncated)
startIndex:0, input.getReadOffset():0
-> AMQP:[1619358550:0] Disposition{role=RECEIVER, first=0, last=0,
settled=true, state=Accepted{}, batchable=false}

// This should be sequence 8, but the idle process hasn't started yet to
// log the previous 1->7
60000,49840930,49790947,0,7
60000,49850947,49800960,7,7
60000,49850947,49810971,7,7
60000,49850947,49820983,7,7
60000,49850947,49831000,7,7
<- AMQP:[1619358550:0] null
startIndex:0, input.getReadOffset():8
inputSequence++
60000,49850947,49840986,7,8
startIndex:0, input.getReadOffset():0
<- AMQP:[1619358550:0] Transfer{handle=0, deliveryId=1,
deliveryTag=DeliveryTag: {[49, 19, -96, -3, -69, 76, -17, 73, -78, 78, 87,
-118, 63, -16, -104, 36]}, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=null, aborted=null, batchable=true}
- "\x00Sp\xc0\x0a\x05@@pH\x19\x08\x00@C
\x00Sq\xc1$\x02\xa3\x10x-opt"...(truncated)
startIndex:0, input.getReadOffset():0
-> AMQP:[1619358550:0] Disposition{role=RECEIVER, first=1, last=0,
settled=true, state=Accepted{}, batchable=false}

// This should be sequence 9, but the transfer didn't increment it
60000,49900986,49845977,8,8
60000,49900986,49855993,8,8

// READ #1 for the Transfer, state of variables after fireEvent returns :
startIndex:0, input.getReadOffset():0
<- AMQP:[1619358550:0] Transfer{handle=0, deliveryId=2,
deliveryTag=DeliveryTag: {[63, 125, -121, 13, -45, -3, 7, 74, -108, 83, -1,
-72, -37, 61, -33, 123]}, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=null, aborted=null, batchable=true}
- "\x00Sp\xc0\x0a\x05@@pH\x19\x08\x00@C
\x00Sq\xc1$\x02\xa3\x10x-opt"...(truncated)

// READ #2 for the Transfer, state of variables after fireEvent returns :
startIndex:0, input.getReadOffset():0
-> AMQP:[1619358550:0] Disposition{role=RECEIVER, first=2, last=0,
settled=true, state=Accepted{}, batchable=false}

// This should be sequence 10, but the transfer didn't increment it
60000,49900986,49866005,8,8
60000,49900986,49876016,8,8
60000,49900986,49886028,8,8
60000,49900986,49893521,8,8
60000,49900986,49897264,8,8
60000,49900986,49899135,8,8
60000,49900986,49900147,8,8
60000,49900986,49901159,8,8
-> AMQP:[1619358550:0]
Close{error=Error{condition=amqp:resource-limit-exceeded,
description='local-idle-timeout expired', info=null}}


--
Tim Bish

