Hi,

This is related to my last problem. With my new file-related test
setup, I often get WindowsErrors when removing files because they are still
in use: for some reason the file objects are not being closed or
deleted, and are still alive when teardown_method is called.
Again, I'm not sure whether this is something I'm doing wrong or a bug, but
I've attached a simplified test (with my attempts at debugging) which fails
with the error on every second run (on the other runs the file already exists
at the start) on Python 2.5.2, py 0.9.1 or svn 57262, Windows XP.

Matthew
import os, os.path, struct, time, datetime, gc, types


def timestamp_from_datetime(d):
    """Convert the datetime *d* to a float timestamp.

    ``time.mktime`` works from a time tuple, which carries no sub-second
    part, so the microseconds are added back as a fraction of a second.
    """
    whole_seconds = time.mktime(d.timetuple())
    fraction = d.microsecond / 1000000.0
    return whole_seconds + fraction

def datetime_from_timestamp(t):
    """Return the datetime that ``datetime.datetime.fromtimestamp`` gives for *t*."""
    build = datetime.datetime.fromtimestamp
    return build(t)

def _decode_entry(data):
    """Unpack an entry header from the start of *data*.

    Returns a ``(recvd, when, content_len)`` tuple: the flag as a bool, the
    timestamp converted to a datetime, and the content length as unpacked.
    """
    flag, stamp, length = Entry.struct.unpack_from(data)
    return bool(flag), datetime_from_timestamp(stamp), length

class Entry(object):
    """One log record: a received/sent flag, a timestamp and a payload.

    The serialized form is a fixed-size header packed as ``!BdI`` (flag,
    timestamp, payload length) immediately followed by the payload.
    """

    struct = struct.Struct('!BdI')

    @classmethod
    def read_from(cls, file):
        """Read one entry from *file* at its current position.

        Raises ValueError when fewer payload characters are available than
        the header announced.  Header decoding errors (for example a short
        or empty header) propagate from the decoder unchanged.
        """
        header = file.read(cls.struct.size)
        # The decoder already converts the raw timestamp into a datetime.
        recvd, when, content_len = _decode_entry(header)
        content = file.read(content_len)
        if len(content) != content_len:
            raise ValueError('Only read %i of %i characters'
                             % (len(content), content_len))
        return cls(recvd, when, content)

    def __init__(self, recvd=None, when=None, content=None, data = None):
        """Build an entry from explicit fields or from serialized *data*.

        A non-empty *data* is decoded through the ``data`` property and
        overrides *recvd*, *when* and *content*.
        """
        self.recvd = recvd
        self.when = when
        self.content = content
        if data:
            self.data = data

    def __repr__(self):
        return 'Entry(%r, %r, %r)' % (self.recvd, self.when, self.content)

    def __eq__(self, other):
        """Entries are equal when flag, timestamp and content all match."""
        if not isinstance(other, Entry):
            return NotImplemented
        # Parenthesised so the expression may span several lines.
        return (self.recvd == other.recvd
                and self.when == other.when
                and self.content == other.content)

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly.
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    # Properties
    def get_data(self):
        """Return the serialized header followed by the content."""
        header = self.struct.pack(int(self.recvd),
                                  timestamp_from_datetime(self.when),
                                  len(self.content))
        return header + self.content

    def set_data(self, data):
        """Populate recvd, when and content from serialized *data*."""
        self.recvd, self.when, content_len = _decode_entry(data)
        start = self.struct.size
        self.content = data[start:start + content_len]
    data = property(get_data, set_data)
    
class Logger(object):
    
    def __init__(self, file):
        self.file = file
    
    def recv(self, data, when=None):
        entry = Entry(True, when or datetime.datetime.now(), data)
        print 'logging received entry', repr(entry)
        self.file.write(entry.data)
    
    def send(self, data, when=None):
        entry = Entry(False, when or datetime.datetime.now(), data)
        print 'logging sent entry', repr(entry)
        self.file.write(entry.data)

class LogReader(object):
    """Iterates over the entries stored in a log file."""

    def __init__(self, file):
        self.file = file

    def __iter__(self):
        """Yield every entry in the file, starting from the beginning.

        Iteration ends when an entry cannot be read and nothing is left in
        the file (normal end of file, or a truncated final entry).  If
        reading fails while data remains, the original error is re-raised.
        """
        self.file.seek(0)
        while True:
            try:
                entry = Entry.read_from(self.file)
            except (ValueError, struct.error):
                # struct.error: the header was short or empty (plain EOF).
                # ValueError: the content was shorter than announced.
                # Check this reader's own file, not the builtin ``file`` type.
                if self.file.read():
                    raise
                return
            yield entry


# Registry mapping a numeric packet type to the class registered for it.
packet_types = dict()


def register_type(number, cls):
    """Record *cls* in ``packet_types`` under the key *number*."""
    packet_types.update({number: cls})

class PacketParsingError(ValueError):
    """ValueError subclass for packet parsing failures.

    NOTE(review): nothing in this file raises it yet.
    """

class Packet(object):
    """An id hash plus a typed content object.

    The serialized form is a header packed as ``!BiI`` (content type,
    id hash, content length) followed by the content's own serialized data.
    """

    struct=struct.Struct('!BiI')

    def __init__(self, id_hash=None, content=None, data=None):
        """Build a packet from explicit fields or from serialized *data*.

        A non-empty *data* is decoded through the ``data`` property and
        overrides *id_hash* and *content*.
        """
        self.id_hash = id_hash
        self.content = content
        if data:
            self.data = data

    def __repr__(self):
        return 'Packet(%r, %r)' % (self.id_hash, self.content)

    def __eq__(self, other):
        """Packets are equal when id hash and content both match."""
        if not isinstance(other, Packet):
            return NotImplemented
        return self.id_hash == other.id_hash and self.content == other.content

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly.
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    # Properties
    def get_type(self):
        '''Shortcut for content.type'''
        return self.content.type
    type = property(get_type)

    def get_data(self):
        """Return the serialized header followed by the content's data."""
        content_data = self.content.data
        header = self.struct.pack(self.type, self.id_hash, len(content_data))
        return header + content_data

    def set_data(self, data, strict=True):
        """Populate id_hash and content from serialized *data*.

        The content class is looked up in ``packet_types`` by the type code
        in the header; an unregistered code raises KeyError.  *strict* is
        accepted but currently unused.
        """
        # Named type_code so the builtin ``type`` is not shadowed.
        type_code, self.id_hash, content_len = self.struct.unpack_from(data)
        start = self.struct.size
        end = start + content_len
        self.content = packet_types[type_code](data=data[start:end])
    data=property(get_data, set_data)

class Message(object):
    """Packet content holding a single message string.

    The serialized form is the message length packed as ``!I`` followed by
    the message itself.
    """

    type=0

    struct=struct.Struct('!I')

    def __init__(self, message=None, data=None):
        """Build from an explicit *message* or from serialized *data*.

        A non-empty *data* is decoded through the ``data`` property and
        overrides *message*.
        """
        self.message=message
        if data:
            self.data=data

    def __repr__(self):
        return 'Message(%r)' % (self.message)

    def __eq__(self, other):
        """Messages are equal when their message attributes match."""
        if not isinstance(other, Message):
            return NotImplemented
        return self.message == other.message

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly.
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    # Properties
    def get_data(self):
        """Return the length prefix followed by the message as a string."""
        # Measure the string that is actually written so the prefix can
        # never disagree with the payload that follows it.
        payload = str(self.message)
        return self.struct.pack(len(payload)) + payload

    def set_data(self, data):
        """Populate message from serialized *data*."""
        message_len, = self.struct.unpack_from(data)  # Note tuple
        start = self.struct.size
        self.message = data[start:start + message_len]
    data=property(get_data, set_data)
# Register Message under its own type number in the packet type registry.
register_type(number=Message.type, cls=Message)



# Sample packets used as test subjects.
packets = [Packet(id_hash=727508507, content=Message(message='testing'))]

def format_referrers(referrers):
    """Render *referrers* as text, one object per line.

    Frame objects are summarised as ``frame: <file>:<line> (<function>)``;
    every other object is shown through ``repr``.
    """
    def describe(obj):
        if not isinstance(obj, types.FrameType):
            return repr(obj)
        code = obj.f_code
        return 'frame: %s:%i (%s)' % (code.co_filename, obj.f_lineno,
                                      code.co_name)

    return '\n'.join([describe(obj) for obj in referrers])
    
class TestLoggerAndReader:
    
    filename = 'TestLoggerAndReader.test'
    subjects = packets
    
    def test_sanity(self):
        try:
            assert not os.path.exists(self.filename)
            
            logger = Logger(open(self.filename, 'wb'))
            for packet in self.subjects:
                logger.recv(packet.data)
            logger.file.close()
            reader = LogReader(open(self.filename, 'rb'))
            for entry, packet in zip(reader, self.subjects):
                print 'reading received entry:', repr(entry)
                assert entry.recvd == True
                assert Packet(data=entry.content) == packet
            reader.file.close()
        finally:
            try:
                print 'logger.file referrers:'
                print format_referrers(gc.get_referrers(logger.file))
                del logger.file
                print 'reader.file referrers:'
                print format_referrers(gc.get_referrers(reader.file))
                del reader.file
            except UnboundLocalError:
                pass
            gc.collect()
            os.remove(self.filename)
_______________________________________________
py-dev mailing list
py-dev@codespeak.net
http://codespeak.net/mailman/listinfo/py-dev

Reply via email to