Re: proton server (azure SB) limit the incoming_window=5000

2017-08-09 Thread Pankaj Bhagra
Thanks, Gordon, for looking into my query. What you said makes sense,
however I am still searching for the reason for the flow control and the
limited batch size.

As per your suggestion I tried increasing the link-credit to 10k and 100k,
but that doesn't change much. My understanding of prefetch was that it is a
number of packets, not a number of bytes (I confirmed this by reducing the
prefetch to 2, and then I see only 1 pkt per bulk message (half of the
window size)).

The size of each pkt is roughly 900B, and as you can see in the complete
logs below, I am not able to read more than 12 pkts per batch. So, looking
back, yes, 12x900B is greater than 5KB, so the heading may need correction -
the limit looks like 2x of that, which is 10KB.
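
The arithmetic behind that estimate, as a minimal sketch. The 12 pkts per batch and the roughly 900B per pkt are the figures quoted above; the variable names are only illustrative.

```python
# Figures quoted in the message: about 12 pkts per batch, roughly 900B each.
pkts_per_batch = 12
approx_pkt_bytes = 900

# The number advertised by the server in its @begin frame.
advertised_window = 5000

batch_bytes = pkts_per_batch * approx_pkt_bytes
ratio = batch_bytes / advertised_window

print(batch_bytes)  # 10800
print(ratio)        # 2.16
```

So the observed batch is a little over twice the advertised 5000, which is where the 10KB estimate comes from.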

I would appreciate it if someone could suggest some more knobs I should play
with to figure out where this limit of 10KB is coming from. The
client-advertised incoming_window is incoming-window=2147483647.

See the complete logs; as you can see, the bulk size is limited to 12 pkts,
where the link credit remains at , after every bulk.


[0xb58380]:  -> SASL

[0xb58380]:  <- SASL

[0xb58380]:0 <- @sasl-mechanisms(64)
[sasl-server-mechanisms=@PN_SYMBOL[:MSSBCBS, :PLAIN, :ANONYMOUS, :EXTERNAL]]

[0xb58380]:0 -> @sasl-init(65) [mechanism=:PLAIN,
initial-response=b"\x0"]

[0xb58380]:0 <- @sasl-outcome(68) [code=0, additional-data=b"Welcome!"]

[0xb58380]:  -> AMQP

[0xb58380]:0 -> @open(16)
[container-id="bc599ddc-74df-46b0-800c-401aed27f321", hostname="
nebhubsb.servicebus.windows.net", channel-max=32767]

[0xb58380]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647,
outgoing-window=2147483647]

[0xb58380]:0 -> @attach(18)
[name="bc599ddc-74df-46b0-800c-401aed27f321-kukatopic/Subscriptions/kukasub",
handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0,
source=@source(40) [address="kukatopic/Subscriptions/kukasub", durable=0,
timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0,
dynamic=false], initial-delivery-count=0, max-message-size=0]

[0xb58380]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0,
outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=1,
drain=false]

[0xb58380]:  <- AMQP

[0xb58380]:0 <- @open(16)
[container-id="fa8f5d5577be485ebd7f5ebdbdfd9ca1_G13", max-frame-size=65536,
channel-max=4999, idle-time-out=24]

[0xb58380]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
incoming-window=5000, outgoing-window=2147483647, handle-max=255]

[0xb58380]:0 <- @attach(18)
[name="bc599ddc-74df-46b0-800c-401aed27f321-kukatopic/Subscriptions/kukasub",
handle=0, role=false, rcv-settle-mode=1, source=@source(40)
[address="kukatopic/Subscriptions/kukasub", durable=0, timeout=0,
dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false],
initial-delivery-count=0, max-message-size=266240]

[0xb58380]:0 <- @transfer(20) [handle=0, delivery-id=0,
delivery-tag=b"\x06\xa8\xb3\xf9\x94\x1b\x09H\xa0\xc7\xf2\x11\xd7I\xa9\x12",
message-format=0, more=false, batchable=true] (1250) "\x00Sp\xc0\x0b\x05@
@p\x05&\\x00@R\x02\x00Sq\xc1\x01\x00\x00Sr\xc1\\x06\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01]\xc0\x92\xc7\x0a\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x009\xc9\x15\xa3\x12x-opt-locked-until\x83\x00\x00\x01]\xc5\xb4\xa5)\x00St\xc1\xef\x06\xa1\x1biothub-connection-device-id\xa1)S-1-5-21-1936340646-3222415949-3237289474\xa1\x1diothub-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636327363635593852\x00Su\xb0\x00\x00\x03o{"AssetId":
{"type": "KR C", "eid": "S-1-5-21-1936340646-3222415949-3237289474"},
"AssetUpdateHdr": {"timestamp": "2017-08-08T06:40:29.4663961Z",
"iteration": 467375, "update_type": "AssetAttributes"}, "parentId":
"88eeb6e55ae548de97576e16d5289050", "AssetMonitoredItems": {"AXIS_ACT":
{"A1": {"units": "Double", "value": 7.35237932}, "A3": {"units": "Double",
"value": 97.599472}, "A2": {"units": "Double", "value": -88.2291565}, "A5":
{"units": "Double", "value": 80"... (truncated)

[0xb58380]:0 <- @transfer(20) [handle=0, delivery-id=1,
delivery-tag=b"P\xb1\x94N9\x18\x0bD\xb6\x00[cr#\x1d\xab", message-format=0,
more=false, batchable=true] (1245)
"\x00Sp\xc0\x0b\x05@@p\x05&\\x00@R\x02\x00Sq\xc1\x01\x00\x00Sr\xc1\\x06\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01]\xc0\x92\xcb\x04\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x009\xc9\x16\xa3\x12x-opt-locked-until\x83\x00\x00\x01]\xc5\xb4\xa5)\x00St\xc1\xef\x06\xa1\x1biothub-connection-device-id\xa1)S-1-5-21-1936340646-3222415949-3237289474\xa1\x1diothub-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636327363635593852\x00Su\xb0\x00\x00\x03j{"AssetId":
{"type": "KR C", "eid": "S-1-5-21-1936340646-3222415949-3237289474"},
"AssetUpdateHdr": {"timestamp": "2017-08-08T06:40:30.4976526Z",
"iteration": 467376, "update_type": 

proton server (azure SB) limit the incoming_window=5000

2017-08-07 Thread Pankaj Bhagra
I am trying to extract bulk messages from Azure SB.

As per their documentation, the Azure SDK doesn't support bulk message reads
and recommends using native AMQP for the Azure Service Bus. While
trying to negotiate a session with the Azure SB, I noticed that, independent
of what the client requests, the SB dials down to incoming_window=5000.
This limits each bulk read to a max of 5000B sent, so my consumer runs dry
for an RTT (which is large for inter-cloud) until the next packet is fetched.

Is this a restriction of the Azure SB, or am I not setting some of the
parameters correctly on the client side to achieve a negotiated window
size > 5000B?
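
One way to compare what each side advertises is to pull the incoming-window values out of the @begin frames in the protocol trace. The following is a minimal sketch under stated assumptions: the helper name incoming_windows is hypothetical, the sample text is copied from the trace in this message, and "->" is taken to be the frame sent by the client and "<-" the frame received from the server. Note that the AMQP 1.0 specification defines the session incoming-window as a count of transfer frames rather than bytes, so the value 5000 is not necessarily a byte limit.

```python
import re

# A @begin performative: capture the direction arrow and the field list.
# [^\]]* also matches newlines, so frames wrapped over two lines still match.
BEGIN_FRAME = re.compile(r"(->|<-)\s*@begin\(17\)\s*\[([^\]]*)\]")
WINDOW = re.compile(r"incoming-window=(\d+)")


def incoming_windows(trace):
    """Return the incoming-window advertised by each side in a trace."""
    result = {}
    for direction, fields in BEGIN_FRAME.findall(trace):
        match = WINDOW.search(fields)
        if match:
            # Assumption: "->" is sent by the client, "<-" comes from the server.
            side = "client" if direction == "->" else "server"
            result[side] = int(match.group(1))
    return result


sample = """[0xace380]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647,
outgoing-window=2147483647]
[0xace380]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
incoming-window=5000, outgoing-window=2147483647, handle-max=255]"""

windows = incoming_windows(sample)
print(windows)  # {'client': 2147483647, 'server': 5000}
```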

I am using the Python proton MessagingHandler class and clearly see that
on_message is called for a few pkts making up a buffer of ~5000B, and then I
have to wait for the RTT delay to get the next batch.

Any suggestions to work around this problem and get larger bulk messages?
I can't reduce the RTT between the server and the consumer. I have a
workaround using parallel consumers, but would like to solve the bulk problem,
as that is the most efficient way of achieving high throughput.
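
To make the trade-off concrete: if every batch costs one round trip, the message rate is bounded by roughly batch size divided by RTT, times the number of parallel consumers. The sketch below is an idealized model only; the function name and the numbers (12 pkts per batch, 200 ms RTT, 4 consumers) are hypothetical and not measurements from this setup.

```python
def max_msgs_per_second(batch_msgs, rtt_ms, consumers=1):
    """Idealized ceiling: each consumer gets one batch per round trip."""
    if rtt_ms <= 0:
        raise ValueError("rtt_ms must be positive")
    return consumers * batch_msgs * 1000 / rtt_ms


# Hypothetical numbers, for illustration only.
single = max_msgs_per_second(batch_msgs=12, rtt_ms=200)
parallel = max_msgs_per_second(batch_msgs=12, rtt_ms=200, consumers=4)
bigger_batch = max_msgs_per_second(batch_msgs=48, rtt_ms=200)

print(single, parallel, bigger_batch)  # 60.0 240.0 240.0
```

In this model, four parallel consumers and a four times larger batch give the same ceiling, but the larger batch needs only one connection, which is why solving the bulk problem is the more efficient option.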

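from datetime import datetime

from proton.handlers import MessagingHandler


class Recv(MessagingHandler):

    def __init__(self):
        super(Recv, self).__init__(prefetch=100, auto_accept=True,
                                   auto_settle=True, peer_close_is_error=False)
        self.count = 0  # number of messages received so far

    def on_start(self, event):
        # connString and subscription are defined elsewhere (not shown)
        conn = event.container.connect(connString)
        event.container.create_receiver(conn, subscription)

    def on_message(self, event):
        self.count += 1
        print(event.message.body)
        # timestamp (ms precision), running count, deliveries queued on the link
        print(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
              self.count, event.receiver.queued)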
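
The batch size can also be estimated from the arrival timestamps that on_message prints, by starting a new batch whenever the gap between two consecutive messages exceeds a threshold well below the RTT. This is a minimal sketch in plain Python; the helper name batch_sizes, the threshold, and the sample timestamps are hypothetical.

```python
def batch_sizes(arrival_times, gap_seconds):
    """Group sorted arrival times (in seconds) into batches split by large gaps."""
    sizes = []
    previous = None
    for t in arrival_times:
        if previous is None or t - previous > gap_seconds:
            sizes.append(1)   # a large gap (or the first message) starts a batch
        else:
            sizes[-1] += 1    # a small gap continues the current batch
        previous = t
    return sizes


# Hypothetical arrival times: three messages, a long pause, then two more.
arrivals = [0.000, 0.001, 0.002, 0.150, 0.151]
print(batch_sizes(arrivals, gap_seconds=0.05))  # [3, 2]
```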


[0xace380]:  -> SASL

[0xace380]:  <- SASL

[0xace380]:0 <- @sasl-mechanisms(64)
[sasl-server-mechanisms=@PN_SYMBOL[:MSSBCBS, :PLAIN, :ANONYMOUS, :EXTERNAL]]

[0xace380]:0 -> @sasl-init(65) [mechanism=:PLAIN,
initial-response=b"\x00iothubroutes_X\X="]

[0xace380]:0 <- @sasl-outcome(68) [code=0, additional-data=b"Welcome!"]

[0xace380]:  -> AMQP

[0xace380]:0 -> @open(16)
[container-id="0ad171ca-cefa-4a27-a7dc-0520e5393fa5", hostname="
nebhubsb.servicebus.windows.net", channel-max=32767]

[0xace380]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647,
outgoing-window=2147483647]

[0xace380]:0 -> @attach(18)
[name="0ad171ca-cefa-4a27-a7dc-0520e5393fa5-kukatopic/Subscriptions/kukasub",
handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0,
source=@source(40) [address="kukatopic/Subscriptions/kukasub", durable=0,
timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0,
dynamic=false], initial-delivery-count=0, max-message-size=0]

[0xace380]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0,
outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=100,
drain=false]

[0xace380]:  <- AMQP

[0xace380]:0 <- @open(16)
[container-id="b970f07881334c658eb80ff336f2a683_G16", max-frame-size=65536,
channel-max=4999, idle-time-out=24]

[0xace380]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
incoming-window=5000, outgoing-window=2147483647, handle-max=255]

[0xace380]:0 <- @attach(18)
[name="0ad171ca-cefa-4a27-a7dc-0520e5393fa5-kukatopic/Subscriptions/kukasub",
handle=0, role=false, rcv-settle-mode=1, source=@source(40)
[address="topic/Subscriptions/sub", durable=0, timeout=0, dynamic=false],
target=@target(41) [durable=0, timeout=0, dynamic=false],
initial-delivery-count=0, max-message-size=266240]