Ah, looks like this is fixed on the trunk already.

Though looking at that code it fails to throw an error if
all sessions have been used and simply silently goes on.

- Ron


Friday, March 12, 2010, 2:28:28 PM, you wrote:

> Okay, I've found the bug. It's a wrap around issue.

> void ConnectionImpl::addSession(const
> boost::shared_ptr<SessionImpl>& session, uint16_t channel)
> {
>     Mutex::ScopedLock l(lock);
>     session->setChannel(channel ? channel : nextChannel++);
>     boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
>     boost::shared_ptr<SessionImpl> ss = s.lock();
>         if (ss) {
>                 throw SessionBusyException(QPID_MSG("Channel " <<
> ss->getChannel() << " attached to " << ss->getId()));
>         }
>     s = session;
> }

> At the time of the error this is the value of some of the variables:

>   channel = 0
>   nextChannel = 3
>   session->getChannel() = 2
>   sessions  [6]((0,{px=0x083c8508 pn={...} }),(1,{px=0x0839b7a8
> pn={...} }),(2,{px=0x033f3bb0 pn={...} }),(65533,{px=0x083c2008
> pn={...} }),(65534,{px=0x08394c00 pn={...} }),(65535,{px=0x083cd368 pn={...} 
> }))


> Note  how  channel 2 is actually already in use! That should not be as
> our  nextChannel integer should point at a free channel. Sadly that is
> try until nextChannel wraps around, which will happen eventually, just
> in  my  program  is  ends  up  being  sooner  rather  than later. I've
> rewritten part addSession.

> void ConnectionImpl::addSession(const
> boost::shared_ptr<SessionImpl>& session, uint16_t channel)
> {
>     Mutex::ScopedLock l(lock);

>         if ( channel == 0 ) 
>         {
>                 for (uint16_t c = nextChannel; c != (nextChannel-1); c++ )
>                 {
>                         boost::weak_ptr<SessionImpl>& s = sessions[c];
>                         boost::shared_ptr<SessionImpl> ss = s.lock();
>                         if ( !ss  ) 
>                         {
>                                 session->setChannel(c);
>                                 nextChannel = c + 1;
>                                 s = session;
>                                 return;
>                         }
>                 }                               

>                 throw SessionBusyException(QPID_MSG("All channels in use"));

>         } 
>         else 
>         {
>                 session->setChannel(channel);
>                 boost::weak_ptr<SessionImpl>& s = 
> sessions[session->getChannel()];
>                 boost::shared_ptr<SessionImpl> ss = s.lock();
>                 if (ss) throw
> SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to 
> " << ss->getId()));
>                 s = session;
>         }
> }

> I'll submit a bug report as well.

> - Ron



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to