Re: proton server (azure SB) limit the incoming_window=5000

2017-08-09 Thread Pankaj Bhagra
-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636327363635593852\x00Su\xb0\x00\x00\x03y{"AssetId":
{"type": "KR C", "eid": "S-1-5-21-1936340646-3222415949-3237289474"},
"AssetUpdateHdr": {"timestamp": "2017-08-08T06:40:47.4000408Z",
"iteration": 467393, "update_type": "AssetAttributes"}, "parentId":
"88eeb6e55ae548de97576e16d5289050", "AssetMonitoredItems": {"AXIS_ACT":
{"A1": {"units": "Double", "value": 7.65389352e-09}, "A3": {"units":
"Double", "value": 101.688866}, "A2": {"units": "Double", "value":
-101.688866}, "A5": {"units": "Double", "value": "... (truncated)

[0xb58380]:0 <- @transfer(20) [handle=0, delivery-id=20,
delivery-tag=b"\xbe87Z\xf5\xe6\x80B\x82\x0e>\x14\x8fr\xa4\xc9",
message-format=0, more=false, batchable=true] (1257) "\x00Sp\xc0\x0b\x05@
@p\x05&\\x00@R\x02\x00Sq\xc1\x01\x00\x00Sr\xc1\\x06\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01]\xc0\x93\x12\xb7\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x009\xc9)\xa3\x12x-opt-locked-until\x83\x00\x00\x01]\xc5\xb4\xa5)\x00St\xc1\xef\x06\xa1\x1biothub-connection-device-id\xa1)S-1-5-21-1936340646-3222415949-3237289474\xa1\x1diothub-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636327363635593852\x00Su\xb0\x00\x00\x03v{"AssetId":
{"type": "KR C", "eid": "S-1-5-21-1936340646-3222415949-3237289474"},
"AssetUpdateHdr": {"timestamp": "2017-08-08T06:40:48.4000176Z",
"iteration": 467394, "update_type": "AssetAttributes"}, "parentId":
"88eeb6e55ae548de97576e16d5289050", "AssetMonitoredItems": {"AXIS_ACT":
{"A1": {"units": "Double", "value": 2.82696688e-09}, "A3": {"units":
"Double", "value": 106.917091}, "A2": {"units": "Double", "value":
-106.917091}, "A5": {"units": "Double", "value": "... (truncated)

[0xb58380]:0 <- @transfer(20) [handle=0, delivery-id=21,
delivery-tag=b"\x91\x0b\xc3\xe6\x86\xc1\x0aN\xbe\x83\xdfo,\x03b\xdf",
message-format=0, more=false, batchable=true] (856) "\x00Sp\xc0\x0b\x05@
@p\x05&\\x00@R\x02\x00Sq\xc1\x01\x00\x00Sr\xc1\\x06\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01]\xc0\x93\x13\xc0\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x009\xc9*\xa3\x12x-opt-locked-until\x83\x00\x00\x01]\xc5\xb4\xa5)\x00St\xc1\xe0\x06\xa1\x1biothub-connection-device-id\xa1\x1aNebbiolo-00012-1.Discovery\xa1\x1diothub-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636373159014164350\x00Su\xb0\x00\x00\x01\xf4{"AssetId":
{"type": "FOGOS_DOCKER", "eid": "Nebbiolo-00012-1.Discovery"},
"AssetUpdateHdr": {"timestamp": "2017-08-07T23:40:51.188902-07:00",
"iteration": 41656, "update_type": "AssetAttributes"}, "AssetItems":
{"DiscoverdInfo": {"DiscoveredList": "['krc-32-1', 'krc-32-3', 'krc-32-4',
'krc-22-18']", "DiscoveredCount": 4, "WhiteList": "['192.168.32.1', '
192.168.32.0/8', '192.168.32.3', '192.168.32.4']", "RunningCount": 4,
"WindowsRebootCount": 2}}, "parentId": "88eeb6e55ae548"... (truncated)

[0xb58380]:0 <- @transfer(20) [handle=0, delivery-id=22,
delivery-tag=b"f\x03R\xe8\x07\x0a\xf5O\xb2W\x1b\x7f\x8c\xe3O\xdc",
message-format=0, more=false, batchable=true] (1260) "\x00Sp\xc0\x0b\x05@
@p\x05&\\x00@R\x02\x00Sq\xc1\x01\x00\x00Sr\xc1\\x06\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01]\xc0\x93\x16\xae\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x009\xc9+\xa3\x12x-opt-locked-until\x83\x00\x00\x01]\xc5\xb4\xa5)\x00St\xc1\xef\x06\xa1\x1biothub-connection-device-id\xa1)S-1-5-21-1936340646-3222415949-3237289474\xa1\x1diothub-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636327363635593852\x00Su\xb0\x00\x00\x03y{"AssetId":
{"type": "KR C", "eid": "S-1-5-21-1936340646-3222415949-3237289474"},
"AssetUpdateHdr": {"timestamp": "2017-08-08T06:40:49.4156473Z",
"iteration": 467395, "update_type": "AssetAttributes"}, "parentId":
"88eeb6e55ae548de97576e16d5289050", "AssetMonitoredItems": {"AXIS_ACT":
{"A1": {"units": "Double", "value": 1.40006631e-10}, "A3": {"units":
"Double", "value": 109.827438}, "A2": {"units": "Double", "value":
-109.827438}, "A5": {"units": "Double", "value": "... (truncated)

[0xb58380]:0 <- @transfer(20) [handle=0, delivery-id=23,
delivery-tag=b"\x08\xb6\xeb\x14 i\x8aA\xa8\xab\xffS\xdd\xd7\x8b\x83",
message-format=0, more=false, batchable=true] (1240) "\x00Sp\xc0\x0b\x05@
@p\x05&\\x00@R\x02\x00Sq\xc1\x01\x00\x00Sr\xc1\\x06\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01]\xc0\x93\x1a\xa6\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x009\xc9,\xa3\x12x-opt-locked-until\x83\x00\x00\x01]\xc5\xb4\xa5)\x00St\xc1\xef\x06\xa1\x1biothub-connection-device-id\xa1)S-1-5-21-1936340646-3222415949-3237289474\xa1\x1diothub-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636327363635593852\x00Su\xb0\x00\x00\x03e{"AssetId":
{"type": "KR C", "eid": "S-1-5-21-1936340646-3222415949-3237289474"},
"AssetUpdateHdr": {"timestamp": "2017-08-08T06:40:50.446901Z", "iteration":
467396, "update_type": "AssetAttributes"}, "parentId":
"88eeb6e55ae548de97576e16d5289050", "AssetMonitoredItems": {"AXIS_ACT":
{"A1": {"units": "Double", "value": -2.46100844e-05}, "A3": {"units":
"Double", "value": 110.0}, "A2": {"units": "Double", "value": -110.0},
"A5": {"units": "Double", "value": -90.0}, "A"... (truncated)

[0xb58380]:0 <- @transfer(20) [handle=0, delivery-id=24,
delivery-tag=b"\xce]\x1fC\xe7\xa7XO\x80\x96/\x1c\x9az\x12\xc6",
message-format=0, more=false, batchable=true] (1242) "\x00Sp\xc0\x0b\x05@
@p\x05&\\x00@R\x02\x00Sq\xc1\x01\x00\x00Sr\xc1\\x06\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01]\xc0\x93\x1e\xcc\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x009\xc9-\xa3\x12x-opt-locked-until\x83\x00\x00\x01]\xc5\xb4\xa5)\x00St\xc1\xef\x06\xa1\x1biothub-connection-device-id\xa1)S-1-5-21-1936340646-3222415949-3237289474\xa1\x1diothub-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636327363635593852\x00Su\xb0\x00\x00\x03g{"AssetId":
{"type": "KR C", "eid": "S-1-5-21-1936340646-3222415949-3237289474"},
"AssetUpdateHdr": {"timestamp": "2017-08-08T06:40:51.4468996Z",
"iteration": 467397, "update_type": "AssetAttributes"}, "parentId":
"88eeb6e55ae548de97576e16d5289050", "AssetMonitoredItems": {"AXIS_ACT":
{"A1": {"units": "Double", "value": -0.792964}, "A3": {"units": "Double",
"value": 109.94561}, "A2": {"units": "Double", "value": -109.923813}, "A5":
{"units": "Double", "value": -90.02"... (truncated)

2017-08-09 06:35:07.111 13 12

2017-08-09 06:35:07.111 14 11

2017-08-09 06:35:07.112 15 10

2017-08-09 06:35:07.112 16 9

2017-08-09 06:35:07.113 17 8

2017-08-09 06:35:07.113 18 7

2017-08-09 06:35:07.113 19 6

2017-08-09 06:35:07.114 20 5

2017-08-09 06:35:07.114 21 4

2017-08-09 06:35:07.115 22 3

2017-08-09 06:35:07.115 23 2

2017-08-09 06:35:07.115 24 1

2017-08-09 06:35:07.116 25 0

[0xb58380]:0 -> @flow(19) [next-incoming-id=26, incoming-window=2147483647,
next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=25,
link-credit=, drain=false]

[0xb58380]:0 -> @disposition(21) [role=true, first=12, last=24,
settled=true, state=@accepted(36) []]

[0xb58380]:0 <- @transfer(20) [handle=0, delivery-id=25,
delivery-tag=b"\x98\x1e\xc8\xb6\xd3\xb4bK\x94\xa7\xf7\x9e\xf2\xe2\xeao",
message-format=0, more=false, batchable=true] (1250) "\x00Sp\xc0\x0b\x05@
@p\x05&\\x00@R\x02\x00Sq\xc1\x01\x00\x00Sr\xc1\\x06\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01]\xc0\x93"\xc4\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x009\xc9.\xa3\x12x-opt-locked-until\x83\x00\x00\x01]\xc5\xb4\xa5)\x00St\xc1\xef\x06\xa1\x1biothub-connection-device-id\xa1)S-1-5-21-1936340646-3222415949-3237289474\xa1\x1diothub-connection-auth-method\xa1K{"scope":"hub","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}\xa1$iothub-connection-auth-generation-id\xa1\x12636327363635593852\x00Su\xb0\x00\x00\x03o{"AssetId":
{"type": "KR C", "eid": "S-1-5-21-1936340646-3222415949-3237289474"},
"AssetUpdateHdr": {"timestamp": "2017-08-08T06:40:52.4781421Z",
"iteration": 467398, "update_type": "AssetAttributes"}, "parentId":
"88eeb6e55ae548de97576e16d5289050", "AssetMonitoredItems": {"AXIS_ACT":
{"A1": {"units": "Double", "value": -4.55131578}, "A3": {"units": "Double",
"value": 105.856033}, "A2": {"units": "Double", "value": -104.539864},
"A5": {"units": "Double", "value": -91.31"... (truncated)

On Mon, Aug 7, 2017 at 6:05 PM, Pankaj Bhagra <pans...@gmail.com> wrote:

> I am trying to extract bulk messages from azure SB.
>
> As per their documentation, the Azure SDK doesn't support bulk message reads
> and recommends using native AMQP for the Azure Service Bus. While
> trying to negotiate a session with the Azure SB, I noticed that, independent
> of what the client requests, the SB dials down to incoming_window=5000.
> This limits each bulk read to a maximum of 5000B sent, so my consumer runs dry
> for an RTT (which is large between clouds) before it can fetch a new packet.
>
> Is this a restriction of the Azure SB, or am I not setting some of the
> parameters correctly on the client side to achieve a negotiated window
> size > 5000B?
>
> I am using the python proton MessagingHandler class and clearly see that
> on_message is called for a few packets totalling a buffer size of ~5000B,
> and then I have to wait for the RTT delay to get the next batch.
>
> Any suggestions to work around this problem and get larger bulk messages?
> I can't reduce the RTT between the server and consumer. I have a
> workaround using parallel consumers, but would like to solve the bulk problem,
> as that is the most efficient way of achieving high throughput.
>
> class Recv(MessagingHandler):
>
> def __init__(self):
>
> super(Recv, self).__init__(prefetch=100, auto_accept=True,
> auto_settle=True, peer_close_is_error=False)
>
>
> def on_start(self, event):
>
> conn = event.container.connect(connString)
>
> event.container.create_receiver(conn, subscription)
>
>
> def on_message(self, event):
>
> print(event.message.body)
>
> print datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
> self.count, event.receiver.queued
>
>
> [0xace380]:  -> SASL
>
> [0xace380]:  <- SASL
>
> [0xace380]:0 <- @sasl-mechanisms(64) 
> [sasl-server-mechanisms=@PN_SYMBOL[:MSSBCBS,
> :PLAIN, :ANONYMOUS, :EXTERNAL]]
>
> [0xace380]:0 -> @sasl-init(65) [mechanism=:PLAIN, initial-response=b"\
> x00iothubroutes_X\X="]
>
> [0xace380]:0 <- @sasl-outcome(68) [code=0, additional-data=b"Welcome!"]
>
> [0xace380]:  -> AMQP
>
> [0xace380]:0 -> @open(16) 
> [container-id="0ad171ca-cefa-4a27-a7dc-0520e5393fa5",
> hostname="nebhubsb.servicebus.windows.net", channel-max=32767]
>
> [0xace380]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647,
> outgoing-window=2147483647]
>
> [0xace380]:0 -> @attach(18) [name="0ad171ca-cefa-4a27-
> a7dc-0520e5393fa5-kukatopic/Subscriptions/kukasub", handle=0, role=true,
> snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
> [address="kukatopic/Subscriptions/kukasub", durable=0, timeout=0,
> dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false],
> initial-delivery-count=0, max-message-size=0]
>
> [0xace380]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0,
> outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=100,
> drain=false]
>
> [0xace380]:  <- AMQP
>
> [0xace380]:0 <- @open(16) 
> [container-id="b970f07881334c658eb80ff336f2a683_G16",
> max-frame-size=65536, channel-max=4999, idle-time-out=24]
>
> [0xace380]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
> incoming-window=5000, outgoing-window=2147483647, handle-max=255]
>
> [0xace380]:0 <- @attach(18) [name="0ad171ca-cefa-4a27-
> a7dc-0520e5393fa5-kukatopic/Subscriptions/kukasub", handle=0, role=false,
> rcv-settle-mode=1, source=@source(40) [address="topic/Subscriptions/sub",
> durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0,
> timeout=0, dynamic=false], initial-delivery-count=0,
> max-message-size=266240]
>
>
>
>


proton server (azure SB) limit the incoming_window=5000

2017-08-07 Thread Pankaj Bhagra
I am trying to extract bulk messages from azure SB.

As per their documentation, the Azure SDK doesn't support bulk message reads
and recommends using native AMQP for the Azure Service Bus. While
trying to negotiate a session with the Azure SB, I noticed that, independent
of what the client requests, the SB dials down to incoming_window=5000.
This limits each bulk read to a maximum of 5000B sent, so my consumer runs dry
for an RTT (which is large between clouds) before it can fetch a new packet.

Is this a restriction of the Azure SB, or am I not setting some of the
parameters correctly on the client side to achieve a negotiated window
size > 5000B?

I am using the python proton MessagingHandler class and clearly see that
on_message is called for a few packets totalling a buffer size of ~5000B,
and then I have to wait for the RTT delay to get the next batch.

Any suggestions to work around this problem and get larger bulk messages?
I can't reduce the RTT between the server and consumer. I have a
workaround using parallel consumers, but would like to solve the bulk problem,
as that is the most efficient way of achieving high throughput.

class Recv(MessagingHandler):

def __init__(self):

super(Recv, self).__init__(prefetch=100, auto_accept=True,
auto_settle=True, peer_close_is_error=False)


def on_start(self, event):

conn = event.container.connect(connString)

event.container.create_receiver(conn, subscription)


def on_message(self, event):

print(event.message.body)

print datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
self.count, event.receiver.queued


[0xace380]:  -> SASL

[0xace380]:  <- SASL

[0xace380]:0 <- @sasl-mechanisms(64)
[sasl-server-mechanisms=@PN_SYMBOL[:MSSBCBS, :PLAIN, :ANONYMOUS, :EXTERNAL]]

[0xace380]:0 -> @sasl-init(65) [mechanism=:PLAIN,
initial-response=b"\x00iothubroutes_X\X="]

[0xace380]:0 <- @sasl-outcome(68) [code=0, additional-data=b"Welcome!"]

[0xace380]:  -> AMQP

[0xace380]:0 -> @open(16)
[container-id="0ad171ca-cefa-4a27-a7dc-0520e5393fa5", hostname="
nebhubsb.servicebus.windows.net", channel-max=32767]

[0xace380]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647,
outgoing-window=2147483647]

[0xace380]:0 -> @attach(18)
[name="0ad171ca-cefa-4a27-a7dc-0520e5393fa5-kukatopic/Subscriptions/kukasub",
handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0,
source=@source(40) [address="kukatopic/Subscriptions/kukasub", durable=0,
timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0,
dynamic=false], initial-delivery-count=0, max-message-size=0]

[0xace380]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0,
outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=100,
drain=false]

[0xace380]:  <- AMQP

[0xace380]:0 <- @open(16)
[container-id="b970f07881334c658eb80ff336f2a683_G16", max-frame-size=65536,
channel-max=4999, idle-time-out=24]

[0xace380]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=1,
incoming-window=5000, outgoing-window=2147483647, handle-max=255]

[0xace380]:0 <- @attach(18)
[name="0ad171ca-cefa-4a27-a7dc-0520e5393fa5-kukatopic/Subscriptions/kukasub",
handle=0, role=false, rcv-settle-mode=1, source=@source(40)
[address="topic/Subscriptions/sub", durable=0, timeout=0, dynamic=false],
target=@target(41) [durable=0, timeout=0, dynamic=false],
initial-delivery-count=0, max-message-size=266240]