indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.
REVISION SUMMARY
The vendored cbor2 package doesn't have support for streaming /
indefinite length items when encoding. On the decoding side, it
supports indefinite length items, but it waits for all data to arrive
before emitting a result. This is unfortunate because
indefinite length items facilitate streaming without buffering.
This commit implements support for encoding indefinite length
bytestrings, arrays, and maps. It implements support for decoding
indefinite length bytestrings.
I strived to use generators for moving data around as much as
possible because they are much more efficient than read()/write():
no extra memory copying, allocation, concatenation, or
buffering occurs unless the producer/consumer needs it.
This helps keep things fast.
REPOSITORY
rHG Mercurial
REVISION DETAIL
https://phab.mercurial-scm.org/D3303
AFFECTED FILES
contrib/import-checker.py
mercurial/utils/cborutil.py
tests/test-cbor.py
CHANGE DETAILS
diff --git a/tests/test-cbor.py b/tests/test-cbor.py
new file mode 100644
--- /dev/null
+++ b/tests/test-cbor.py
@@ -0,0 +1,235 @@
+from __future__ import absolute_import
+
+import io
+import unittest
+
+from mercurial.thirdparty import (
+cbor,
+)
+from mercurial.utils import (
+cborutil,
+)
+
+class IndefiniteBytestringTests(unittest.TestCase):
+# Tests for the cborutil helpers that encode and decode CBOR
+# indefinite-length byte strings.
+def testitertoiter(self):
+# This is the example from RFC 7049 Section 2.2.2.
+source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']
+
+it = cborutil.itertoindefinitebytestring(source)
+
+# Expected stream: the 0x5f start marker, then each chunk preceded by
+# its definite-length byte string header (0x44 = 4 bytes, 0x43 = 3
+# bytes), then the 0xff break code.
+self.assertEqual(next(it), b'\x5f')
+self.assertEqual(next(it), b'\x44')
+self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd')
+self.assertEqual(next(it), b'\x43')
+self.assertEqual(next(it), b'\xee\xff\x99')
+self.assertEqual(next(it), b'\xff')
+
+# Nothing may follow the break code.
+with self.assertRaises(StopIteration):
+next(it)
+
+# The joined output must decode to the concatenation of the chunks.
+dest = b''.join(cborutil.itertoindefinitebytestring(source))
+
+self.assertEqual(cbor.loads(dest), b''.join(source))
+
+def testreadtoiter(self):
+# Same encoded bytes as the RFC example above, read from a file-like
+# object. Only the chunk payloads are expected back; the headers and
+# the break code are not yielded.
+source = io.BytesIO(b'\x5f\x44\xaa\xbb\xcc\xdd\x43\xee\xff\x99\xff')
+
+it = cborutil.readindefinitebytestringtoiter(source)
+self.assertEqual(next(it), b'\xaa\xbb\xcc\xdd')
+self.assertEqual(next(it), b'\xee\xff\x99')
+
+with self.assertRaises(StopIteration):
+next(it)
+
+def testtoiterlarge(self):
+# Chunks of 16, 128, 1024 and 1048576 bytes. NOTE(review): presumably
+# chosen so the chunk headers need differently sized length encodings;
+# confirm against cborutil.
+source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]
+
+dest = b''.join(cborutil.itertoindefinitebytestring(source))
+
+self.assertEqual(cbor.loads(dest), b''.join(source))
+
+def testbuffertoindefinite(self):
+source = b'\x00\x01\x02\x03' + b'\xff' * 16384
+
+# With chunksize=2 every chunk is a 0x42 header (byte string of length
+# 2) followed by two payload bytes.
+it = cborutil.buffertoindefinitebytestring(source, chunksize=2)
+
+# Only the start marker and the first three chunks are checked; the
+# rest of the stream is not consumed.
+self.assertEqual(next(it), b'\x5f')
+self.assertEqual(next(it), b'\x42')
+self.assertEqual(next(it), b'\x00\x01')
+self.assertEqual(next(it), b'\x42')
+self.assertEqual(next(it), b'\x02\x03')
+self.assertEqual(next(it), b'\x42')
+self.assertEqual(next(it), b'\xff\xff')
+
+def testbuffertoindefiniteroundtrip(self):
+# 1 MiB buffer encoded with the default chunk size.
+source = b'x' * 1048576
+
+chunks = list(cborutil.buffertoindefinitebytestring(source))
+# NOTE(review): 34 is presumably 1 start marker + 16 (header, payload)
+# pairs + 1 break code, which would imply a 65536-byte default
+# chunksize; confirm against cborutil.
+self.assertEqual(len(chunks), 34)
+
+self.assertEqual(cbor.loads(b''.join(chunks)), source)
+
+class StreamArrayTests(unittest.TestCase):
+def testempty(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streamarray(encoder):
+pass
+
+self.assertEqual(b.getvalue(), '\x9f\xff')
+self.assertEqual(cbor.loads(b.getvalue()), [])
+
+def testone(self):
+# A single item written through the callable bound by the `with`
+# statement must decode as a one-element list.
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streamarray(encoder) as fn:
+fn(b'foo')
+
+self.assertEqual(cbor.loads(b.getvalue()), [b'foo'])
+
+def testmultiple(self):
+# Items of different types (int, bool, bytes, None) written one at a
+# time must decode in the same order they were written.
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streamarray(encoder) as fn:
+fn(0)
+fn(True)
+fn(b'foo')
+fn(None)
+
+self.assertEqual(cbor.loads(b.getvalue()), [0, True, b'foo', None])
+
+def testnested(self):
+# Opening a second streamarray on the same encoder while the first is
+# still open yields a nested array: the decoded value is a list holding
+# one inner list with both items.
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streamarray(encoder):
+with cborutil.streamarray(encoder) as fn:
+fn(b'foo')
+fn(b'bar')
+
+self.assertEqual(cbor.loads(b.getvalue()), [[b'foo', b'bar']])
+
+def testitemslist(self):
+# streamarrayitems() is handed the encoder and a whole list; the bytes
+# it writes must decode back to an equal list.
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+orig = [b'foo', b'bar', None, True, 42]
+
+cborutil.streamarrayitems(encoder, orig)
+self.assertEqual(cbor.loads(b.getvalue()), orig)
+
+de