Pavel Moravec created QPID-5214:
-----------------------------------

             Summary: [C++ broker] Memory leak in legacystore when raising RHM_IORES_ENQCAPTHRESH
                 Key: QPID-5214
                 URL: https://issues.apache.org/jira/browse/QPID-5214
             Project: Qpid
          Issue Type: Bug
          Components: C++ Broker
    Affects Versions: 0.24
            Reporter: Pavel Moravec
            Priority: Minor


There is a memory leak when legacystore raises RHM_IORES_ENQCAPTHRESH: "Enqueue 
capacity threshold exceeded on queue ..". To reproduce, send durable messages 
in a loop to a queue with a tiny journal.

Valgrind showed me:

==632== 2,288 (208 direct, 2,080 indirect) bytes in 2 blocks are definitely lost in loss record 115 of 116
==632==    at 0x4A075BC: operator new(unsigned long) (vg_replace_malloc.c:298)
==632==    by 0x60D76AB: mrg::msgstore::MessageStoreImpl::store(qpid::broker::PersistableQueue const*, mrg::msgstore::TxnCtxt*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, bool) (in /data_xfs/qpid-trunk/cpp/BLD/src/legacystore.so)
==632==    by 0x60D7165: mrg::msgstore::MessageStoreImpl::enqueue(qpid::broker::TransactionContext*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, qpid::broker::PersistableQueue const&) (in /data_xfs/qpid-trunk/cpp/BLD/src/legacystore.so)
==632==    by 0x5023568: qpid::broker::MessageStoreModule::enqueue(qpid::broker::TransactionContext*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, qpid::broker::PersistableQueue const&) (in /data_xfs/qpid-trunk/cpp/BLD/src/libqpidbroker.so.2.0.0)
==632==    by 0x4F9BAC8: qpid::broker::Queue::enqueue(qpid::broker::TransactionContext*, qpid::broker::Message&) (in /data_xfs/qpid-trunk/cpp/BLD/src/libqpidbroker.so.2.0.0)

Further debugging showed that the line with the "new" call is:

void MessageStoreImpl::store(..
..
        if (queue) {
            boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
            dtokp->addRef();
..


I tried to fix the leak, but I see nothing wrong in the code that could 
cause it:
1) dtokp is a local variable declared there, and its content is not copied or 
referenced anywhere later on
2) even catching StoreException and explicitly calling "dtokp->reset(); dtokp = 
boost::intrusive_ptr<DataTokenImpl>();" does not prevent the memory leak
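
One thing that may be worth checking is the explicit dtokp->addRef() call. A 
boost::intrusive_ptr already takes one reference when it is constructed from a 
raw pointer, so an additional addRef() leaves the object alive after the smart 
pointer is gone unless something else calls a matching release(). The sketch 
below only illustrates that general mechanism. Token, TokenPtr and 
leakedAfterScope are hypothetical stand-ins, not the real DataTokenImpl or 
boost classes, and whether the real code misses a release on this error path 
is an assumption, not something verified here.

```cpp
#include <cassert>

// Hypothetical stand-in for an intrusively ref-counted token (not DataTokenImpl).
struct Token {
    static int live;   // number of Token objects currently alive
    int refs = 0;      // intrusive reference count
    Token() { ++live; }
    ~Token() { --live; }
    void addRef() { ++refs; }
    void release() { if (--refs == 0) delete this; }
};
int Token::live = 0;

// Minimal intrusive pointer: takes one reference on construction and
// drops exactly one reference on destruction.
class TokenPtr {
    Token* p_;
public:
    explicit TokenPtr(Token* p) : p_(p) { if (p_) p_->addRef(); }
    ~TokenPtr() { if (p_) p_->release(); }
    TokenPtr(const TokenPtr&) = delete;
    TokenPtr& operator=(const TokenPtr&) = delete;
    Token* operator->() const { return p_; }
};

// Returns how many tokens are still alive after the smart pointer has gone
// out of scope. releaseExtra models "somebody later calls the matching
// release()", for example a completion callback (an assumption).
int leakedAfterScope(bool releaseExtra) {
    Token* raw = new Token;
    {
        TokenPtr tok(raw);                 // refs == 1
        tok->addRef();                     // refs == 2, the explicit extra reference
        if (releaseExtra) tok->release();  // refs == 1 again
    }                                      // TokenPtr drops one reference here
    int leaked = Token::live;
    if (leaked != 0) raw->release();       // tidy up so the demo itself does not leak
    return leaked;
}
```

With the matching release the token is destroyed at the end of the scope. 
Without it, one token survives even though the smart pointer is gone, which 
would also be consistent with resetting the local pointer having no effect.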

What exactly is executed when RHM_IORES_ENQCAPTHRESH is raised within the 
MessageStoreImpl::store call:

1) ./lib/MessageStoreImpl.cpp:
MessageStoreImpl::store

            boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
            dtokp->addRef();
            dtokp->setSourceMessage(message);
            dtokp->set_external_rid(true);
            dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)

            JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
            if (txn->getXid().empty()) {
                if (message->isContentReleased()) {
                    jc->enqueue_extern_data_record(size, dtokp.get(), !message->isPersistent());
                } else {
                    jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message->isPersistent());
                }


2) this calls enqueue_data_record in:
./lib/JournalImpl.cpp
JournalImpl::enqueue_data_record

JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
        const size_t this_data_len, data_tok* dtokp, const bool transient)
{
    handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));


3) which calls the nested enqueue_data_record in:
./lib/jrnl/jcntl.cpp:
jcntl::enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
        const std::size_t this_data_len, data_tok* dtokp, const bool transient)

        while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false), r,
                        dtokp)) ;


4) which calls _wmgr.enqueue in:
./lib/jrnl/wmgr.cpp:
wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
        const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
        const std::size_t xid_len, const bool transient, const bool external)

    iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
    if (res != RHM_IORES_SUCCESS)
        return res;


5) which calls pre_write_check, also in ./lib/jrnl/wmgr.cpp:

wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
        const std::size_t xidsize, const std::size_t dsize, const bool external
        ) const

    if (!_wrfc.is_wr_reset())
    {
        if (!_wrfc.wr_reset())
            return RHM_IORES_FULL;
    }

    // Check status of current page is ok for writing
    if (_page_cb_arr[_pg_index]._state != IN_USE)
    {
        if (_page_cb_arr[_pg_index]._state == UNUSED)
            _page_cb_arr[_pg_index]._state = IN_USE;
    }

    switch (op)
    {
        case WMGR_ENQUEUE:
            {
                // Check for enqueue reaching cutoff threshold
                u_int32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize,
                        external));
                if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks))
                    return RHM_IORES_ENQCAPTHRESH;

6) return to 4, from there return to 3, and execute handle_aio_wait:
./lib/jrnl/jcntl.cpp:
jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)

(returns false)

7) return to 2, where the return value is passed to:
./lib/JournalImpl.cpp:
JournalImpl::handleIoResult(const iores r):
    writeActivityFlag = true;
    switch (r)
    {
        case mrg::journal::RHM_IORES_SUCCESS:
            return;
        case mrg::journal::RHM_IORES_ENQCAPTHRESH:
            {
                std::ostringstream oss;
                oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
                log(LOG_WARN, oss.str());
                if (_agent != 0)
                    _agent->raiseEvent(qmf::com::redhat::rhm::store::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"),
                                       qpid::management::ManagementAgent::SEV_WARN);
                THROW_STORE_FULL_EXCEPTION(oss.str());
            }
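
The control flow of steps 2 to 7 can be condensed into a small standalone 
model: the innermost check returns an error code, each caller hands it back 
unchanged, and only the outermost layer turns it into an exception. This is a 
simplified sketch, not the journal code. All names (IoResult, StoreFull, 
preWriteCheck, writeManagerEnqueue, handleIoResult, journalEnqueue) are 
hypothetical, and the threshold test is assumed to be a plain comparison 
because the body of enq_threshold is not shown above.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical result codes standing in for the journal's iores values.
enum IoResult { IO_SUCCESS, IO_ENQ_CAP_THRESHOLD };

// Hypothetical exception standing in for the store-full exception.
struct StoreFull : std::runtime_error {
    explicit StoreFull(const std::string& msg) : std::runtime_error(msg) {}
};

// Step 5: refuse the write if it would cross the threshold.
// Assumption: the threshold test is a simple "used + new > limit" comparison.
IoResult preWriteCheck(std::size_t usedBlocks, std::size_t newBlocks,
                       std::size_t thresholdBlocks) {
    return (usedBlocks + newBlocks > thresholdBlocks) ? IO_ENQ_CAP_THRESHOLD
                                                      : IO_SUCCESS;
}

// Steps 4 and 3: a failed check is passed straight back to the caller.
IoResult writeManagerEnqueue(std::size_t usedBlocks, std::size_t newBlocks,
                             std::size_t thresholdBlocks) {
    IoResult res = preWriteCheck(usedBlocks, newBlocks, thresholdBlocks);
    if (res != IO_SUCCESS)
        return res;
    return IO_SUCCESS;  // the real code would write the record here
}

// Step 7: only here does the error code become an exception.
void handleIoResult(IoResult r, const std::string& queueName) {
    switch (r) {
        case IO_SUCCESS:
            return;
        case IO_ENQ_CAP_THRESHOLD:
            throw StoreFull("Enqueue capacity threshold exceeded on queue \"" +
                            queueName + "\".");
    }
}

// Step 2: the outer wrapper that the store calls with the token in hand.
void journalEnqueue(std::size_t usedBlocks, std::size_t newBlocks,
                    std::size_t thresholdBlocks, const std::string& queueName) {
    handleIoResult(writeManagerEnqueue(usedBlocks, newBlocks, thresholdBlocks),
                   queueName);
}
```

In this model, any cleanup the caller expected to happen after a successful 
enqueue is skipped once the exception leaves journalEnqueue, so the caller 
has to handle that path itself.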


In short, the data_tok* object is never copied or referenced in a way that 
could prevent its memory from being freed.

Nevertheless, the memory leak is there: the trivial reproducer causes the 
broker to consume more and more memory (checked in the output of the "ps" and 
"qpid-stat -m" commands).




--
This message was sent by Atlassian JIRA
(v6.1#6144)
