Hi, I'm trying to write a simple gateway that receives messages using a specific protocol and publishes/stores them using txredis. I wrote a small example that seems to work, but the small test I wrote for it fails:
$ trial gateway/test gateway.test.test_example GatewayServiceTestCase test_messageReceived ... [ERROR] =============================================================================== [ERROR] Traceback (most recent call last): Failure: twisted.internet.error.ConnectionDone: Connection was closed cleanly. gateway.test.test_example.GatewayServiceTestCase.test_messageReceived ------------------------------------------------------------------------------- Ran 1 tests in 0.007s As I understand it, the connection to the Redis server is lost during the test. I actually managed to get the test to pass by adding the inlineCallbacks decorator to my messageReceived and lineReceived methods. But I don't really understand why that would be needed. Could someone explain what is happening? Both versions of the code can be found here: https://gist.github.com/beenje/6150400 (revision 1 with the problem and revision 2 with the inlineCallbacks) Below is the original version with the problem: Thanks Benjamin example.py ---------------------------------------------------------------------------------- import json import time from twisted.internet import defer from twisted.internet.protocol import ServerFactory from twisted.protocols.basic import LineReceiver from twisted.python import log class BasicProtocol(LineReceiver): def lineReceived(self, line): self.messageReceived(line) def messageReceived(self, message): try: self.factory.messageReceived(message) except AttributeError: pass class BasicGatewayFactory(ServerFactory): protocol = BasicProtocol def __init__(self, service, channel): self.service = service self.channel = channel def messageReceived(self, message): self.service.publish(self.channel, message) class RedisPublishService(object): def __init__(self, factory): """ @param factory: redis client factory """ self.factory = factory @defer.inlineCallbacks def publish(self, channel, message): log.msg("Publish message {} on {}".format(message, channel)) yield 
self.factory.client.publish(channel, message) timestamp = int(time.time() * 1000) # Include the timestamp in the value to allow # duplicate message value = json.dumps({"timestamp": timestamp, "message": message}) log.msg("Store message in {} sorted set with score {}".format( channel, timestamp)) # Set the timestamp as score to easily fetch the values within a # time period using zrangebyscore yield self.factory.client.zadd(channel, timestamp, value) if __name__ == '__main__': import sys from twisted.internet import reactor from txredis.client import RedisClientFactory log.startLogging(sys.stdout) redis_factory = RedisClientFactory() reactor.connectTCP('localhost', 6379, redis_factory) redis_pub_service = RedisPublishService(redis_factory) gw_factory = BasicGatewayFactory(redis_pub_service, "test") reactor.listenTCP(8000, gw_factory) reactor.run() ---------------------------------------------------------------------------------- test_example.py ---------------------------------------------------------------------------------- from twisted.internet import reactor, defer, protocol from twisted.python import log from twisted.test import proto_helpers from twisted.trial.unittest import TestCase from txredis.client import RedisSubscriber, RedisClientFactory from txredis.testing import REDIS_HOST, REDIS_PORT from gateway.example import BasicGatewayFactory, RedisPublishService class GatewayServiceTestCase(TestCase): @defer.inlineCallbacks def setUp(self): self.redis_factory = RedisClientFactory() reactor.connectTCP(REDIS_HOST, REDIS_PORT, self.redis_factory) yield self.redis_factory.deferred self.redis_pub_service = RedisPublishService(self.redis_factory) self.factory = BasicGatewayFactory(self.redis_pub_service, "test") self.server = self.factory.buildProtocol(None) self.transport = proto_helpers.StringTransportWithDisconnection() self.transport.protocol = self.server self.server.makeConnection(self.transport) class MySubscriber(RedisSubscriber): def __init__(self, *args, 
**kwargs): RedisSubscriber.__init__(self, *args, **kwargs) self.msg_channel = None self.msg_message = None self.msg_received = defer.Deferred() def messageReceived(self, channel, message): log.msg("Message received!") self.msg_channel = channel self.msg_message = message self.msg_received.callback(None) self.msg_received = defer.Deferred() clientCreator = protocol.ClientCreator(reactor, MySubscriber) self.subscriber = yield clientCreator.connectTCP(REDIS_HOST, REDIS_PORT) yield self.subscriber.subscribe("test") def tearDown(self): self.subscriber.transport.loseConnection() self.redis_factory.continueTrying = 0 self.redis_factory.stopTrying() if self.redis_factory.client: self.redis_factory.client.setTimeout(None) self.redis_factory.client.transport.loseConnection() self.transport.loseConnection() @defer.inlineCallbacks def test_messageReceived(self): cb = self.subscriber.msg_received self.server.dataReceived('HELLO1\r\n') yield cb self.assertEqual(self.subscriber.msg_channel, "test") self.assertEqual(self.subscriber.msg_message, "HELLO1") _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python