indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.
REVISION SUMMARY
The vendored cbor2 package doesn't have support for streaming /
indefinite length items when encoding. This is kind of unfortunate.
While it might be worth our while to upstream this feature to the
package, for now it is more expedient to implement it ourselves.
This commit implements support for encoding indefinite length
arrays and maps. We use a context manager that receives a CBOREncoder
and hands the caller a function that can be used to write individual
items. When the context manager exits, the "break" byte is sent.
As a refresher of RFC 7042, tThe initial byte of a CBOR item contains
3 bits for the major type and 5 bits for additional information.
Information value 31 is used to denote indefinite-length items.
Indefinite length arrays and maps simply emit their items inline. After
the final item, a "break" byte (major type 7, additional information 31
- value 0xff) is written. There are no nesting restrictions.
REPOSITORY
rHG Mercurial
REVISION DETAIL
https://phab.mercurial-scm.org/D3172
AFFECTED FILES
mercurial/utils/cborutil.py
tests/test-cbor.py
CHANGE DETAILS
diff --git a/tests/test-cbor.py b/tests/test-cbor.py
new file mode 100644
--- /dev/null
+++ b/tests/test-cbor.py
@@ -0,0 +1,176 @@
+from __future__ import absolute_import
+
+import io
+import unittest
+
+from mercurial.thirdparty import (
+cbor,
+)
+from mercurial.utils import (
+cborutil,
+)
+
+class StreamArrayTests(unittest.TestCase):
+def testempty(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streamarray(encoder):
+pass
+
+self.assertEqual(b.getvalue(), '\x9f\xff')
+self.assertEqual(cbor.loads(b.getvalue()), [])
+
+def testone(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streamarray(encoder) as fn:
+fn(b'foo')
+
+self.assertEqual(cbor.loads(b.getvalue()), [b'foo'])
+
+def testmultiple(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streamarray(encoder) as fn:
+fn(0)
+fn(True)
+fn(b'foo')
+fn(None)
+
+self.assertEqual(cbor.loads(b.getvalue()), [0, True, b'foo', None])
+
+def testnested(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streamarray(encoder):
+with cborutil.streamarray(encoder) as fn:
+fn(b'foo')
+fn(b'bar')
+
+self.assertEqual(cbor.loads(b.getvalue()), [[b'foo', b'bar']])
+
+def testitemslist(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+orig = [b'foo', b'bar', None, True, 42]
+
+cborutil.streamarrayitems(encoder, orig)
+self.assertEqual(cbor.loads(b.getvalue()), orig)
+
+def testitemsgen(self):
+def makeitems():
+yield b'foo'
+yield b'bar'
+yield None
+yield 42
+
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+cborutil.streamarrayitems(encoder, makeitems())
+self.assertEqual(cbor.loads(b.getvalue()), [b'foo', b'bar', None, 42])
+
+class StreamMapTests(unittest.TestCase):
+def testempty(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streammap(encoder):
+pass
+
+self.assertEqual(b.getvalue(), '\xbf\xff')
+self.assertEqual(cbor.loads(b.getvalue()), {})
+
+def testone(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streammap(encoder) as fn:
+fn(b'key1', b'value1')
+
+self.assertEqual(cbor.loads(b.getvalue()), {b'key1': b'value1'})
+
+def testmultiple(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streammap(encoder) as fn:
+fn(0, 1)
+fn(b'key1', b'value1')
+fn(True, None)
+
+self.assertEqual(cbor.loads(b.getvalue()), {
+0: 1,
+b'key1': b'value1',
+True: None,
+})
+
+def testcomplex(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streammap(encoder) as fn:
+fn(b'key1', b'value1')
+fn(b'map', {b'inner1key': b'inner1value'})
+fn(b'array', [0, 1, 2])
+
+self.assertEqual(cbor.loads(b.getvalue()), {
+b'key1': b'value1',
+b'map': {b'inner1key': b'inner1value'},
+b'array': [0, 1, 2],
+})
+
+def testnested(self):
+b = io.BytesIO()
+encoder = cbor.CBOREncoder(b)
+
+with cborutil.streammap(encoder):
+encoder.encode(b'streamkey')
+with cborutil.strea