On Fri, 26 Aug 2011 18:42:13 +0200
Esteve Fernandez <esteve.fernan...@gmail.com> wrote:

> Hey,
> 
> 2011/8/26 bartek <bar...@gorny.edu.pl>:
> > blocks, the channel disappears from rabbitmq, and rabbit's sasl log
> > shows a crash report, the most important line being:
> >
> >    exception exit: {amqp_error,frame_error,
> >                                "cannot decode
> >    
> > <<0,0,16,98,114,111,97,100,99,97,115,116,95,108,105,115,116,101,110,0,0,0,0,0,0>>",
> >    'basic.consume'}
> 
> This usually happens when there's a Deferred that's not yield
> elsewhere in the code. The way AMQP works is that all commands must be
> sent in order, that is, you can't send a command if you haven't waited
> for the previous one to come back (if it produces a response).

Ah, so in other words rabbit's channel is in the wrong state so it does
not accept the command I send?

> 
> Is this the only place where you're using AMQP? Is there anywhere else
> in the code where it's used?

Not really - the code is trivial, mostly copied from the test suite.
Maybe I'm running it in the wrong way? See attached - all I do is start
and listen...

B.


> 
> Thanks for using txAMQP!
> 
> _______________________________________________
> Mailing list: https://launchpad.net/~txamqp-user
> Post to     : txamqp-user@lists.launchpad.net
> Unsubscribe : https://launchpad.net/~txamqp-user
> More help   : https://help.launchpad.net/ListHelp



-- 
Mam wiele powodów żeby ciągle iść do przodu trzy z nich witam od progu
gdy wracam do domu... (Abradab/O.S.T.R.)
'''
Created on Aug 24, 2011

@author: bartek
'''

import os
import sys
import time
import warnings

from txamqp.content import Content
import txamqp.spec

from txamqp.protocol import AMQChannel, AMQClient, TwistedDelegate

from twisted.internet import error, protocol, reactor
from twisted.trial import unittest
from twisted.internet.defer import inlineCallbacks, Deferred, returnValue, DeferredQueue, DeferredLock
from twisted.python import failure
from txamqp.queue import Empty


RABBITMQ = "RABBITMQ"
OPENAMQ = "OPENAMQ"
QPID = "QPID"

class supportedBrokers(object):

    def __init__(self, *supporterBrokers):
        self.supporterBrokers = supporterBrokers

    def __call__(self, f):
        if _get_broker() not in self.supporterBrokers:
            f.skip = "Not supported for this broker."
        return f


def _get_broker():
    return os.environ.get("TXAMQP_BROKER")


USERNAME='guest'
PASSWORD='guest'
VHOST='/'
HEARTBEAT = 0


class Base(object):
    """
    copied from txamqp test suite, nearly in extenso
    """
    clientClass = AMQClient
    heartbeat = HEARTBEAT
    instance_id = 'base'
    id = '123'

    def __init__(self, id=None, *args, **kwargs):
        self.id = id or self.id
        self.host = 'localhost'
        self.port = 5672
        self.spec = '/home/bartek/Downloads/python-txamqp-0.3/src/specs/qpid/amqp.0-8.xml'
        self.user = USERNAME
        self.password = PASSWORD
        self.vhost = VHOST
        self.queues = []
        self.exchanges = []
        self.connectors = []

    @inlineCallbacks
    def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None,
            heartbeat=None, clientClass=None):
        host = host or self.host
        port = port or self.port
        spec = spec or self.spec
        user = user or self.user
        password = password or self.password
        vhost = vhost or self.vhost
        heartbeat = heartbeat or self.heartbeat
        clientClass = clientClass or self.clientClass

        delegate = TwistedDelegate()
        onConn = Deferred()
        p = clientClass(delegate, vhost, txamqp.spec.load(spec), heartbeat=heartbeat)
        f = protocol._InstanceFactory(reactor, p, onConn)
        c = reactor.connectTCP(host, port, f)
        def errb(thefailure):
            thefailure.trap(error.ConnectionRefusedError)
            print "failed to connect to host: %s, port: %s; These tests are designed to run against a running instance" \
                  " of the %s AMQP broker on the given host and port.  failure: %r" % (host, port, self.broker, thefailure,)
            thefailure.raiseException()
        onConn.addErrback(errb)

        self.connectors.append(c)
        client = yield onConn

        yield client.authenticate(user, password)
        returnValue(client)
        
    @inlineCallbacks
    def start(self, v=None):
        print 'starting'
        # TODO reconnecting, handle calls in the meantime
        try:
            self.client = yield self.connect()
        except txamqp.client.Closed, le:
            le.args = tuple(("Unable to connect to AMQP broker in order to run tests (perhaps due to auth failure?). " \
                "The tests assume that an instance of the %s AMQP broker is already set up and that this test script " \
                "can connect to it and use it as user '%s', password '%s', vhost '%s'." % (_get_broker(),
                    USERNAME, PASSWORD, VHOST),) + le.args)
            raise

        print 'setting channel'
        self.channel = yield self.client.channel(1)
        print 'channel set'
        yield self.channel.channel_open()
        print 'channel open'
        
    @inlineCallbacks
    def stop(self, v=None):
        for connector in self.connectors:
            yield connector.disconnect()
        
    @inlineCallbacks
    def exchange_declare(self, channel=None, ticket=0, exchange='',
                         type='', passive=False, durable=False,
                         auto_delete=False, internal=False, nowait=False,
                         arguments={}):
        print 'declaring'
        channel = channel or self.channel
        reply = yield channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
        self.exchanges.append((channel,exchange))
        returnValue(reply)

    @inlineCallbacks
    def queue_declare(self, channel=None, *args, **keys):
        channel = channel or self.channel
        reply = yield channel.queue_declare(*args, **keys)
        self.queues.append((channel, reply.queue))
        returnValue(reply)


class MyBase(Base): 
       
    @inlineCallbacks
    def broadcast(self, v=None, message=None):
        exid = '%s.broadcast' % self.instance_id
        yield self.exchange_declare(0, exchange=exid, type='fanout')
        qid = '%s.broadcast.%s' % (self.instance_id, self.id)
        yield self.queue_declare(queue=qid)
        yield self.channel.queue_bind(queue=qid, exchange=exid)
        for i in range(1, 11):
            print "sending", i
            msg=Content("Message %d" % i)
            self.channel.basic_publish(content=msg, exchange=exid)
    
    @inlineCallbacks
    def listen(self, v=None, exid=None):
        # deklarujemy na wypadek gdyby nie bylo
        print 1
        exid = '%s.broadcast' % self.instance_id
        yield self.exchange_declare(0, exchange=exid, type='fanout')
        print 2
        qid = '%s_broadcast_listen_%s' % (self.instance_id, self.id)
        yield self.queue_declare(queue=qid)
        print 2.5
        yield self.channel.queue_bind(queue=qid, exchange=exid)
        print 3
        subscription = yield self.channel.basic_consume(queue=qid)
        print 4
        queue = yield self.client.queue(consumer_tag=subscription.consumer_tag)
        print 5
        while True:
            print 'waiting'
            msg = yield queue.get()
            print '[FANOUT] Received: ' + msg.content.body + '...'



if __name__ == '__main__':
    a = sys.argv[1]
    if a == 'send':
        b = MyBase()
        d = Deferred()
        d.addCallback(b.start)
        d.addCallback(b.broadcast)
    if a == 'listen':
        b = MyBase(sys.argv[2])
        d = Deferred()
        d.addCallback(b.start)
        d.addCallback(b.listen)
    d.callback(None)
    reactor.run()    
        
        
        
#        exid = '%s.broadcast' % self.instance_id
#        yield self.exchange_declare(0, exchange=exid, type='fanout')
#        qid = '%s.broadcast_listen.%s' % (self.instance_id, self.id)
#        yield self.channel.queue_declare(queue=qid)
#        yield self.channel.queue_bind(queue=qid, exchange=exid)
#        import pdb
#        pdb.set_trace()
#        while True:
#            print 'waiting'
#            msg = yield self.channel.queue.get()
#            print '[FANOUT] Received: ' + msg.content.body + '...'
        
#    
#    def b(self):
#        def _got(msg):
#            print '[FANOUT] Received: ' + msg.content.body + '...'
#            return msg
#        def _err(f):
#            print '[ERR] Received: ', f
#        d = self.channel.basic_consume(queue=qid, no_ack=True)
#        d.addCallback(_got)
#        d.addErrback(_err)
#        yield d
#        
#    
#    def a(self):
#        done = False
#        def _cancel(failure):
#            print 'cancel'
#            done = True
#            return self.channel.basic_cancel()
#        def _got(msg):
#            print '[FANOUT] Received: ' + msg.content.body + '...'
#            return msg
#        while not done:
#            print 'waiting'
##            d = self.channel.basic_get(no_ack=True)
#            d = self.channel.queue.get()
#            d.addCallback(_got)
#            d.addErrback(_cancel)
#            msg = yield d
##            msg = yield self.channel.basic_consume(queue=qid)
#            print '[FANOUT] Received: ' + msg.content.body + '...'        
        
        
        
        
        
        
        
        
        
_______________________________________________
Mailing list: https://launchpad.net/~txamqp-user
Post to     : txamqp-user@lists.launchpad.net
Unsubscribe : https://launchpad.net/~txamqp-user
More help   : https://help.launchpad.net/ListHelp

Reply via email to