I've attached another trace with additional logging to show the state of the
ProtonEngine#ingest() function. As I mentioned earlier, for Transfer frames
the readOffset of the input buffer after fireRead has completed is 0, which
the logic treats as a 'no sequence' operation.
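
To make the check concrete, here is a minimal sketch of the bookkeeping as the added logging suggests it works. It is not the ProtonJ2 source: the class IngestSequenceSketch, the FakeBuffer stand-in, and the offsetAfterRead parameter are all hypothetical, and the offsets fed in are the post-fireRead values from the trace below.

```java
// Hypothetical model of the ingest bookkeeping; not the actual ProtonEngine code.
public class IngestSequenceSketch {

    /** Minimal buffer stand-in that only tracks a read offset (assumption). */
    static final class FakeBuffer {
        private int readOffset;

        int getReadOffset() {
            return readOffset;
        }

        void setReadOffset(int offset) {
            this.readOffset = offset;
        }
    }

    private long inputSequence;

    long getInputSequence() {
        return inputSequence;
    }

    /**
     * Counts a read only when the buffer's read offset moved.
     * offsetAfterRead simulates whatever offset fireRead leaves behind.
     */
    void ingest(FakeBuffer input, int offsetAfterRead) {
        int startIndex = input.getReadOffset();
        input.setReadOffset(offsetAfterRead); // stands in for fireRead(input)
        if (input.getReadOffset() != startIndex) {
            inputSequence++;
        }
    }

    public static void main(String[] args) {
        IngestSequenceSketch engine = new IngestSequenceSketch();
        // Open, Begin, Attach, then the two reads seen for one Transfer.
        int[] offsetsFromTrace = {71, 34, 258, 0, 0};
        for (int offset : offsetsFromTrace) {
            engine.ingest(new FakeBuffer(), offset);
        }
        System.out.println(engine.getInputSequence()); // prints 3
    }
}
```

With this shape of check, any read that leaves the offset equal to startIndex is indistinguishable from a read that consumed nothing, which is what the Transfer reads look like in the trace.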
The interesting part, as you can see below, is that for each Transfer there
are multiple (two) calls to ingest, both returning 0 as the post-fireRead
value of the readOffset. I presume this is due to buffering and the need to
fetch more data for this particular frame type, so I can get my head around
the first of these reads being ignored as a sequence increment. It still
feels to me, though, that the second (or presumably the very last one in the
case of larger payloads) should return a non-zero value to trigger the
sequence increment?
I'm not quite sure what the value of enforcing this logical sequence is, as
it only appears to be used as a 'liveness' indicator for the remote
connection. Does it matter whether a valid frame was returned?
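
For reference, the numeric lines in the trace below are consistent with an idle check of roughly the following shape, if the five columns are read as timeout, deadline, current time, last seen sequence, and current sequence. That reading, the class IdleCheckSketch, and its method names are assumptions for illustration, not the engine's actual implementation.

```java
// Hypothetical liveness check driven by the input sequence; names are illustrative.
public class IdleCheckSketch {

    private final long idleTimeout;
    private long deadline;
    private long lastInputSequence;

    IdleCheckSketch(long idleTimeout, long initialDeadline) {
        this.idleTimeout = idleTimeout;
        this.deadline = initialDeadline;
    }

    long getDeadline() {
        return deadline;
    }

    /**
     * Returns true when the deadline has passed without the input sequence
     * changing. A changed sequence pushes the deadline out by idleTimeout.
     */
    boolean tick(long now, long inputSequence) {
        if (inputSequence != lastInputSequence) {
            lastInputSequence = inputSequence;
            deadline = now + idleTimeout;
            return false;
        }
        return now > deadline;
    }

    public static void main(String[] args) {
        // Values taken from the trace lines, under the column reading above.
        IdleCheckSketch check = new IdleCheckSketch(60000, 49840930L);
        System.out.println(check.tick(49790947L, 7)); // sequence moved 0 -> 7
        System.out.println(check.tick(49840986L, 8)); // sequence moved 7 -> 8
        System.out.println(check.tick(49900147L, 8)); // unchanged, before deadline
        System.out.println(check.tick(49901159L, 8)); // unchanged, past deadline
    }
}
```

Under these assumptions a Transfer that never bumps the sequence cannot extend the deadline, so the connection looks idle even while messages are still arriving.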
Trace with additional logging of the variable state below:
<- AMQP:[1619358550:0] Open{
containerId='91791f32464449109b4136667ccaa3f7_G52', hostname='null',
maxFrameSize=65536, channelMax=4999, idleTimeOut=120000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
startIndex:0, input.getReadOffset():71
inputSequence++
<- AMQP:[1619358550:0] Begin{remoteChannel=0, nextOutgoingId=1,
incomingWindow=5000, outgoingWindow=1600, handleMax=255,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
startIndex:0, input.getReadOffset():34
inputSequence++
<- AMQP:[1619358550:0]
Attach{name='receiver-ID:ecec0184-d4d3-4fe8-b8ec-0b132b249368:1:2:1:1',
handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='testq', durable=NONE, expiryPolicy=LINK_DETACH,
timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=null, filter=null,
defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=false,
messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list,
amqp:released:list, amqp:modified:list], capabilities=null},
target=Target{address='testq', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=null, initialDeliveryCount=0,
maxMessageSize=18446744073709551615, offeredCapabilities=[SHARED-SUBS],
desiredCapabilities=null, properties=null}
startIndex:0, input.getReadOffset():258
inputSequence++
startIndex:0, input.getReadOffset():0
<- AMQP:[1619358550:0] Transfer{handle=0, deliveryId=0,
deliveryTag=DeliveryTag: {[56, 109, 70, 77, -127, -59, 8, 72, -120, 61, 85,
-116, 30, -125, -121, -112]}, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=null, aborted=null, batchable=true}
- "\x00Sp\xc0\x0a\x05@@pH\x19\x08\x00@C
\x00Sq\xc1$\x02\xa3\x10x-opt"...(truncated)
startIndex:0, input.getReadOffset():0
-> AMQP:[1619358550:0] Disposition{role=RECEIVER, first=0, last=0,
settled=true, state=Accepted{}, batchable=false}
// This should be sequence 8, but the idle process hasn't started yet to log the previous 1->7
60000,49840930,49790947,0,7
60000,49850947,49800960,7,7
60000,49850947,49810971,7,7
60000,49850947,49820983,7,7
60000,49850947,49831000,7,7
<- AMQP:[1619358550:0] null
startIndex:0, input.getReadOffset():8
inputSequence++
60000,49850947,49840986,7,8
startIndex:0, input.getReadOffset():0
<- AMQP:[1619358550:0] Transfer{handle=0, deliveryId=1,
deliveryTag=DeliveryTag: {[49, 19, -96, -3, -69, 76, -17, 73, -78, 78, 87,
-118, 63, -16, -104, 36]}, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=null, aborted=null, batchable=true}
- "\x00Sp\xc0\x0a\x05@@pH\x19\x08\x00@C
\x00Sq\xc1$\x02\xa3\x10x-opt"...(truncated)
startIndex:0, input.getReadOffset():0
-> AMQP:[1619358550:0] Disposition{role=RECEIVER, first=1, last=0,
settled=true, state=Accepted{}, batchable=false}
// This should be sequence 9, but the transfer didn't increment it
60000,49900986,49845977,8,8
60000,49900986,49855993,8,8
// READ #1 for the Transfer, state of variables after fireEvent returns :
startIndex:0, input.getReadOffset():0
<- AMQP:[1619358550:0] Transfer{handle=0, deliveryId=2,
deliveryTag=DeliveryTag: {[63, 125, -121, 13, -45, -3, 7, 74, -108, 83, -1,
-72, -37, 61, -33, 123]}, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=null, aborted=null, batchable=true}
- "\x00Sp\xc0\x0a\x05@@pH\x19\x08\x00@C
\x00Sq\xc1$\x02\xa3\x10x-opt"...(truncated)
// READ #2 for the Transfer, state of variables after fireEvent returns :
startIndex:0, input.getReadOffset():0
-> AMQP:[1619358550:0] Disposition{role=RECEIVER, first=2, last=0,
settled=true, state=Accepted{}, batchable=false}
// This should be sequence 10, but the transfer didn't increment it
60000,49900986,49866005,8,8
60000,49900986,49876016,8,8
60000,49900986,49886028,8,8
60000,49900986,49893521,8,8
60000,49900986,49897264,8,8
60000,49900986,49899135,8,8
60000,49900986,49900147,8,8
60000,49900986,49901159,8,8
-> AMQP:[1619358550:0]
Close{error=Error{condition=amqp:resource-limit-exceeded,
description='local-idle-timeout expired', info=null}}