Hi,

I am confused about how flow to disk works. When direct memory reaches the
flow to disk threshold, does the broker write messages directly to disk, or
does it keep them in both memory and disk? I was under the impression that the
flow to disk threshold exists to free up direct memory so that the broker
won't crash. Is that right?

So I have 1.5 GB of direct memory, and here is my flow to disk threshold:

"broker.flowToDiskThreshold":"644245094"  (40% as default)

I am pushing messages, and once usage passes 40% of direct memory, messages
are written to disk, as you can see from the disk usage going up. My question
is: when the broker writes to disk, shouldn't it free up direct memory? I see
direct memory usage going up as well. Am I missing anything here?
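
A quick check of the threshold arithmetic, as a minimal sketch: it assumes the
default is simply 40% of the maximum direct memory, rounded down to whole
bytes. The function name default_flow_to_disk_threshold is made up for
illustration and is not a broker API.

```python
def default_flow_to_disk_threshold(max_direct_memory: int, percent: int = 40) -> int:
    """Return `percent` of max_direct_memory, rounded down (assumed rule)."""
    return max_direct_memory * percent // 100


# 1.5 GB of direct memory, i.e. the maximumDirectMemorySize the broker reports.
max_direct = 1536 * 1024 * 1024
threshold = default_flow_to_disk_threshold(max_direct)
print(max_direct, threshold)
```

Under that assumption the result matches the configured value of 644245094.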


broker1 | success | rc=0 >>
/data   50G  754M   46G   2% /ebs
Fri Oct 14 17:59:25 UTC 2016
  "maximumDirectMemorySize" : 1610612736,
    "usedDirectMemorySize" : 840089280,

broker1 | success | rc=0 >>
/data   50G  761M   46G   2% /ebs
Fri Oct 14 17:59:27 UTC 2016
  "maximumDirectMemorySize" : 1610612736,
    "usedDirectMemorySize" : 843497152,

.
.
.
/data   50G  1.3G   46G   3% /ebs
Fri Oct 14 18:09:08 UTC 2016
  "maximumDirectMemorySize" : 1610612736,
    "usedDirectMemorySize" : 889035136,


Please help me understand this!
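
To put numbers on the samples above, here is a small sketch that compares each
reading with the threshold and estimates how fast direct memory is growing.
The values are copied from the output above; the helper names are only
illustrative, and the growth rate is a plain difference between the first and
last sample, nothing the broker itself reports.

```python
from datetime import datetime

MAX_DIRECT = 1610612736  # maximumDirectMemorySize
THRESHOLD = 644245094    # broker.flowToDiskThreshold

# (time of day, usedDirectMemorySize) taken from the three samples
samples = [
    ("17:59:25", 840089280),
    ("17:59:27", 843497152),
    ("18:09:08", 889035136),
]


def summarize(samples, max_direct, threshold):
    """Return (time, percent of max direct memory, above threshold?) per sample."""
    return [
        (ts, round(100 * used / max_direct, 1), used > threshold)
        for ts, used in samples
    ]


def growth_rate(samples):
    """Average growth in bytes per second between the first and last sample."""
    fmt = "%H:%M:%S"
    start = datetime.strptime(samples[0][0], fmt)
    end = datetime.strptime(samples[-1][0], fmt)
    return (samples[-1][1] - samples[0][1]) / (end - start).total_seconds()


for row in summarize(samples, MAX_DIRECT, THRESHOLD):
    print(row)
print(f"{growth_rate(samples):.0f} bytes/s")
```

All three readings are already above the threshold, and direct memory grows by
48945856 bytes between the first and last sample while the disk usage goes
from 754M to 1.3G.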

Thanks,
Ram



On Fri, Oct 14, 2016 at 9:22 AM, rammohan ganapavarapu <
rammohanga...@gmail.com> wrote:

> So I ran the test a few more times and it happens every time. I was
> monitoring direct memory usage, and it looks like the broker ran out of
> direct memory.
>
>   "maximumDirectMemorySize" : 2415919104,
>     "usedDirectMemorySize" : 2414720896,
>
> Any thoughts guys?
>
> Ram
>
> On Thu, Oct 13, 2016 at 4:37 PM, rammohan ganapavarapu <
> rammohanga...@gmail.com> wrote:
>
>> Guys,
>>
>> Not sure what I am doing wrong. I have set the heap to 1 GB and direct
>> memory to 2 GB. After a queue depth of ~150k messages I get the error below
>> and the broker is killed. Any suggestions?
>>
>> 2016-10-13 23:27:41,894 ERROR [IO-/10.16.1.34:46096] (o.a.q.s.Main) - Uncaught exception, shutting down.
>> java.lang.OutOfMemoryError: Direct buffer memory
>>         at java.nio.Bits.reserveMemory(Bits.java:658) ~[na:1.7.0_75]
>>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[na:1.7.0_75]
>>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) ~[na:1.7.0_75]
>>         at org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:469) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:482) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerEncoder.init(ServerEncoder.java:57) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerDisassembler.method(ServerDisassembler.java:196) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerDisassembler.control(ServerDisassembler.java:185) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerDisassembler.control(ServerDisassembler.java:57) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Method.delegate(Method.java:159) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerDisassembler.send(ServerDisassembler.java:79) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Connection.send(Connection.java:415) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerConnection.send(ServerConnection.java:497) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Session.send(Session.java:588) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Session.invoke(Session.java:804) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Session.invoke(Session.java:613) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.SessionInvoker.sessionCompleted(SessionInvoker.java:65) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Session.flushProcessed(Session.java:514) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:119) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:87) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Method.delegate(Method.java:155) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Session.received(Session.java:582) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Connection.dispatch(Connection.java:447) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:65) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:41) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.MethodDelegate.executionSync(MethodDelegate.java:104) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.ExecutionSync.dispatch(ExecutionSync.java:82) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.ConnectionDelegate.command(ConnectionDelegate.java:55) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.ConnectionDelegate.command(ConnectionDelegate.java:41) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Method.delegate(Method.java:155) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.transport.Connection.received(Connection.java:400) ~[qpid-common-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerConnection.access$001(ServerConnection.java:72) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerConnection$2.run(ServerConnection.java:277) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerConnection$2.run(ServerConnection.java:273) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_75]
>>         at org.apache.qpid.server.protocol.v0_10.ServerConnection.received(ServerConnection.java:272) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerAssembler.emit(ServerAssembler.java:122) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerAssembler.assemble(ServerAssembler.java:211) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerAssembler.frame(ServerAssembler.java:151) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerAssembler.received(ServerAssembler.java:79) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerInputHandler.parse(ServerInputHandler.java:175) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.ServerInputHandler.received(ServerInputHandler.java:82) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.protocol.v0_10.AMQPConnection_0_10$3.run(AMQPConnection_0_10.java:156) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_75]
>>         at org.apache.qpid.server.protocol.v0_10.AMQPConnection_0_10.received(AMQPConnection_0_10.java:148) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:144) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData(NonBlockingConnection.java:609) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData(NonBlockingConnectionPlainDelegate.java:58) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.NonBlockingConnection.doRead(NonBlockingConnection.java:503) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:282) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:124) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection(SelectorThread.java:504) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect(SelectorThread.java:337) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.SelectorThread$SelectionTask.run(SelectorThread.java:87) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at org.apache.qpid.server.transport.SelectorThread.run(SelectorThread.java:462) ~[qpid-broker-core-6.0.2.jar:6.0.2]
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ~[na:1.7.0_75]
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ~[na:1.7.0_75]
>>         at java.lang.Thread.run(Thread.java:745) ~[na:1.7.0_75]
>>
>>
>> Ram
>>
>> On Thu, Oct 13, 2016 at 1:45 PM, Rob Godfrey <rob.j.godf...@gmail.com>
>> wrote:
>>
>>> On 13 October 2016 at 19:54, rammohan ganapavarapu <
>>> rammohanga...@gmail.com>
>>> wrote:
>>>
>>> > Rob,
>>> >
>>> > Understood. We are doing negative testing, such as what happens to the
>>> > broker when all the consumers are down but producers keep pumping
>>> > messages, so I was under the impression that the flow to disk threshold
>>> > would keep the broker from going bad because of OOM. I bumped up the
>>> > heap and direct memory settings of the broker and tried to restart it,
>>> > but it complained with the message below.
>>> >
>>> >
>>> >
>>> > *2016-10-13 18:28:46,157 INFO  [Housekeeping[default]]
>>> > (q.m.q.flow_to_disk_active) - [Housekeeping[default]]
>>> > [vh(/default)/qu(ax-q-mxgroup001)] QUE-1014 : Message flow to disk
>>> active
>>> > :  Message memory use 13124325 kB (13gb) exceeds threshold 168659 kB
>>> > (168mb)*
>>> >
>>> >
>>> > But the actual flow to disk threshold reported by the broker is:
>>> >
>>> > * "broker.flowToDiskThreshold" : "858993459", (which is 40% of
>>> > direct-mem (2g))*
>>> >
>>> > I know my total message size is more than the threshold, but I am trying
>>> > to understand why the log message says 168mb.
>>> >
>>>
>>> So the broker takes its overall flow to disk "quota" and then divides this
>>> up between virtual hosts, and for each virtual host divides up between the
>>> queues on the virtual host.  This allows for some fairness when multiple
>>> virtual hosts or multiple queues are actually representing different
>>> applications.  Individual queues thus may start flowing to disk even though
>>> the overall threshold has not yet been reached.
>>>
>>>
>>> >
>>> > So to keep the broker running I have enabled background recovery, and it
>>> > seems to be working fine, but I am curious to know how the broker loads
>>> > the messages from disk back into memory. Does it load them all at once or
>>> > in batches?
>>> >
>>>
>>> So on recovery, and also when an individual message has flowed to disk, the
>>> broker simply reloads individual messages into memory when it needs them,
>>> on an on-demand basis.
>>>
>>> Hope this helps,
>>> Rob
>>>
>>>
>>> >
>>> > Thanks,
>>> > Ram
>>> >
>>> > On Thu, Oct 13, 2016 at 11:29 AM, Rob Godfrey <rob.j.godf...@gmail.com>
>>> > wrote:
>>> >
>>> > > On 13 October 2016 at 17:36, rammohan ganapavarapu <
>>> > > rammohanga...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Lorenz,
>>> > > >
>>> > > > Thank you for the link. So no matter how much heap you have, you will
>>> > > > hit the hard limit at some point, right? I thought flow to disk would
>>> > > > keep the broker from crashing because of out of memory issues, but it
>>> > > > looks like that's not the case.
>>> > > >
>>> > > > In my environment we will have a dynamic number of producers and
>>> > > > consumers, so it's hard to work out in advance how much heap to
>>> > > > allocate based on the number of connections/sessions.
>>> > > >
>>> > > > Ram
>>> > > >
>>> > > >
>>> > > Yeah - currently there is always a hard limit based on the number of
>>> > > "queue entries".  Ultimately there's a trade-off to be had with
>>> > > designing a queue data structure which is high performing, vs. one
>>> > > which can be offloaded onto disk.  This gets even more complicated for
>>> > > queues which are not strict FIFO (priority queues, LVQ, etc) or where
>>> > > consumers have selectors.  Ultimately if you are storing millions of
>>> > > messages in your broker then you are probably doing things wrong - we
>>> > > would expect people to enforce queue limits and flow control rather
>>> > > than expect the broker to have infinite capacity (and even off-loading
>>> > > to disk you will still run out of disk space at some point).
>>> > >
>>> > > -- Rob
>>> > >
>>> > >
>>> > > >
>>> > > >
>>> > > > On Thu, Oct 13, 2016 at 9:05 AM, Lorenz Quack
>>> > > > <quack.lor...@gmail.com> wrote:
>>> > > >
>>> > > > > Hello Ram,
>>> > > > >
>>> > > > > may I refer you to the relevant section of the documentation [1].
>>> > > > > As explained there in more detail, the broker keeps a
>>> > > > > representation of each message in heap even when flowing the
>>> > > > > message to disk. Therefore the amount of JVM heap memory puts a
>>> > > > > hard limit on the number of messages the broker can hold.
>>> > > > >
>>> > > > > Kind Regards,
>>> > > > > Lorenz
>>> > > > >
>>> > > > > [1] https://qpid.apache.org/releases/qpid-java-6.0.4/java-broker/book/Java-Broker-Runtime-Memory.html
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > On 13/10/16 16:40, rammohan ganapavarapu wrote:
>>> > > > >
>>> > > > >> Hi,
>>> > > > >>
>>> > > > >> We are doing some load testing using Java broker 6.0.2 with all
>>> > > > >> consumers stopped, and the broker crashed at 644359 messages.
>>> > > > >> Even if I try to restart the broker, it crashes with the same OOM
>>> > > > >> error.
>>> > > > >>
>>> > > > >>   "persistentEnqueuedBytes" : 12731167222,
>>> > > > >>      "persistentEnqueuedMessages" : 644359,
>>> > > > >>      "queueDepthBytes" : 12731167222,
>>> > > > >>      "queueDepthMessages" : 644359,
>>> > > > >>      "totalDequeuedBytes" : 0,
>>> > > > >>      "totalDequeuedMessages" : 0,
>>> > > > >>      "totalEnqueuedBytes" : 12731167222,
>>> > > > >>      "totalEnqueuedMessages" : 644359,
>>> > > > >>
>>> > > > >> JVM settings of broker: -Xmx512m -XX:MaxDirectMemorySize=1536m
>>> > > > >>
>>> > > > >> "broker.flowToDiskThreshold" : "644245094",
>>> > > > >>
>>> > > > >> So theoretically the broker should flow those messages to disk
>>> > > > >> after the threshold, right? Then the broker shouldn't have thrown
>>> > > > >> an OOM exception, right? Do I have to do any other tuning?
>>> > > > >>
>>> > > > >> Thanks,
>>> > > > >> Ram
>>> > > > >>
>>> > > > >>
>>> > > > >
>>> > > > > ---------------------------------------------------------------------
>>> > > > > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
>>> > > > > For additional commands, e-mail: users-h...@qpid.apache.org
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
