Author: Armin Rigo <ar...@tunes.org>
Branch:
Changeset: r46150:69fabe6fa63e
Date: 2011-08-01 14:15 +0200
http://bitbucket.org/pypy/pypy/changeset/69fabe6fa63e/

Log: A quick writing and quick testing of MultibyteStream{Reader,Writer}.

diff --git a/pypy/module/_multibytecodec/app_multibytecodec.py b/pypy/module/_multibytecodec/app_multibytecodec.py
--- a/pypy/module/_multibytecodec/app_multibytecodec.py
+++ b/pypy/module/_multibytecodec/app_multibytecodec.py
@@ -1,34 +1,48 @@
 # NOT_RPYTHON
 #
-# These classes are not supported so far.
-#
-# My theory is that they are not widely used on CPython either, because
-# I found two bugs just by looking at their .c source: they always call
-# encreset() after a piece of data, even though I think it's wrong ---
-# it should be called only once at the end; and mbiencoder_reset() calls
-# decreset() instead of encreset().
-#
+# The interface here may be a little bit on the lightweight side.
 
-class MultibyteIncrementalEncoder(object):
-    def __init__(self, *args, **kwds):
-        raise LookupError(
-            "MultibyteIncrementalEncoder not implemented; "
-            "see pypy/module/_multibytecodec/app_multibytecodec.py")
+from _multibytecodec import MultibyteIncrementalDecoder
+from _multibytecodec import MultibyteIncrementalEncoder
 
-class MultibyteIncrementalDecoder(object):
-    def __init__(self, *args, **kwds):
-        raise LookupError(
-            "MultibyteIncrementalDecoder not implemented; "
-            "see pypy/module/_multibytecodec/app_multibytecodec.py")
 
-class MultibyteStreamReader(object):
-    def __init__(self, *args, **kwds):
-        raise LookupError(
-            "MultibyteStreamReader not implemented; "
-            "see pypy/module/_multibytecodec/app_multibytecodec.py")
+class MultibyteStreamReader(MultibyteIncrementalDecoder):
+    def __new__(cls, stream, errors=None):
+        self = MultibyteIncrementalDecoder.__new__(cls, errors)
+        self.stream = stream
+        return self
 
-class MultibyteStreamWriter(object):
-    def __init__(self, *args, **kwds):
-        raise LookupError(
-            "MultibyteStreamWriter not implemented; "
-            "see pypy/module/_multibytecodec/app_multibytecodec.py")
+    def __read(self, read, size):
+        while True:
+            if size is None:
+                data = read()
+            else:
+                data = read(size)
+            final = not data
+            output = self.decode(data, final)
+            if output or final:
+                return output
+            size = 1  # read 1 more byte and retry
+
+    def read(self, size=None):
+        return self.__read(self.stream.read, size)
+
+    def readline(self, size=None):
+        return self.__read(self.stream.readline, size)
+
+    def readlines(self, sizehint=None):
+        return self.__read(self.stream.read, sizehint).splitlines(True)
+
+
+class MultibyteStreamWriter(MultibyteIncrementalEncoder):
+    def __new__(cls, stream, errors=None):
+        self = MultibyteIncrementalEncoder.__new__(cls, errors)
+        self.stream = stream
+        return self
+
+    def write(self, data):
+        self.stream.write(self.encode(data))
+
+    def writelines(self, lines):
+        for data in lines:
+            self.write(data)
diff --git a/pypy/module/_multibytecodec/test/test_app_stream.py b/pypy/module/_multibytecodec/test/test_app_stream.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_multibytecodec/test/test_app_stream.py
@@ -0,0 +1,54 @@
+from pypy.conftest import gettestobjspace
+
+
+class AppTestStreams:
+    def setup_class(cls):
+        cls.space = gettestobjspace(usemodules=['_multibytecodec'])
+        cls.w_HzStreamReader = cls.space.appexec([], """():
+            import _codecs_cn
+            from _multibytecodec import MultibyteStreamReader
+
+            class HzStreamReader(MultibyteStreamReader):
+                codec = _codecs_cn.getcodec('hz')
+
+            return HzStreamReader
+        """)
+        cls.w_HzStreamWriter = cls.space.appexec([], """():
+            import _codecs_cn
+            from _multibytecodec import MultibyteStreamWriter
+
+            class HzStreamWriter(MultibyteStreamWriter):
+                codec = _codecs_cn.getcodec('hz')
+
+            return HzStreamWriter
+        """)
+
+    def test_reader(self):
+        class FakeFile:
+            def __init__(self, data):
+                self.data = data
+                self.pos = 0
+            def read(self, size):
+                res = self.data[self.pos : self.pos + size]
+                self.pos += size
+                return res
+        #
+        r = self.HzStreamReader(FakeFile("!~{abcd~}xyz~{efgh"))
+        for expected in u'!\u5f95\u6c85xyz\u5f50\u73b7':
+            c = r.read(1)
+            assert c == expected
+        c = r.read(1)
+        assert c == ''
+
+    def test_writer(self):
+        class FakeFile:
+            def __init__(self):
+                self.output = []
+            def write(self, data):
+                self.output.append(data)
+        #
+        w = self.HzStreamWriter(FakeFile())
+        for input in u'!\u5f95\u6c85xyz\u5f50\u73b7':
+            w.write(input)
+        assert w.stream.output == ['!', '~{ab~}', '~{cd~}', 'x', 'y', 'z',
+                                   '~{ef~}', '~{gh~}']
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
http://mail.python.org/mailman/listinfo/pypy-commit