Hello Ram,

This seems curious.
Yes, the idea behind flow to disk is to prevent the broker from running out of direct memory. The broker does keep a certain representation of the message in memory but that should affect heap and not direct memory.

I currently do not understand what is happening here so I raised a JIRA [1].

Could you provide some more information about your test case so I can try to reproduce it on my end?
What is the type of your virtualhost (Derby, BDB, ...)?
How large are your messages? Do they vary in size or are they all the same size?
How many connections/sessions/producers/consumers are connected to the broker?
Are there any consumers active while you are testing?
Do you use transactions?
Are the messages persistent or transient?

Kind regards,
Lorenz

[1] https://issues.apache.org/jira/browse/QPID-7461


On 14/10/16 19:14, rammohan ganapavarapu wrote:
Hi,

I am confused about how flow to disk works. When direct memory reaches the
flow to disk threshold, does the broker write messages straight to disk, or
does it keep them both in memory and on disk? I was under the impression
that the flow to disk threshold is there to free up direct memory so that
the broker won't crash. Isn't that right?

So I have 1.5 GB of direct memory, and here is my flow to disk threshold:

"broker.flowToDiskThreshold":"644245094"  (40% as default)

I am pushing messages, and once usage passes 40% of direct memory the
messages are written to disk, as you can see from the disk usage going up.
But my question is: when the broker writes to disk, shouldn't it free up
direct memory? I see direct memory usage going up as well. Am I missing
anything here?
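
For reference, the threshold value quoted above is consistent with 40% of the configured maximum direct memory. A minimal sketch of that arithmetic, assuming the result is simply truncated to whole bytes (the class and method names are made up for illustration and are not broker API):

```java
// Illustrative only: FlowToDiskThresholdCheck is a hypothetical helper, not part of Qpid.
final class FlowToDiskThresholdCheck {

    // Default share quoted in this thread: 40% of maximum direct memory.
    static final long DEFAULT_PERCENT = 40;

    // Integer arithmetic avoids floating-point rounding; the result is truncated.
    static long defaultThreshold(long maxDirectMemoryBytes) {
        return maxDirectMemoryBytes * DEFAULT_PERCENT / 100;
    }

    public static void main(String[] args) {
        long maxDirect = 1536L * 1024 * 1024; // -XX:MaxDirectMemorySize=1536m
        System.out.println(defaultThreshold(maxDirect)); // prints 644245094
    }
}
```

The same calculation for a 2 GB limit gives 858993459, the value reported elsewhere in this thread.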


broker1 | success | rc=0 >>
/data   50G  754M   46G   2% /ebs
Fri Oct 14 17:59:25 UTC 2016
   "maximumDirectMemorySize" : 1610612736,
     "usedDirectMemorySize" : 840089280,

broker1 | success | rc=0 >>
/data   50G  761M   46G   2% /ebs
Fri Oct 14 17:59:27 UTC 2016
   "maximumDirectMemorySize" : 1610612736,
     "usedDirectMemorySize" : 843497152,

.
.
.
/data   50G  1.3G   46G   3% /ebs
Fri Oct 14 18:09:08 UTC 2016
   "maximumDirectMemorySize" : 1610612736,
     "usedDirectMemorySize" : 889035136,


Please help me understand this!
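
To put the readings above in perspective, the sketch below expresses each usedDirectMemorySize sample as a share of maximumDirectMemorySize and compares it with the configured threshold. The numbers are copied from this mail; the class and method names are hypothetical.

```java
// Illustrative only: DirectMemorySamples is a hypothetical helper, not part of Qpid.
final class DirectMemorySamples {

    static final long MAX_DIRECT = 1610612736L; // maximumDirectMemorySize from the output
    static final long THRESHOLD = 644245094L;   // broker.flowToDiskThreshold from the mail

    // Usage as a percentage of the maximum direct memory.
    static double percentOfMax(long usedBytes) {
        return 100.0 * usedBytes / MAX_DIRECT;
    }

    // True when the sample exceeds the flow to disk threshold.
    static boolean aboveThreshold(long usedBytes) {
        return usedBytes > THRESHOLD;
    }

    public static void main(String[] args) {
        long[] samples = {840089280L, 843497152L, 889035136L};
        for (long used : samples) {
            System.out.printf("%d bytes = %.1f%% of max, above threshold: %b%n",
                    used, percentOfMax(used), aboveThreshold(used));
        }
    }
}
```

All three samples sit above the 40% mark and keep rising, which is the behaviour being asked about.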

Thanks,
Ram



On Fri, Oct 14, 2016 at 9:22 AM, rammohan ganapavarapu <
rammohanga...@gmail.com> wrote:

So I ran the test a few more times and it happens every time. I was
monitoring direct memory usage, and it looks like the broker ran out of
direct memory.

   "maximumDirectMemorySize" : 2415919104,
     "usedDirectMemorySize" : 2414720896,

Any thoughts guys?

Ram

On Thu, Oct 13, 2016 at 4:37 PM, rammohan ganapavarapu <
rammohanga...@gmail.com> wrote:

Guys,

Not sure what I am doing wrong. I have set the heap to 1 GB and direct
memory to 2 GB. After a queue depth of ~150k messages I get the error below
and the broker gets killed. Any suggestions?

2016-10-13 23:27:41,894 ERROR [IO-/10.16.1.34:46096] (o.a.q.s.Main) - Uncaught exception, shutting down.
java.lang.OutOfMemoryError: Direct buffer memory
         at java.nio.Bits.reserveMemory(Bits.java:658) ~[na:1.7.0_75]
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[na:1.7.0_75]
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) ~[na:1.7.0_75]
         at org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:469) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:482) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerEncoder.init(ServerEncoder.java:57) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerDisassembler.method(ServerDisassembler.java:196) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerDisassembler.control(ServerDisassembler.java:185) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerDisassembler.control(ServerDisassembler.java:57) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Method.delegate(Method.java:159) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerDisassembler.send(ServerDisassembler.java:79) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Connection.send(Connection.java:415) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerConnection.send(ServerConnection.java:497) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Session.send(Session.java:588) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Session.invoke(Session.java:804) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Session.invoke(Session.java:613) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.SessionInvoker.sessionCompleted(SessionInvoker.java:65) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Session.flushProcessed(Session.java:514) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:119) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:87) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Method.delegate(Method.java:155) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Session.received(Session.java:582) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Connection.dispatch(Connection.java:447) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:65) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:41) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.MethodDelegate.executionSync(MethodDelegate.java:104) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.ExecutionSync.dispatch(ExecutionSync.java:82) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.ConnectionDelegate.command(ConnectionDelegate.java:55) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.ConnectionDelegate.command(ConnectionDelegate.java:41) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Method.delegate(Method.java:155) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.transport.Connection.received(Connection.java:400) ~[qpid-common-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerConnection.access$001(ServerConnection.java:72) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerConnection$2.run(ServerConnection.java:277) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerConnection$2.run(ServerConnection.java:273) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_75]
         at org.apache.qpid.server.protocol.v0_10.ServerConnection.received(ServerConnection.java:272) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerAssembler.emit(ServerAssembler.java:122) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerAssembler.assemble(ServerAssembler.java:211) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerAssembler.frame(ServerAssembler.java:151) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerAssembler.received(ServerAssembler.java:79) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerInputHandler.parse(ServerInputHandler.java:175) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.ServerInputHandler.received(ServerInputHandler.java:82) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.protocol.v0_10.AMQPConnection_0_10$3.run(AMQPConnection_0_10.java:156) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_75]
         at org.apache.qpid.server.protocol.v0_10.AMQPConnection_0_10.received(AMQPConnection_0_10.java:148) ~[qpid-broker-plugins-amqp-0-10-protocol-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:144) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData(NonBlockingConnection.java:609) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData(NonBlockingConnectionPlainDelegate.java:58) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.NonBlockingConnection.doRead(NonBlockingConnection.java:503) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:282) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:124) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection(SelectorThread.java:504) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect(SelectorThread.java:337) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.SelectorThread$SelectionTask.run(SelectorThread.java:87) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at org.apache.qpid.server.transport.SelectorThread.run(SelectorThread.java:462) ~[qpid-broker-core-6.0.2.jar:6.0.2]
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ~[na:1.7.0_75]
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ~[na:1.7.0_75]
         at java.lang.Thread.run(Thread.java:745) ~[na:1.7.0_75]
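
The top frames of the trace show the failure coming out of ByteBuffer.allocateDirect, i.e. the JVM-wide direct buffer limit was reached. As a sketch for watching that pool from inside a JVM, the standard BufferPoolMXBean (Java 7 and later) reports how much direct buffer memory is in use. This is plain JDK API rather than a Qpid facility, and the DirectPoolProbe class is made up for illustration.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Illustrative only: DirectPoolProbe is a hypothetical helper, not part of Qpid.
final class DirectPoolProbe {

    // Returns the bytes currently used by the JVM's "direct" buffer pool,
    // or -1 if no pool with that name is exposed by this JVM.
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Allocate 1 MiB off-heap; this counts against -XX:MaxDirectMemorySize.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
        System.out.println("direct pool used: " + directMemoryUsed() + " bytes");
        // Referencing the buffer here keeps it reachable until after the reading.
        System.out.println("buffer capacity: " + buffer.capacity() + " bytes");
    }
}
```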


Ram

On Thu, Oct 13, 2016 at 1:45 PM, Rob Godfrey <rob.j.godf...@gmail.com>
wrote:

On 13 October 2016 at 19:54, rammohan ganapavarapu <
rammohanga...@gmail.com>
wrote:

Rob,

Understood. We are doing negative testing, i.e. what happens to the broker
when all the consumers are down but producers keep pumping messages, so I
was under the impression that the flow to disk threshold would keep the
broker from going bad because of OOM. So I bumped up the heap and direct
memory settings of the broker and tried to restart it, but it was
complaining with the error below.



2016-10-13 18:28:46,157 INFO  [Housekeeping[default]] (q.m.q.flow_to_disk_active) - [Housekeeping[default]] [vh(/default)/qu(ax-q-mxgroup001)] QUE-1014 : Message flow to disk active :  Message memory use 13124325 kB (13gb) exceeds threshold 168659 kB (168mb)


But the actual flow to disk threshold reported by the broker is:

"broker.flowToDiskThreshold" : "858993459"  (which is 40% of the 2 GB of
direct memory)

I know my total message size is more than the threshold, but I am trying
to understand why the log message says 168 MB.

So the broker takes its overall flow to disk "quota" and then divides this
up between virtual hosts, and for each virtual host divides up between the
queues on the virtual host.  This allows for some fairness when multiple
virtual hosts or multiple queues are actually representing different
applications.  Individual queues thus may start flowing to disk even though
the overall threshold has not yet been reached.
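
A rough sketch of the idea described above, assuming purely for simplicity that the quota is split evenly at each level. The real broker may weight the shares differently; this thread does not say. All names here are hypothetical.

```java
// Illustrative only: FlowToDiskQuota is a hypothetical helper, not part of Qpid.
// ASSUMPTION: an even split between virtual hosts and then between queues.
final class FlowToDiskQuota {

    static long perQueueShare(long brokerThresholdBytes, int virtualHosts, int queuesPerHost) {
        if (virtualHosts <= 0 || queuesPerHost <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        long perHost = brokerThresholdBytes / virtualHosts; // share of one virtual host
        return perHost / queuesPerHost;                     // share of one queue on that host
    }

    public static void main(String[] args) {
        // Broker-wide threshold from this thread, with made-up host and queue counts.
        System.out.println(perQueueShare(858993459L, 2, 4));
    }
}
```

With any split of this kind, a single busy queue reaches its own share well before the broker-wide threshold is hit, which fits the log line reporting a per-queue threshold far below the broker total.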


So to keep the broker running I have enabled background recovery and it
seems to be working fine, but I am curious to know how the broker loads the
messages back from disk into memory. Does it load them all at once, or in
batches?

So on recovery, and also when an individual message has flowed to disk, the
broker simply reloads individual messages into memory when it needs them,
on an on-demand basis.
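
The on-demand behaviour can be pictured with a small sketch: the payload may be dropped from memory and is fetched again from the store only when something asks for it. This illustrates the concept with hypothetical names (LazyMessage and its Supplier-based loader); it is not the broker's actual class structure.

```java
import java.util.function.Supplier;

// Illustrative only: LazyMessage is a hypothetical class, not part of Qpid.
final class LazyMessage {

    private final Supplier<byte[]> storeLoader; // hypothetical: reads the payload back from disk
    private byte[] content;                      // null while the payload lives only on disk

    LazyMessage(byte[] content, Supplier<byte[]> storeLoader) {
        this.content = content;
        this.storeLoader = storeLoader;
    }

    // Drop the in-memory payload; the copy in the store remains.
    void flowToDisk() {
        content = null;
    }

    boolean isInMemory() {
        return content != null;
    }

    // Reload the payload only when it is actually needed.
    byte[] getContent() {
        if (content == null) {
            content = storeLoader.get();
        }
        return content;
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        LazyMessage message = new LazyMessage(payload, () -> payload.clone());
        message.flowToDisk();
        System.out.println("in memory after flow to disk: " + message.isInMemory());
        System.out.println("reloaded length: " + message.getContent().length);
    }
}
```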

Hope this helps,
Rob


Thanks,
Ram

On Thu, Oct 13, 2016 at 11:29 AM, Rob Godfrey <rob.j.godf...@gmail.com>
wrote:

On 13 October 2016 at 17:36, rammohan ganapavarapu <
rammohanga...@gmail.com>
wrote:

Lorenz,

Thank you for the link. So no matter how much heap you have, you will hit
the hard limit at some point, right? I thought flow to disk would keep the
broker from crashing because of out-of-memory issues, but it looks like
that's not the case.

In my environment we will have a dynamic number of producers and consumers,
so it's hard to work out in advance how much heap to allocate based on the
number of connections/sessions.

Ram


Yeah - currently there is always a hard limit based on the number of "queue
entries".  Ultimately there's a trade-off to be had with designing a queue
data structure which is high performing, vs. one which can be offloaded
onto disk.  This gets even more complicated for queues which are not strict
FIFO (priority queues, LVQ, etc) or where consumers have selectors.
Ultimately if you are storing millions of messages in your broker then you
are probably doing things wrong - we would expect people to enforce queue
limits and flow control rather than expect the broker to have infinite
capacity (and even off-loading to disk you will still run out of disk space
at some point).
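
One way to picture the queue limits and flow control mentioned above is a queue with a high and a low watermark: producers are held back once the depth reaches the high mark and released again once consumers drain it down to the low mark. A minimal single-threaded sketch with hypothetical names; it does not reflect the broker's configuration attributes or its implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: WatermarkQueue is a hypothetical class, not part of Qpid.
final class WatermarkQueue {

    private final Deque<byte[]> messages = new ArrayDeque<>();
    private final long highBytes; // depth at which producers are blocked
    private final long lowBytes;  // depth at which producers are released again
    private long depthBytes;
    private boolean producersBlocked;

    WatermarkQueue(long highBytes, long lowBytes) {
        if (lowBytes > highBytes) {
            throw new IllegalArgumentException("low watermark must not exceed high watermark");
        }
        this.highBytes = highBytes;
        this.lowBytes = lowBytes;
    }

    // Returns false when flow control is active and the producer should back off.
    boolean offer(byte[] message) {
        if (producersBlocked) {
            return false;
        }
        messages.addLast(message);
        depthBytes += message.length;
        if (depthBytes >= highBytes) {
            producersBlocked = true;
        }
        return true;
    }

    // Removes the oldest message, lifting flow control once the depth is low enough.
    byte[] poll() {
        byte[] message = messages.pollFirst();
        if (message != null) {
            depthBytes -= message.length;
            if (producersBlocked && depthBytes <= lowBytes) {
                producersBlocked = false;
            }
        }
        return message;
    }

    boolean isProducersBlocked() {
        return producersBlocked;
    }

    long depthBytes() {
        return depthBytes;
    }
}
```

In the negative test described in this thread (no consumers, producers still sending), a limit of this kind makes the producers stall instead of letting the queue depth grow without bound.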

-- Rob



On Thu, Oct 13, 2016 at 9:05 AM, Lorenz Quack <
quack.lor...@gmail.com>
wrote:

Hello Ram,

May I refer you to the relevant section of the documentation [1].
As explained there in more detail, the broker keeps a representation of
each message in heap even when flowing the message to disk.
Therefore the amount of JVM heap memory puts a hard limit on the number of
messages the broker can hold.

Kind Regards,
Lorenz

[1] https://qpid.apache.org/releases/qpid-java-6.0.4/java-broker/book/Java-Broker-Runtime-Memory.html



On 13/10/16 16:40, rammohan ganapavarapu wrote:

Hi,

We are doing some load tests using Java broker 6.0.2 with all consumers
stopped. The broker crashed at 644359 messages. Even if I try to restart
the broker, it crashes with the same OOM error.

   "persistentEnqueuedBytes" : 12731167222,
      "persistentEnqueuedMessages" : 644359,
      "queueDepthBytes" : 12731167222,
      "queueDepthMessages" : 644359,
      "totalDequeuedBytes" : 0,
      "totalDequeuedMessages" : 0,
      "totalEnqueuedBytes" : 12731167222,
      "totalEnqueuedMessages" : 644359,

JVM settings of broker: -Xmx512m -XX:MaxDirectMemorySize=1536m

"broker.flowToDiskThreshold" : "644245094",

So theoretically the broker should flow those messages to disk once the
threshold is reached, and then it shouldn't throw an OOM exception, right?
Do I have to do any other tuning?

Thanks,
Ram


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org




