On 09/26/2011 07:44 PM, joseluis wrote:
I wrote a program that continuously sends 1000 messages per second to the
"testing" address


The program below reads these messages and, after 10000 messages, creates a
new session and receiver with the same configuration as before


The receiver and session go out of scope before the new ones are created, but
if I don't call session.close(); the memory on the broker increases quickly

If I call session.close(); before creating the new session instance, this
doesn't happen


The topic was created with...

qpid-config add exchange topic testing


It looks like calling receiver.close(); or letting the receiver go out of
scope is not enough to remove the subscription on the broker side, so the
broker keeps saving messages for the deleted receiver and session objects

Good point. Although Receiver::close() cancels the subscription, it does not actually delete the subscription queue or unbind it correctly. The queue is marked as auto-delete and exclusive (by default), so the broker deletes it only when the session ends.

If so, it will be necessary to use just one receiver per session, so that
session.close(); can be called when a receiver is no longer required by the
program

Yes, that is a bug (I've raised a JIRA: https://issues.apache.org/jira/browse/QPID-3508 and will get a fix in shortly). As a workaround you could explicitly control the flags on the queue. E.g.

address + "/#; {assert: always, node: {type: topic}, link: {x-declare: {exclusive: False, auto-delete: True}}}"
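
To show how that address could be plugged into the program quoted below, here is a minimal sketch. The helper name workaroundAddress is hypothetical (it is not part of the Qpid API); it only concatenates the topic name with the option string given above and uses nothing beyond std::string.

```cpp
#include <string>

// Hypothetical helper (not part of the Qpid API): builds the receiver
// address for a topic, overriding the subscription queue flags as in the
// workaround above (non-exclusive, auto-delete).
std::string workaroundAddress(const std::string& topic) {
    return topic + "/#; {assert: always, node: {type: topic}, "
                   "link: {x-declare: {exclusive: False, auto-delete: True}}}";
}
```

In the program below, the result would be passed in place of receiver_config, e.g. session.createReceiver(workaroundAddress(address)). Whether the extra options from the original address (reliability, durable) should be kept alongside x-declare is left open here.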

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <iostream>
#include <string>

using namespace qpid::messaging;

int main(int /*argc*/, char** /*argv*/) {
     std::string broker = "localhost:5672";
     std::string address = "testing";

     Connection connection(broker, "");
     try {
         connection.open();


         {
                 Session session = connection.createSession();
                 std::string receiver_config = address + "/#; "
                     "{assert:always, node:{type:topic, durable:False}, "
                     "link:{reliability:unreliable, durable:False}}";
                 Receiver receiver = session.createReceiver(receiver_config);
                 receiver.setCapacity(10);

                 for(int counter=0; counter<10000; ++counter)
                     Message message = receiver.fetch(Duration::SECOND * 1);

                 // With the next line commented out, memory on the broker
                 // increases quickly.
                 //session.close();
         }

         std::cout << "other subscription  receiver gets out of scope"
                   << std::endl;

         {
                 Session session = connection.createSession();
                 std::string receiver_config = address + "/#; "
                     "{assert:always, node:{type:topic, durable:False}, "
                     "link:{reliability:unreliable, durable:False}}";
                 Receiver receiver = session.createReceiver(receiver_config);


                 for(int counter=0; counter<60000; ++counter)
                     Message message = receiver.fetch(Duration::SECOND * 1);
         }


         connection.close();
         return 0;
     } catch(const std::exception&  error) {
         std::cerr<<  error.what()<<  std::endl;
         connection.close();
         return 1;
     }
}
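
Since letting the Session go out of scope does not close it, one way to make sure session.close() always runs before the next session is created is a small scope guard. The following is only a sketch: ScopedClose is a hypothetical helper, not something Qpid provides, and it assumes nothing about the wrapped type except a close() member, which qpid::messaging::Session has.

```cpp
#include <exception>
#include <iostream>

// Hypothetical guard (not part of the Qpid API): calls close() on the
// wrapped object when the guard itself leaves scope.
template <typename T>
class ScopedClose {
public:
    explicit ScopedClose(T& target) : target_(target) {}

    ~ScopedClose() {
        try {
            target_.close();
        } catch (const std::exception& error) {
            // A destructor must not throw, so only report the failure.
            std::cerr << "close failed: " << error.what() << std::endl;
        }
    }

    // Non-copyable, so close() is called once per guard.
    ScopedClose(const ScopedClose&) = delete;
    ScopedClose& operator=(const ScopedClose&) = delete;

private:
    T& target_;
};
```

In the program above, declaring ScopedClose<Session> guard(session); right after connection.createSession() in each block would close the session when the block ends, including when fetch() throws.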

--
View this message in context: 
http://apache-qpid-users.2158936.n2.nabble.com/memory-consumption-on-broker-not-closing-session-tp6833011p6833011.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]


